diff --git a/spring-cloud-commons/src/main/java/org/springframework/cloud/client/discovery/ReactiveDiscoveryClient.java b/spring-cloud-commons/src/main/java/org/springframework/cloud/client/discovery/ReactiveDiscoveryClient.java
index 0cd1ce3b..5ef5b333 100644
--- a/spring-cloud-commons/src/main/java/org/springframework/cloud/client/discovery/ReactiveDiscoveryClient.java
+++ b/spring-cloud-commons/src/main/java/org/springframework/cloud/client/discovery/ReactiveDiscoveryClient.java
@@ -16,7 +16,10 @@
package org.springframework.cloud.client.discovery;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.core.Ordered;
@@ -26,9 +29,12 @@ import org.springframework.core.Ordered;
* Eureka or consul.io.
*
* @author Tim Ysewyn
+ * @author Olga Maciaszek-Sharma
*/
public interface ReactiveDiscoveryClient extends Ordered {
+ Log LOG = LogFactory.getLog(ReactiveDiscoveryClient.class);
+
/**
* Default order of the discovery client.
*/
@@ -60,11 +66,35 @@ public interface ReactiveDiscoveryClient extends Ordered {
*
* The default implementation simply calls {@link #getServices()} - client
* implementations can override with a lighter weight operation if they choose to.
+ * @deprecated in favour of {@link ReactiveDiscoveryClient#reactiveProbe()}. This
+ * method should not be used as is, as it contains a bug - the method called within
+ * returns a {@link Flux}, which is not accessible for subscription or blocking from
+ * within. We are leaving it with a deprecation in order not to bring downstream
+ * implementations.
*/
+ @Deprecated
default void probe() {
+ if (LOG.isWarnEnabled()) {
+ LOG.warn("ReactiveDiscoveryClient#probe has been called. If you're calling this method directly, "
+ + "use ReactiveDiscoveryClient#reactiveProbe instead.");
+ }
getServices();
}
+ /**
+ * Can be used to verify the client is still valid and able to make calls.
+ *
+ * A successful invocation with no exception thrown implies the client is able to make
+ * calls.
+ *
+ * The default implementation simply calls {@link #getServices()} and wraps it with a
+ * {@link Mono} - client implementations can override with a lighter weight operation
+ * if they choose to.
+ */
+ default Mono reactiveProbe() {
+ return getServices().then();
+ }
+
/**
* Default implementation for getting order of discovery clients.
* @return order
diff --git a/spring-cloud-commons/src/main/java/org/springframework/cloud/client/discovery/health/reactive/ReactiveDiscoveryClientHealthIndicator.java b/spring-cloud-commons/src/main/java/org/springframework/cloud/client/discovery/health/reactive/ReactiveDiscoveryClientHealthIndicator.java
index d49a1412..13111cd8 100644
--- a/spring-cloud-commons/src/main/java/org/springframework/cloud/client/discovery/health/reactive/ReactiveDiscoveryClientHealthIndicator.java
+++ b/spring-cloud-commons/src/main/java/org/springframework/cloud/client/discovery/health/reactive/ReactiveDiscoveryClientHealthIndicator.java
@@ -33,21 +33,21 @@ import org.springframework.core.Ordered;
import static java.util.Collections.emptyList;
/**
- * A health indicator which indicates whether or not the discovery client has been
- * initialized.
+ * A health indicator which indicates whether the discovery client has been initialized.
*
* @author Tim Ysewyn
* @author Chris Bono
+ * @author Olga Maciaszek-Sharma
*/
public class ReactiveDiscoveryClientHealthIndicator
implements ReactiveDiscoveryHealthIndicator, Ordered, ApplicationListener> {
+ private static final Log LOG = LogFactory.getLog(ReactiveDiscoveryClientHealthIndicator.class);
+
private final ReactiveDiscoveryClient discoveryClient;
private final DiscoveryClientHealthIndicatorProperties properties;
- private final Log log = LogFactory.getLog(ReactiveDiscoveryClientHealthIndicator.class);
-
private AtomicBoolean discoveryInitialized = new AtomicBoolean(false);
private int order = Ordered.HIGHEST_PRECEDENCE;
@@ -60,14 +60,14 @@ public class ReactiveDiscoveryClientHealthIndicator
@Override
public void onApplicationEvent(InstanceRegisteredEvent> event) {
- if (this.discoveryInitialized.compareAndSet(false, true)) {
- this.log.debug("Discovery Client has been initialized");
+ if (discoveryInitialized.compareAndSet(false, true)) {
+ LOG.debug("Discovery Client has been initialized");
}
}
@Override
public Mono health() {
- if (this.discoveryInitialized.get()) {
+ if (discoveryInitialized.get()) {
return doHealthCheck();
}
else {
@@ -78,38 +78,39 @@ public class ReactiveDiscoveryClientHealthIndicator
private Mono doHealthCheck() {
// @formatter:off
- return Mono.just(this.properties.isUseServicesQuery())
+ return Mono.just(properties.isUseServicesQuery())
.flatMap(useServices -> useServices ? doHealthCheckWithServices() : doHealthCheckWithProbe())
.onErrorResume(exception -> {
- this.log.error("Error", exception);
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Error", exception);
+ }
return Mono.just(Health.down().withException(exception).build());
});
// @formatter:on
}
private Mono doHealthCheckWithProbe() {
- // @formatter:off
- return Mono.justOrEmpty(this.discoveryClient)
- .flatMap(client -> {
- client.probe();
- return Mono.just(client);
- })
- .map(client -> {
- String description = (this.properties.isIncludeDescription()) ? client.description() : "";
- return Health.status(new Status("UP", description)).build();
- });
- // @formatter:on
+ return discoveryClient.reactiveProbe().doOnError(exception -> {
+ if (LOG.isErrorEnabled()) {
+ LOG.error("Probe has failed.", exception);
+ }
+ }).then(buildHealthUp(discoveryClient));
+ }
+
+ private Mono buildHealthUp(ReactiveDiscoveryClient discoveryClient) {
+ String description = (properties.isIncludeDescription()) ? discoveryClient.description() : "";
+ return Mono.just(Health.status(new Status("UP", description)).build());
}
private Mono doHealthCheckWithServices() {
// @formatter:off
- return Mono.justOrEmpty(this.discoveryClient)
+ return Mono.justOrEmpty(discoveryClient)
.flatMapMany(ReactiveDiscoveryClient::getServices)
.collectList()
.defaultIfEmpty(emptyList())
.map(services -> {
- String description = (this.properties.isIncludeDescription()) ?
- this.discoveryClient.description() : "";
+ String description = (properties.isIncludeDescription()) ?
+ discoveryClient.description() : "";
return Health.status(new Status("UP", description))
.withDetail("services", services).build();
});
@@ -123,7 +124,7 @@ public class ReactiveDiscoveryClientHealthIndicator
@Override
public int getOrder() {
- return this.order;
+ return order;
}
public void setOrder(int order) {
diff --git a/spring-cloud-commons/src/test/java/org/springframework/cloud/client/discovery/health/reactive/ReactiveDiscoveryClientHealthIndicatorTests.java b/spring-cloud-commons/src/test/java/org/springframework/cloud/client/discovery/health/reactive/ReactiveDiscoveryClientHealthIndicatorTests.java
index dc48098f..0da5afdb 100644
--- a/spring-cloud-commons/src/test/java/org/springframework/cloud/client/discovery/health/reactive/ReactiveDiscoveryClientHealthIndicatorTests.java
+++ b/spring-cloud-commons/src/test/java/org/springframework/cloud/client/discovery/health/reactive/ReactiveDiscoveryClientHealthIndicatorTests.java
@@ -27,6 +27,8 @@ import reactor.test.StepVerifier;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.Status;
+import org.springframework.cloud.client.DefaultServiceInstance;
+import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.ReactiveDiscoveryClient;
import org.springframework.cloud.client.discovery.event.InstanceRegisteredEvent;
import org.springframework.cloud.client.discovery.health.DiscoveryClientHealthIndicatorProperties;
@@ -35,12 +37,12 @@ import org.springframework.core.Ordered;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.when;
/**
* @author Tim Ysewyn
* @author Chris Bono
+ * @author Olga Maciaszek-Sharma
*/
@ExtendWith(MockitoExtension.class)
class ReactiveDiscoveryClientHealthIndicatorTests {
@@ -78,6 +80,9 @@ class ReactiveDiscoveryClientHealthIndicatorTests {
@Test
public void shouldReturnUpStatusWhenNotUsingServicesQueryAndProbeSucceeds() {
when(properties.isUseServicesQuery()).thenReturn(false);
+ ReactiveDiscoveryClient discoveryClient = new TestDiscoveryClient();
+ ReactiveDiscoveryClientHealthIndicator indicator = new ReactiveDiscoveryClientHealthIndicator(discoveryClient,
+ properties);
Health expectedHealth = Health.status(new Status(Status.UP.getCode(), "")).build();
indicator.onApplicationEvent(new InstanceRegisteredEvent<>(this, null));
@@ -88,10 +93,10 @@ class ReactiveDiscoveryClientHealthIndicatorTests {
@Test
public void shouldReturnDownStatusWhenNotUsingServicesQueryAndProbeFails() {
- when(properties.isUseServicesQuery()).thenReturn(false);
- RuntimeException ex = new RuntimeException("something went wrong");
- doThrow(ex).when(discoveryClient).probe();
- Health expectedHealth = Health.down(ex).build();
+ ExceptionThrowingDiscoveryClient discoveryClient = new ExceptionThrowingDiscoveryClient();
+ ReactiveDiscoveryClientHealthIndicator indicator = new ReactiveDiscoveryClientHealthIndicator(discoveryClient,
+ properties);
+ Health expectedHealth = Health.down(discoveryClient.exception).build();
indicator.onApplicationEvent(new InstanceRegisteredEvent<>(this, null));
Mono health = indicator.health();
@@ -140,4 +145,44 @@ class ReactiveDiscoveryClientHealthIndicatorTests {
StepVerifier.create(health).expectNext(expectedHealth).expectComplete().verify();
}
+ static class TestDiscoveryClient implements ReactiveDiscoveryClient {
+
+ @Override
+ public String description() {
+ return "Test";
+ }
+
+ @Override
+ public Flux getInstances(String serviceId) {
+ return Flux.just(new DefaultServiceInstance());
+ }
+
+ @Override
+ public Flux getServices() {
+ return Flux.just("Test");
+ }
+
+ }
+
+ static class ExceptionThrowingDiscoveryClient implements ReactiveDiscoveryClient {
+
+ RuntimeException exception = new RuntimeException("something went wrong");
+
+ @Override
+ public String description() {
+ return "Exception";
+ }
+
+ @Override
+ public Flux getInstances(String serviceId) {
+ throw new RuntimeException("Test!");
+ }
+
+ @Override
+ public Flux getServices() {
+ throw new RuntimeException("something went wrong");
+ }
+
+ }
+
}
diff --git a/src/checkstyle/checkstyle-suppressions.xml b/src/checkstyle/checkstyle-suppressions.xml
index 9a855ed3..d8a81608 100644
--- a/src/checkstyle/checkstyle-suppressions.xml
+++ b/src/checkstyle/checkstyle-suppressions.xml
@@ -17,4 +17,5 @@
+