Browse Source

Add Reactive probe (#1201)

pull/1208/head
Olga Maciaszek-Sharma 2 years ago committed by GitHub
parent
commit
6677d16736
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 30
      spring-cloud-commons/src/main/java/org/springframework/cloud/client/discovery/ReactiveDiscoveryClient.java
  2. 49
      spring-cloud-commons/src/main/java/org/springframework/cloud/client/discovery/health/reactive/ReactiveDiscoveryClientHealthIndicator.java
  3. 55
      spring-cloud-commons/src/test/java/org/springframework/cloud/client/discovery/health/reactive/ReactiveDiscoveryClientHealthIndicatorTests.java
  4. 1
      src/checkstyle/checkstyle-suppressions.xml

30
spring-cloud-commons/src/main/java/org/springframework/cloud/client/discovery/ReactiveDiscoveryClient.java

@ -16,7 +16,10 @@ @@ -16,7 +16,10 @@
package org.springframework.cloud.client.discovery;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.core.Ordered;
@ -26,9 +29,12 @@ import org.springframework.core.Ordered; @@ -26,9 +29,12 @@ import org.springframework.core.Ordered;
* Eureka or consul.io.
*
* @author Tim Ysewyn
* @author Olga Maciaszek-Sharma
*/
public interface ReactiveDiscoveryClient extends Ordered {
Log LOG = LogFactory.getLog(ReactiveDiscoveryClient.class);
/**
* Default order of the discovery client.
*/
@ -60,11 +66,35 @@ public interface ReactiveDiscoveryClient extends Ordered { @@ -60,11 +66,35 @@ public interface ReactiveDiscoveryClient extends Ordered {
* <p>
* The default implementation simply calls {@link #getServices()} - client
* implementations can override with a lighter weight operation if they choose to.
* @deprecated in favour of {@link ReactiveDiscoveryClient#reactiveProbe()}. This
* method should not be used as is, as it contains a bug - the method called within
* returns a {@link Flux}, which is not accessible for subscription or blocking from
* within. We are leaving it with a deprecation in order not to bring downstream
* implementations.
*/
@Deprecated
default void probe() {
if (LOG.isWarnEnabled()) {
LOG.warn("ReactiveDiscoveryClient#probe has been called. If you're calling this method directly, "
+ "use ReactiveDiscoveryClient#reactiveProbe instead.");
}
getServices();
}
/**
* Can be used to verify the client is still valid and able to make calls.
* <p>
* A successful invocation with no exception thrown implies the client is able to make
* calls.
* <p>
* The default implementation simply calls {@link #getServices()} and wraps it with a
* {@link Mono} - client implementations can override with a lighter weight operation
* if they choose to.
*/
default Mono<Void> reactiveProbe() {
return getServices().then();
}
/**
* Default implementation for getting order of discovery clients.
* @return order

49
spring-cloud-commons/src/main/java/org/springframework/cloud/client/discovery/health/reactive/ReactiveDiscoveryClientHealthIndicator.java

@ -33,21 +33,21 @@ import org.springframework.core.Ordered; @@ -33,21 +33,21 @@ import org.springframework.core.Ordered;
import static java.util.Collections.emptyList;
/**
* A health indicator which indicates whether or not the discovery client has been
* initialized.
* A health indicator which indicates whether the discovery client has been initialized.
*
* @author Tim Ysewyn
* @author Chris Bono
* @author Olga Maciaszek-Sharma
*/
public class ReactiveDiscoveryClientHealthIndicator
implements ReactiveDiscoveryHealthIndicator, Ordered, ApplicationListener<InstanceRegisteredEvent<?>> {
private static final Log LOG = LogFactory.getLog(ReactiveDiscoveryClientHealthIndicator.class);
private final ReactiveDiscoveryClient discoveryClient;
private final DiscoveryClientHealthIndicatorProperties properties;
private final Log log = LogFactory.getLog(ReactiveDiscoveryClientHealthIndicator.class);
private AtomicBoolean discoveryInitialized = new AtomicBoolean(false);
private int order = Ordered.HIGHEST_PRECEDENCE;
@ -60,14 +60,14 @@ public class ReactiveDiscoveryClientHealthIndicator @@ -60,14 +60,14 @@ public class ReactiveDiscoveryClientHealthIndicator
@Override
public void onApplicationEvent(InstanceRegisteredEvent<?> event) {
if (this.discoveryInitialized.compareAndSet(false, true)) {
this.log.debug("Discovery Client has been initialized");
if (discoveryInitialized.compareAndSet(false, true)) {
LOG.debug("Discovery Client has been initialized");
}
}
@Override
public Mono<Health> health() {
if (this.discoveryInitialized.get()) {
if (discoveryInitialized.get()) {
return doHealthCheck();
}
else {
@ -78,38 +78,39 @@ public class ReactiveDiscoveryClientHealthIndicator @@ -78,38 +78,39 @@ public class ReactiveDiscoveryClientHealthIndicator
private Mono<Health> doHealthCheck() {
// @formatter:off
return Mono.just(this.properties.isUseServicesQuery())
return Mono.just(properties.isUseServicesQuery())
.flatMap(useServices -> useServices ? doHealthCheckWithServices() : doHealthCheckWithProbe())
.onErrorResume(exception -> {
this.log.error("Error", exception);
if (LOG.isErrorEnabled()) {
LOG.error("Error", exception);
}
return Mono.just(Health.down().withException(exception).build());
});
// @formatter:on
}
private Mono<Health> doHealthCheckWithProbe() {
// @formatter:off
return Mono.justOrEmpty(this.discoveryClient)
.flatMap(client -> {
client.probe();
return Mono.just(client);
})
.map(client -> {
String description = (this.properties.isIncludeDescription()) ? client.description() : "";
return Health.status(new Status("UP", description)).build();
});
// @formatter:on
return discoveryClient.reactiveProbe().doOnError(exception -> {
if (LOG.isErrorEnabled()) {
LOG.error("Probe has failed.", exception);
}
}).then(buildHealthUp(discoveryClient));
}
private Mono<Health> buildHealthUp(ReactiveDiscoveryClient discoveryClient) {
String description = (properties.isIncludeDescription()) ? discoveryClient.description() : "";
return Mono.just(Health.status(new Status("UP", description)).build());
}
private Mono<Health> doHealthCheckWithServices() {
// @formatter:off
return Mono.justOrEmpty(this.discoveryClient)
return Mono.justOrEmpty(discoveryClient)
.flatMapMany(ReactiveDiscoveryClient::getServices)
.collectList()
.defaultIfEmpty(emptyList())
.map(services -> {
String description = (this.properties.isIncludeDescription()) ?
this.discoveryClient.description() : "";
String description = (properties.isIncludeDescription()) ?
discoveryClient.description() : "";
return Health.status(new Status("UP", description))
.withDetail("services", services).build();
});
@ -123,7 +124,7 @@ public class ReactiveDiscoveryClientHealthIndicator @@ -123,7 +124,7 @@ public class ReactiveDiscoveryClientHealthIndicator
@Override
public int getOrder() {
return this.order;
return order;
}
public void setOrder(int order) {

55
spring-cloud-commons/src/test/java/org/springframework/cloud/client/discovery/health/reactive/ReactiveDiscoveryClientHealthIndicatorTests.java

@ -27,6 +27,8 @@ import reactor.test.StepVerifier; @@ -27,6 +27,8 @@ import reactor.test.StepVerifier;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.Status;
import org.springframework.cloud.client.DefaultServiceInstance;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.ReactiveDiscoveryClient;
import org.springframework.cloud.client.discovery.event.InstanceRegisteredEvent;
import org.springframework.cloud.client.discovery.health.DiscoveryClientHealthIndicatorProperties;
@ -35,12 +37,12 @@ import org.springframework.core.Ordered; @@ -35,12 +37,12 @@ import org.springframework.core.Ordered;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.when;
/**
* @author Tim Ysewyn
* @author Chris Bono
* @author Olga Maciaszek-Sharma
*/
@ExtendWith(MockitoExtension.class)
class ReactiveDiscoveryClientHealthIndicatorTests {
@ -78,6 +80,9 @@ class ReactiveDiscoveryClientHealthIndicatorTests { @@ -78,6 +80,9 @@ class ReactiveDiscoveryClientHealthIndicatorTests {
@Test
public void shouldReturnUpStatusWhenNotUsingServicesQueryAndProbeSucceeds() {
when(properties.isUseServicesQuery()).thenReturn(false);
ReactiveDiscoveryClient discoveryClient = new TestDiscoveryClient();
ReactiveDiscoveryClientHealthIndicator indicator = new ReactiveDiscoveryClientHealthIndicator(discoveryClient,
properties);
Health expectedHealth = Health.status(new Status(Status.UP.getCode(), "")).build();
indicator.onApplicationEvent(new InstanceRegisteredEvent<>(this, null));
@ -88,10 +93,10 @@ class ReactiveDiscoveryClientHealthIndicatorTests { @@ -88,10 +93,10 @@ class ReactiveDiscoveryClientHealthIndicatorTests {
@Test
public void shouldReturnDownStatusWhenNotUsingServicesQueryAndProbeFails() {
when(properties.isUseServicesQuery()).thenReturn(false);
RuntimeException ex = new RuntimeException("something went wrong");
doThrow(ex).when(discoveryClient).probe();
Health expectedHealth = Health.down(ex).build();
ExceptionThrowingDiscoveryClient discoveryClient = new ExceptionThrowingDiscoveryClient();
ReactiveDiscoveryClientHealthIndicator indicator = new ReactiveDiscoveryClientHealthIndicator(discoveryClient,
properties);
Health expectedHealth = Health.down(discoveryClient.exception).build();
indicator.onApplicationEvent(new InstanceRegisteredEvent<>(this, null));
Mono<Health> health = indicator.health();
@ -140,4 +145,44 @@ class ReactiveDiscoveryClientHealthIndicatorTests { @@ -140,4 +145,44 @@ class ReactiveDiscoveryClientHealthIndicatorTests {
StepVerifier.create(health).expectNext(expectedHealth).expectComplete().verify();
}
static class TestDiscoveryClient implements ReactiveDiscoveryClient {
@Override
public String description() {
return "Test";
}
@Override
public Flux<ServiceInstance> getInstances(String serviceId) {
return Flux.just(new DefaultServiceInstance());
}
@Override
public Flux<String> getServices() {
return Flux.just("Test");
}
}
static class ExceptionThrowingDiscoveryClient implements ReactiveDiscoveryClient {
RuntimeException exception = new RuntimeException("something went wrong");
@Override
public String description() {
return "Exception";
}
@Override
public Flux<ServiceInstance> getInstances(String serviceId) {
throw new RuntimeException("Test!");
}
@Override
public Flux<String> getServices() {
throw new RuntimeException("something went wrong");
}
}
}

1
src/checkstyle/checkstyle-suppressions.xml

@ -17,4 +17,5 @@ @@ -17,4 +17,5 @@
<suppress files=".*RefreshScopeConfigurationTests.*" checks="JavadocStyle"/>
<suppress files=".*RefreshScopeConfigurationTests.*" checks="JavadocMethod"/>
<suppress files=".*CachingServiceInstanceListSupplierTests.*" checks="RegexpSinglelineJava"/>
<suppress files=".*ReactiveDiscoveryClient.*" checks="JavadocVariable"/>
</suppressions>

Loading…
Cancel
Save