|
|
@ -50,36 +50,24 @@ public class DiscoveryClientServiceInstanceListSupplier implements ServiceInstan |
|
|
|
|
|
|
|
|
|
|
|
private static final Log LOG = LogFactory.getLog(DiscoveryClientServiceInstanceListSupplier.class); |
|
|
|
private static final Log LOG = LogFactory.getLog(DiscoveryClientServiceInstanceListSupplier.class); |
|
|
|
|
|
|
|
|
|
|
|
private Duration timeout = Duration.ofSeconds(30); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private final String serviceId; |
|
|
|
private final String serviceId; |
|
|
|
|
|
|
|
|
|
|
|
private final Flux<List<ServiceInstance>> serviceInstances; |
|
|
|
private final Object delegate; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private Duration timeout = Duration.ofSeconds(30); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private Flux<List<ServiceInstance>> serviceInstances = null; |
|
|
|
|
|
|
|
|
|
|
|
public DiscoveryClientServiceInstanceListSupplier(DiscoveryClient delegate, Environment environment) { |
|
|
|
public DiscoveryClientServiceInstanceListSupplier(DiscoveryClient delegate, Environment environment) { |
|
|
|
this.serviceId = environment.getProperty(PROPERTY_NAME); |
|
|
|
this.serviceId = environment.getProperty(PROPERTY_NAME); |
|
|
|
resolveTimeout(environment); |
|
|
|
resolveTimeout(environment); |
|
|
|
this.serviceInstances = Flux.defer(() -> Flux.just(delegate.getInstances(serviceId))) |
|
|
|
this.delegate = delegate; |
|
|
|
.subscribeOn(Schedulers.boundedElastic()).timeout(timeout, Flux.defer(() -> { |
|
|
|
|
|
|
|
logTimeout(); |
|
|
|
|
|
|
|
return Flux.just(new ArrayList<>()); |
|
|
|
|
|
|
|
})).onErrorResume(error -> { |
|
|
|
|
|
|
|
logException(error); |
|
|
|
|
|
|
|
return Flux.just(new ArrayList<>()); |
|
|
|
|
|
|
|
}); |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
public DiscoveryClientServiceInstanceListSupplier(ReactiveDiscoveryClient delegate, Environment environment) { |
|
|
|
public DiscoveryClientServiceInstanceListSupplier(ReactiveDiscoveryClient delegate, Environment environment) { |
|
|
|
this.serviceId = environment.getProperty(PROPERTY_NAME); |
|
|
|
this.serviceId = environment.getProperty(PROPERTY_NAME); |
|
|
|
resolveTimeout(environment); |
|
|
|
resolveTimeout(environment); |
|
|
|
this.serviceInstances = Flux |
|
|
|
this.delegate = delegate; |
|
|
|
.defer(() -> delegate.getInstances(serviceId).collectList().flux().timeout(timeout, Flux.defer(() -> { |
|
|
|
|
|
|
|
logTimeout(); |
|
|
|
|
|
|
|
return Flux.just(new ArrayList<>()); |
|
|
|
|
|
|
|
})).onErrorResume(error -> { |
|
|
|
|
|
|
|
logException(error); |
|
|
|
|
|
|
|
return Flux.just(new ArrayList<>()); |
|
|
|
|
|
|
|
})); |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
@Override |
|
|
@ -89,9 +77,42 @@ public class DiscoveryClientServiceInstanceListSupplier implements ServiceInstan |
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
@Override |
|
|
|
public Flux<List<ServiceInstance>> get() { |
|
|
|
public Flux<List<ServiceInstance>> get() { |
|
|
|
|
|
|
|
if (serviceInstances == null) { |
|
|
|
|
|
|
|
if (delegate instanceof DiscoveryClient) { |
|
|
|
|
|
|
|
serviceInstances = fluxFromBlockingDelegate((DiscoveryClient) delegate); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
else if (delegate instanceof ReactiveDiscoveryClient) { |
|
|
|
|
|
|
|
serviceInstances = fluxFromReactiveDelegate((ReactiveDiscoveryClient) delegate); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
else { |
|
|
|
|
|
|
|
serviceInstances = Flux.empty(); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
return serviceInstances; |
|
|
|
return serviceInstances; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private Flux<List<ServiceInstance>> fluxFromReactiveDelegate(ReactiveDiscoveryClient delegate) { |
|
|
|
|
|
|
|
return Flux |
|
|
|
|
|
|
|
.defer(() -> (delegate).getInstances(serviceId).collectList().flux().timeout(timeout, Flux.defer(() -> { |
|
|
|
|
|
|
|
logTimeout(); |
|
|
|
|
|
|
|
return Flux.just(new ArrayList<>()); |
|
|
|
|
|
|
|
})).onErrorResume(error -> { |
|
|
|
|
|
|
|
logException(error); |
|
|
|
|
|
|
|
return Flux.just(new ArrayList<>()); |
|
|
|
|
|
|
|
})); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private Flux<List<ServiceInstance>> fluxFromBlockingDelegate(DiscoveryClient delegate) { |
|
|
|
|
|
|
|
return Flux.defer(() -> Flux.just((delegate).getInstances(serviceId))).subscribeOn(Schedulers.boundedElastic()) |
|
|
|
|
|
|
|
.timeout(timeout, Flux.defer(() -> { |
|
|
|
|
|
|
|
logTimeout(); |
|
|
|
|
|
|
|
return Flux.just(new ArrayList<>()); |
|
|
|
|
|
|
|
})).onErrorResume(error -> { |
|
|
|
|
|
|
|
logException(error); |
|
|
|
|
|
|
|
return Flux.just(new ArrayList<>()); |
|
|
|
|
|
|
|
}); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
private void resolveTimeout(Environment environment) { |
|
|
|
private void resolveTimeout(Environment environment) { |
|
|
|
String providedTimeout = environment.getProperty(SERVICE_DISCOVERY_TIMEOUT); |
|
|
|
String providedTimeout = environment.getProperty(SERVICE_DISCOVERY_TIMEOUT); |
|
|
|
if (providedTimeout != null) { |
|
|
|
if (providedTimeout != null) { |
|
|
|