@ -50,36 +50,24 @@ public class DiscoveryClientServiceInstanceListSupplier implements ServiceInstan
@@ -50,36 +50,24 @@ public class DiscoveryClientServiceInstanceListSupplier implements ServiceInstan
private static final Log LOG = LogFactory . getLog ( DiscoveryClientServiceInstanceListSupplier . class ) ;
private Duration timeout = Duration . ofSeconds ( 30 ) ;
private final String serviceId ;
private final Flux < List < ServiceInstance > > serviceInstances ;
private final Object delegate ;
private Duration timeout = Duration . ofSeconds ( 30 ) ;
private Flux < List < ServiceInstance > > serviceInstances = null ;
public DiscoveryClientServiceInstanceListSupplier ( DiscoveryClient delegate , Environment environment ) {
this . serviceId = environment . getProperty ( PROPERTY_NAME ) ;
resolveTimeout ( environment ) ;
this . serviceInstances = Flux . defer ( ( ) - > Flux . just ( delegate . getInstances ( serviceId ) ) )
. subscribeOn ( Schedulers . boundedElastic ( ) ) . timeout ( timeout , Flux . defer ( ( ) - > {
logTimeout ( ) ;
return Flux . just ( new ArrayList < > ( ) ) ;
} ) ) . onErrorResume ( error - > {
logException ( error ) ;
return Flux . just ( new ArrayList < > ( ) ) ;
} ) ;
this . delegate = delegate ;
}
public DiscoveryClientServiceInstanceListSupplier ( ReactiveDiscoveryClient delegate , Environment environment ) {
this . serviceId = environment . getProperty ( PROPERTY_NAME ) ;
resolveTimeout ( environment ) ;
this . serviceInstances = Flux
. defer ( ( ) - > delegate . getInstances ( serviceId ) . collectList ( ) . flux ( ) . timeout ( timeout , Flux . defer ( ( ) - > {
logTimeout ( ) ;
return Flux . just ( new ArrayList < > ( ) ) ;
} ) ) . onErrorResume ( error - > {
logException ( error ) ;
return Flux . just ( new ArrayList < > ( ) ) ;
} ) ) ;
this . delegate = delegate ;
}
@Override
@ -89,9 +77,42 @@ public class DiscoveryClientServiceInstanceListSupplier implements ServiceInstan
@@ -89,9 +77,42 @@ public class DiscoveryClientServiceInstanceListSupplier implements ServiceInstan
@Override
public Flux < List < ServiceInstance > > get ( ) {
if ( serviceInstances = = null ) {
if ( delegate instanceof DiscoveryClient ) {
serviceInstances = fluxFromBlockingDelegate ( ( DiscoveryClient ) delegate ) ;
}
else if ( delegate instanceof ReactiveDiscoveryClient ) {
serviceInstances = fluxFromReactiveDelegate ( ( ReactiveDiscoveryClient ) delegate ) ;
}
else {
serviceInstances = Flux . empty ( ) ;
}
}
return serviceInstances ;
}
private Flux < List < ServiceInstance > > fluxFromReactiveDelegate ( ReactiveDiscoveryClient delegate ) {
return Flux
. defer ( ( ) - > ( delegate ) . getInstances ( serviceId ) . collectList ( ) . flux ( ) . timeout ( timeout , Flux . defer ( ( ) - > {
logTimeout ( ) ;
return Flux . just ( new ArrayList < > ( ) ) ;
} ) ) . onErrorResume ( error - > {
logException ( error ) ;
return Flux . just ( new ArrayList < > ( ) ) ;
} ) ) ;
}
private Flux < List < ServiceInstance > > fluxFromBlockingDelegate ( DiscoveryClient delegate ) {
return Flux . defer ( ( ) - > Flux . just ( ( delegate ) . getInstances ( serviceId ) ) ) . subscribeOn ( Schedulers . boundedElastic ( ) )
. timeout ( timeout , Flux . defer ( ( ) - > {
logTimeout ( ) ;
return Flux . just ( new ArrayList < > ( ) ) ;
} ) ) . onErrorResume ( error - > {
logException ( error ) ;
return Flux . just ( new ArrayList < > ( ) ) ;
} ) ;
}
private void resolveTimeout ( Environment environment ) {
String providedTimeout = environment . getProperty ( SERVICE_DISCOVERY_TIMEOUT ) ;
if ( providedTimeout ! = null ) {