Compare commits

...

1 Commits

Author SHA1 Message Date
Olga MaciaszekSharma f8ca457b15 Move discovery client flux creation code to a method. 4 years ago
  1. 59
      spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/DiscoveryClientServiceInstanceListSupplier.java

59
spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/DiscoveryClientServiceInstanceListSupplier.java

@ -50,36 +50,24 @@ public class DiscoveryClientServiceInstanceListSupplier implements ServiceInstan @@ -50,36 +50,24 @@ public class DiscoveryClientServiceInstanceListSupplier implements ServiceInstan
private static final Log LOG = LogFactory.getLog(DiscoveryClientServiceInstanceListSupplier.class);
private Duration timeout = Duration.ofSeconds(30);
private final String serviceId;
private final Flux<List<ServiceInstance>> serviceInstances;
private final Object delegate;
private Duration timeout = Duration.ofSeconds(30);
private Flux<List<ServiceInstance>> serviceInstances = null;
public DiscoveryClientServiceInstanceListSupplier(DiscoveryClient delegate, Environment environment) {
this.serviceId = environment.getProperty(PROPERTY_NAME);
resolveTimeout(environment);
this.serviceInstances = Flux.defer(() -> Flux.just(delegate.getInstances(serviceId)))
.subscribeOn(Schedulers.boundedElastic()).timeout(timeout, Flux.defer(() -> {
logTimeout();
return Flux.just(new ArrayList<>());
})).onErrorResume(error -> {
logException(error);
return Flux.just(new ArrayList<>());
});
this.delegate = delegate;
}
public DiscoveryClientServiceInstanceListSupplier(ReactiveDiscoveryClient delegate, Environment environment) {
this.serviceId = environment.getProperty(PROPERTY_NAME);
resolveTimeout(environment);
this.serviceInstances = Flux
.defer(() -> delegate.getInstances(serviceId).collectList().flux().timeout(timeout, Flux.defer(() -> {
logTimeout();
return Flux.just(new ArrayList<>());
})).onErrorResume(error -> {
logException(error);
return Flux.just(new ArrayList<>());
}));
this.delegate = delegate;
}
@Override
@ -89,9 +77,42 @@ public class DiscoveryClientServiceInstanceListSupplier implements ServiceInstan @@ -89,9 +77,42 @@ public class DiscoveryClientServiceInstanceListSupplier implements ServiceInstan
@Override
public Flux<List<ServiceInstance>> get() {
if (serviceInstances == null) {
if (delegate instanceof DiscoveryClient) {
serviceInstances = fluxFromBlockingDelegate((DiscoveryClient) delegate);
}
else if (delegate instanceof ReactiveDiscoveryClient) {
serviceInstances = fluxFromReactiveDelegate((ReactiveDiscoveryClient) delegate);
}
else {
serviceInstances = Flux.empty();
}
}
return serviceInstances;
}
private Flux<List<ServiceInstance>> fluxFromReactiveDelegate(ReactiveDiscoveryClient delegate) {
return Flux
.defer(() -> (delegate).getInstances(serviceId).collectList().flux().timeout(timeout, Flux.defer(() -> {
logTimeout();
return Flux.just(new ArrayList<>());
})).onErrorResume(error -> {
logException(error);
return Flux.just(new ArrayList<>());
}));
}
private Flux<List<ServiceInstance>> fluxFromBlockingDelegate(DiscoveryClient delegate) {
return Flux.defer(() -> Flux.just((delegate).getInstances(serviceId))).subscribeOn(Schedulers.boundedElastic())
.timeout(timeout, Flux.defer(() -> {
logTimeout();
return Flux.just(new ArrayList<>());
})).onErrorResume(error -> {
logException(error);
return Flux.just(new ArrayList<>());
});
}
private void resolveTimeout(Environment environment) {
String providedTimeout = environment.getProperty(SERVICE_DISCOVERY_TIMEOUT);
if (providedTimeout != null) {

Loading…
Cancel
Save