Browse Source

Switch ServiceInstanceListSupplier to use Flux. Add implementation with

delayed list. Add subscriber-based resetting instances.
wip-poc-gh-595-power-of-two-loadbalancer
Olga Maciaszek-Sharma 5 years ago
parent
commit
52c5b9e264
  1. 14
      spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/DiscoveryClientServiceInstanceListSupplier.java
  2. 16
      spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/PowerOfTwoChoicesPOCLoadBalancer.java
  3. 1
      spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/RoundRobinListLoadBalancer.java
  4. 4
      spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/ServiceInstanceListSupplier.java

14
spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/DiscoveryClientServiceInstanceListSupplier.java

@ -1,9 +1,10 @@ @@ -1,9 +1,10 @@
package org.springframework.cloud.loadbalancer.core;
import java.time.Duration;
import java.util.List;
import java.util.stream.Collectors;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Flux;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.core.env.Environment;
@ -26,14 +27,19 @@ public class DiscoveryClientServiceInstanceListSupplier implements ServiceInstan @@ -26,14 +27,19 @@ public class DiscoveryClientServiceInstanceListSupplier implements ServiceInstan
}
@Override
public Mono<List<ConnectionTrackingServiceInstance>> get() {
List<ConnectionTrackingServiceInstance> instances = this.delegate
public Flux<List<ConnectionTrackingServiceInstance>> get() {
//FIXME: sensible defaults + config
return Flux.just(getInstances())
.delayElements(Duration.ofMinutes(5));
}
private List<ConnectionTrackingServiceInstance> getInstances() {
return this.delegate
.getInstances(this.serviceId)
.stream()
// switch to a more sensible conversion
.map(serviceInstance -> (ConnectionTrackingServiceInstance) serviceInstance)
.collect(Collectors.toList());
return Mono.just(instances);
}
public String getServiceId() {

16
spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/PowerOfTwoChoicesPOCLoadBalancer.java

@ -37,12 +37,18 @@ public class PowerOfTwoChoicesPOCLoadBalancer implements ReactorServiceInstanceL @@ -37,12 +37,18 @@ public class PowerOfTwoChoicesPOCLoadBalancer implements ReactorServiceInstanceL
resetInstances();
}
// private void resetInstances() {
// Schedulers.fromExecutorService(Executors.newSingleThreadScheduledExecutor())
// .schedulePeriodically(() -> instances = serviceInstanceListSupplier
// // maybe we don't have to block at all?
// // TODO: sensible interval defaults + config
// .getIfAvailable().get().next().block(), 0, 10, TimeUnit.MINUTES);
// }
private void resetInstances() {
Schedulers.fromExecutorService(Executors.newSingleThreadScheduledExecutor())
.schedulePeriodically(() -> instances = serviceInstanceListSupplier
// maybe we don't have to block at all?
// TODO: sensible interval defaults + config
.getIfAvailable().get().block(), 0, 10, TimeUnit.MINUTES);
serviceInstanceListSupplier.getIfAvailable()
.get()
.subscribe(connectionTrackingServiceInstances -> instances = connectionTrackingServiceInstances);
}
// TODO: optimise

1
spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/RoundRobinListLoadBalancer.java

@ -48,6 +48,7 @@ public class RoundRobinListLoadBalancer implements ReactorServiceInstanceLoadBal @@ -48,6 +48,7 @@ public class RoundRobinListLoadBalancer implements ReactorServiceInstanceLoadBal
// TODO: move supplier to Request?
ServiceInstanceListSupplier<ServiceInstance> supplier = this.serviceInstanceListSupplier.getIfAvailable();
return supplier.get()
.next()
.map(instances -> {
if (instances.isEmpty()) {
log.warn("No servers available for service: " + this.serviceId);

4
spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/ServiceInstanceListSupplier.java

@ -3,14 +3,14 @@ package org.springframework.cloud.loadbalancer.core; @@ -3,14 +3,14 @@ package org.springframework.cloud.loadbalancer.core;
import java.util.List;
import java.util.function.Supplier;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Flux;
import org.springframework.cloud.client.ServiceInstance;
/**
* @author Olga Maciaszek-Sharma
*/
public interface ServiceInstanceListSupplier<T extends ServiceInstance> extends Supplier<Mono<List<T>>> {
public interface ServiceInstanceListSupplier<T extends ServiceInstance> extends Supplier<Flux<List<T>>> {
String getServiceId();
}

Loading…
Cancel
Save