diff --git a/spring-cloud-loadbalancer/pom.xml b/spring-cloud-loadbalancer/pom.xml
index cc671baf..a87a1e9d 100644
--- a/spring-cloud-loadbalancer/pom.xml
+++ b/spring-cloud-loadbalancer/pom.xml
@@ -88,5 +88,10 @@
reactor-test
test
+
+ org.awaitility
+ awaitility
+ test
+
diff --git a/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplier.java b/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplier.java
index 0374a00d..86efcaae 100644
--- a/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplier.java
+++ b/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplier.java
@@ -16,18 +16,19 @@
package org.springframework.cloud.loadbalancer.core;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import reactor.core.Disposable;
import reactor.core.publisher.Flux;
-import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
-import reactor.core.scheduler.Schedulers;
+import org.springframework.beans.factory.DisposableBean;
+import org.springframework.beans.factory.InitializingBean;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.reactive.LoadBalancerProperties;
import org.springframework.http.HttpStatus;
@@ -40,10 +41,11 @@ import org.springframework.web.util.UriComponentsBuilder;
* {@link WebClient} to ping the health
endpoint of the instances.
*
* @author Olga Maciaszek-Sharma
+ * @author Roman Matiushchenko
* @since 2.2.0
*/
public class HealthCheckServiceInstanceListSupplier
- implements ServiceInstanceListSupplier {
+ implements ServiceInstanceListSupplier, InitializingBean, DisposableBean {
private static final Log LOG = LogFactory
.getLog(HealthCheckServiceInstanceListSupplier.class);
@@ -56,50 +58,72 @@ public class HealthCheckServiceInstanceListSupplier
private final String defaultHealthCheckPath;
- private List instances = Collections
- .synchronizedList(new ArrayList<>());
+ private final Flux> aliveInstancesReplay;
- private List healthyInstances = Collections
- .synchronizedList(new ArrayList<>());
+ private Disposable healthCheckDisposable;
public HealthCheckServiceInstanceListSupplier(ServiceInstanceListSupplier delegate,
LoadBalancerProperties.HealthCheck healthCheck, WebClient webClient) {
this.delegate = delegate;
this.healthCheck = healthCheck;
- defaultHealthCheckPath = healthCheck.getPath().getOrDefault("default",
+ this.defaultHealthCheckPath = healthCheck.getPath().getOrDefault("default",
"/actuator/health");
this.webClient = webClient;
- initInstances();
-
+ this.aliveInstancesReplay = Flux.defer(delegate)
+ .delaySubscription(Duration.ofMillis(this.healthCheck.getInitialDelay()))
+ .switchMap(serviceInstances -> healthCheckFlux(serviceInstances)
+ .map(alive -> Collections.unmodifiableList(new ArrayList<>(alive)))
+ )
+ .replay(1)
+ .refCount(1);
}
- private void initInstances() {
- delegate.get().subscribe(delegateInstances -> {
- instances.clear();
- instances.addAll(delegateInstances);
- });
-
- Flux> healthCheckFlux = healthCheckFlux();
-
- healthCheckFlux.subscribe(verifiedInstances -> {
- healthyInstances.clear();
- healthyInstances.addAll(verifiedInstances);
- });
+ @Override
+ public void afterPropertiesSet() {
+ Disposable healthCheckDisposable = this.healthCheckDisposable;
+ if (healthCheckDisposable != null) {
+ healthCheckDisposable.dispose();
+ }
+ this.healthCheckDisposable = aliveInstancesReplay.subscribe();
}
- protected Flux> healthCheckFlux() {
- return Flux.create(emitter -> Schedulers
- .newSingle("Health Check Verifier: " + getServiceId(), true)
- .schedulePeriodically(() -> {
- List verifiedInstances = new ArrayList<>();
- Flux.fromIterable(instances).filterWhen(this::isAlive)
- .subscribe(serviceInstance -> {
- verifiedInstances.add(serviceInstance);
- emitter.next(verifiedInstances);
- });
- }, healthCheck.getInitialDelay(), healthCheck.getInterval().toMillis(),
- TimeUnit.MILLISECONDS),
- FluxSink.OverflowStrategy.LATEST);
+ protected Flux> healthCheckFlux(List instances) {
+ return Flux.defer(() -> {
+ List> checks = new ArrayList<>(instances.size());
+ for (ServiceInstance instance : instances) {
+ Mono alive = isAlive(instance).onErrorResume(error -> {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format(
+ "Exception occurred during health check of the instance for service %s: %s",
+ instance.getServiceId(), instance.getUri()), error);
+ }
+ return Mono.empty();
+ })
+ .timeout(this.healthCheck.getInterval(), Mono.defer(() -> {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format(
+ "The instance for service %s: %s did not respond for %s during health check",
+ instance.getServiceId(), instance.getUri(),
+ this.healthCheck.getInterval()));
+ }
+ return Mono.empty();
+ }))
+ .handle((isHealthy, sink) -> {
+ if (isHealthy) {
+ sink.next(instance);
+ }
+ });
+
+ checks.add(alive);
+ }
+ List result = new ArrayList<>();
+ return Flux.merge(checks).map(alive -> {
+ result.add(alive);
+ return result;
+ })
+ .defaultIfEmpty(result);
+ })
+ .repeatWhen(restart -> restart.delayElements(this.healthCheck.getInterval()));
}
@Override
@@ -109,16 +133,7 @@ public class HealthCheckServiceInstanceListSupplier
@Override
public Flux> get() {
- if (!healthyInstances.isEmpty()) {
- return Flux.defer(() -> Flux.fromIterable(healthyInstances).collectList());
- }
- // If there are no healthy instances, it might be better to still retry on all of
- // them
- if (LOG.isWarnEnabled()) {
- LOG.warn(
- "No verified healthy instances were found, returning all listed instances.");
- }
- return Flux.defer(() -> Flux.fromIterable(instances).collectList());
+ return aliveInstancesReplay;
}
protected Mono isAlive(ServiceInstance serviceInstance) {
@@ -130,7 +145,17 @@ public class HealthCheckServiceInstanceListSupplier
.uri(UriComponentsBuilder.fromUri(serviceInstance.getUri())
.path(healthCheckPath).build().toUri())
.exchange()
- .map(clientResponse -> HttpStatus.OK.equals(clientResponse.statusCode()));
+ .flatMap(clientResponse -> clientResponse.releaseBody()
+ .thenReturn(HttpStatus.OK.value() == clientResponse.rawStatusCode())
+ );
+ }
+
+ @Override
+ public void destroy() {
+ Disposable healthCheckDisposable = this.healthCheckDisposable;
+ if (healthCheckDisposable != null) {
+ healthCheckDisposable.dispose();
+ }
}
}
diff --git a/spring-cloud-loadbalancer/src/test/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplierTests.java b/spring-cloud-loadbalancer/src/test/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplierTests.java
index 8f6961a8..7d85ccb1 100644
--- a/spring-cloud-loadbalancer/src/test/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplierTests.java
+++ b/spring-cloud-loadbalancer/src/test/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplierTests.java
@@ -16,8 +16,21 @@
package org.springframework.cloud.loadbalancer.core;
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.assertj.core.api.Assertions;
+import org.assertj.core.util.Lists;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mockito;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
@@ -39,6 +52,7 @@ import static org.assertj.core.api.Assertions.assertThat;
* Tests for {@link HealthCheckServiceInstanceListSupplier}.
*
* @author Olga Maciaszek-Sharma
+ * @author Roman Matiushchenko
*/
@ExtendWith(SpringExtension.class)
@SpringBootTest(
@@ -46,18 +60,37 @@ import static org.assertj.core.api.Assertions.assertThat;
webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
class HealthCheckServiceInstanceListSupplierTests {
+ private static final String SERVICE_ID = "ignored-service";
+
+ private static final Duration VERIFY_TIMEOUT = Duration.ofSeconds(5);
+
@LocalServerPort
private int port;
private final WebClient webClient = WebClient.create();
- private LoadBalancerProperties.HealthCheck healthCheck = new LoadBalancerProperties.HealthCheck();
+ private LoadBalancerProperties.HealthCheck healthCheck;
+
+ private HealthCheckServiceInstanceListSupplier listSupplier;
+
+ @BeforeEach
+ void setUp() {
+ healthCheck = new LoadBalancerProperties.HealthCheck();
+ }
+
+ @AfterEach
+ void tearDown() throws Exception {
+ if (listSupplier != null) {
+ listSupplier.destroy();
+ listSupplier = null;
+ }
+ }
@SuppressWarnings("ConstantConditions")
@Test
void shouldCheckInstanceWithProvidedHealthCheckPath() {
healthCheck.getPath().put("ignored-service", "/health");
- HealthCheckServiceInstanceListSupplier listSupplier = new HealthCheckServiceInstanceListSupplier(
+ listSupplier = new HealthCheckServiceInstanceListSupplier(
ServiceInstanceListSupplier.FixedServiceInstanceListSupplier
.with(new MockEnvironment()).build(),
healthCheck, webClient);
@@ -72,7 +105,7 @@ class HealthCheckServiceInstanceListSupplierTests {
@SuppressWarnings("ConstantConditions")
@Test
void shouldCheckInstanceWithDefaultHealthCheckPath() {
- HealthCheckServiceInstanceListSupplier listSupplier = new HealthCheckServiceInstanceListSupplier(
+ listSupplier = new HealthCheckServiceInstanceListSupplier(
ServiceInstanceListSupplier.FixedServiceInstanceListSupplier
.with(new MockEnvironment()).build(),
healthCheck, webClient);
@@ -88,7 +121,7 @@ class HealthCheckServiceInstanceListSupplierTests {
@Test
void shouldReturnFalseIfEndpointNotFound() {
healthCheck.getPath().put("ignored-service", "/test");
- HealthCheckServiceInstanceListSupplier listSupplier = new HealthCheckServiceInstanceListSupplier(
+ listSupplier = new HealthCheckServiceInstanceListSupplier(
ServiceInstanceListSupplier.FixedServiceInstanceListSupplier
.with(new MockEnvironment()).build(),
healthCheck, webClient);
@@ -100,6 +133,369 @@ class HealthCheckServiceInstanceListSupplierTests {
assertThat(alive).isFalse();
}
+ @Test
+ void shouldReturnOnlyAliveService() {
+ healthCheck.setInitialDelay(1000);
+
+ ServiceInstance si1 = new DefaultServiceInstance("ignored-service-1", SERVICE_ID,
+ "127.0.0.1", port, false);
+ ServiceInstance si2 = new DefaultServiceInstance("ignored-service-2", SERVICE_ID,
+ "127.0.0.2", port, false);
+
+ StepVerifier.withVirtualTime(() -> {
+ ServiceInstanceListSupplier delegate = Mockito
+ .mock(ServiceInstanceListSupplier.class);
+ Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID);
+ Mockito.when(delegate.get()).thenReturn(Flux.just(Lists.list(si1, si2)));
+
+ HealthCheckServiceInstanceListSupplier mock = Mockito
+ .mock(HealthCheckServiceInstanceListSupplier.class);
+ Mockito.doReturn(Mono.just(true)).when(mock).isAlive(si1);
+ Mockito.doReturn(Mono.just(false)).when(mock).isAlive(si2);
+
+ listSupplier = new HealthCheckServiceInstanceListSupplier(delegate,
+ healthCheck, webClient) {
+ @Override
+ protected Mono isAlive(ServiceInstance serviceInstance) {
+ return mock.isAlive(serviceInstance);
+ }
+ };
+
+ return listSupplier.get();
+ })
+ .expectSubscription()
+ .expectNoEvent(Duration.ofMillis(healthCheck.getInitialDelay()))
+ .expectNext(Lists.list(si1))
+ .expectNoEvent(healthCheck.getInterval())
+ .thenCancel()
+ .verify(VERIFY_TIMEOUT);
+ }
+
+ @Test
+ void shouldEmitOnEachAliveServiceInBatch() {
+ healthCheck.setInitialDelay(1000);
+ ServiceInstance si1 = new DefaultServiceInstance("ignored-service-1", SERVICE_ID,
+ "127.0.0.1", port, false);
+ ServiceInstance si2 = new DefaultServiceInstance("ignored-service-2", SERVICE_ID,
+ "127.0.0.2", port, false);
+
+ StepVerifier.withVirtualTime(() -> {
+ ServiceInstanceListSupplier delegate = Mockito
+ .mock(ServiceInstanceListSupplier.class);
+ Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID);
+ Mockito.when(delegate.get()).thenReturn(Flux.just(Lists.list(si1, si2)));
+
+ HealthCheckServiceInstanceListSupplier mock = Mockito
+ .mock(HealthCheckServiceInstanceListSupplier.class);
+ Mockito.doReturn(Mono.just(true)).when(mock).isAlive(si1);
+ Mockito.doReturn(Mono.just(true)).when(mock).isAlive(si2);
+
+ listSupplier = new HealthCheckServiceInstanceListSupplier(delegate,
+ healthCheck, webClient) {
+ @Override
+ protected Mono isAlive(ServiceInstance serviceInstance) {
+ return mock.isAlive(serviceInstance);
+ }
+ };
+
+ return listSupplier.get();
+ })
+ .expectSubscription()
+ .expectNoEvent(Duration.ofMillis(healthCheck.getInitialDelay()))
+ .expectNext(Lists.list(si1))
+ .expectNext(Lists.list(si1, si2))
+ .expectNoEvent(healthCheck.getInterval())
+ .thenCancel()
+ .verify(VERIFY_TIMEOUT);
+ }
+
+ @Test
+ void shouldNotFailIfIsAliveReturnsError() {
+ healthCheck.setInitialDelay(1000);
+ ServiceInstance si1 = new DefaultServiceInstance("ignored-service-1", SERVICE_ID,
+ "127.0.0.1", port, false);
+ ServiceInstance si2 = new DefaultServiceInstance("ignored-service-2", SERVICE_ID,
+ "127.0.0.2", port, false);
+
+ StepVerifier.withVirtualTime(() -> {
+ ServiceInstanceListSupplier delegate = Mockito
+ .mock(ServiceInstanceListSupplier.class);
+ Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID);
+ Mockito.when(delegate.get()).thenReturn(Flux.just(Lists.list(si1, si2)));
+
+ HealthCheckServiceInstanceListSupplier mock = Mockito
+ .mock(HealthCheckServiceInstanceListSupplier.class);
+ Mockito.doReturn(Mono.just(true)).when(mock).isAlive(si1);
+ Mockito.doReturn(Mono.error(new RuntimeException("boom"))).when(mock)
+ .isAlive(si2);
+
+ listSupplier = new HealthCheckServiceInstanceListSupplier(delegate,
+ healthCheck, webClient) {
+ @Override
+ protected Mono isAlive(ServiceInstance serviceInstance) {
+ return mock.isAlive(serviceInstance);
+ }
+ };
+
+ return listSupplier.get();
+ })
+ .expectSubscription()
+ .expectNoEvent(Duration.ofMillis(healthCheck.getInitialDelay()))
+ .expectNext(Lists.list(si1))
+ .expectNoEvent(healthCheck.getInterval())
+ .thenCancel()
+ .verify(VERIFY_TIMEOUT);
+ }
+
+ @Test
+ void shouldEmitAllInstancesIfAllIsAliveChecksFailed() {
+ healthCheck.setInitialDelay(1000);
+ ServiceInstance si1 = new DefaultServiceInstance("ignored-service-1", SERVICE_ID,
+ "127.0.0.1", port, false);
+ ServiceInstance si2 = new DefaultServiceInstance("ignored-service-2", SERVICE_ID,
+ "127.0.0.2", port, false);
+
+ StepVerifier.withVirtualTime(() -> {
+ ServiceInstanceListSupplier delegate = Mockito
+ .mock(ServiceInstanceListSupplier.class);
+ Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID);
+ Mockito.when(delegate.get()).thenReturn(Flux.just(Lists.list(si1, si2)));
+ listSupplier = new HealthCheckServiceInstanceListSupplier(delegate,
+ healthCheck, webClient) {
+ @Override
+ protected Mono isAlive(ServiceInstance serviceInstance) {
+ if (serviceInstance == si1) {
+ return Mono.just(false);
+ }
+ else {
+ return Mono.error(new RuntimeException("boom"));
+ }
+ }
+ };
+
+ return listSupplier.get();
+ })
+ .expectSubscription()
+ .expectNoEvent(Duration.ofMillis(healthCheck.getInitialDelay()))
+ .expectNext(Lists.list())
+ .expectNoEvent(healthCheck.getInterval())
+ .thenCancel()
+ .verify(VERIFY_TIMEOUT);
+ }
+
+ @Test
+ void shouldMakeInitialDaleyAfterPropertiesSet() {
+ healthCheck.setInitialDelay(1000);
+ ServiceInstance si1 = new DefaultServiceInstance("ignored-service-1", SERVICE_ID,
+ "127.0.0.1", port, false);
+
+ StepVerifier.withVirtualTime(() -> {
+ ServiceInstanceListSupplier delegate = Mockito
+ .mock(ServiceInstanceListSupplier.class);
+ Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID);
+ Mockito.when(delegate.get()).thenReturn(Flux.just(Lists.list(si1)));
+ listSupplier = new HealthCheckServiceInstanceListSupplier(delegate,
+ healthCheck, webClient) {
+ @Override
+ protected Mono isAlive(ServiceInstance serviceInstance) {
+ return Mono.just(true);
+ }
+ };
+
+ listSupplier.afterPropertiesSet();
+
+ return listSupplier.get();
+ })
+ .expectSubscription()
+ .expectNoEvent(Duration.ofMillis(healthCheck.getInitialDelay()))
+ .expectNext(Lists.list(si1))
+ .expectNoEvent(healthCheck.getInterval())
+ .thenCancel()
+ .verify(VERIFY_TIMEOUT);
+ }
+
+ @Test
+ void shouldRepeatIsAliveChecksIndefinitely() {
+ healthCheck.setInitialDelay(1000);
+ ServiceInstance si1 = new DefaultServiceInstance("ignored-service-1", SERVICE_ID,
+ "127.0.0.1", port, false);
+ ServiceInstance si2 = new DefaultServiceInstance("ignored-service-2", SERVICE_ID,
+ "127.0.0.2", port, false);
+
+ StepVerifier.withVirtualTime(() -> {
+ ServiceInstanceListSupplier delegate = Mockito
+ .mock(ServiceInstanceListSupplier.class);
+ Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID);
+ Mockito.when(delegate.get()).thenReturn(Flux.just(Lists.list(si1, si2)));
+
+ HealthCheckServiceInstanceListSupplier mock = Mockito
+ .mock(HealthCheckServiceInstanceListSupplier.class);
+ Mockito.doReturn(Mono.just(false), Mono.just(true)).when(mock).isAlive(si1);
+ Mockito.doReturn(Mono.error(new RuntimeException("boom"))).when(mock)
+ .isAlive(si2);
+
+ listSupplier = new HealthCheckServiceInstanceListSupplier(delegate,
+ healthCheck, webClient) {
+ @Override
+ protected Mono isAlive(ServiceInstance serviceInstance) {
+ return mock.isAlive(serviceInstance);
+ }
+ };
+
+ return listSupplier.get();
+ })
+ .expectSubscription()
+ .expectNoEvent(Duration.ofMillis(healthCheck.getInitialDelay()))
+ .expectNext(Lists.list())
+ .expectNoEvent(healthCheck.getInterval())
+ .expectNext(Lists.list(si1))
+ .expectNoEvent(healthCheck.getInterval())
+ .expectNext(Lists.list(si1))
+ .thenCancel().verify(VERIFY_TIMEOUT);
+ }
+
+ @Test
+ void shouldTimeoutIsAliveCheck() {
+ healthCheck.setInitialDelay(1000);
+ ServiceInstance si1 = new DefaultServiceInstance("ignored-service-1", SERVICE_ID,
+ "127.0.0.1", port, false);
+
+ StepVerifier.withVirtualTime(() -> {
+ ServiceInstanceListSupplier delegate = Mockito
+ .mock(ServiceInstanceListSupplier.class);
+ Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID);
+ Mockito.when(delegate.get()).thenReturn(Flux.just(Lists.list(si1)));
+
+ HealthCheckServiceInstanceListSupplier mock = Mockito
+ .mock(HealthCheckServiceInstanceListSupplier.class);
+ Mockito.when(mock.isAlive(si1)).thenReturn(Mono.never(), Mono.just(true));
+
+ listSupplier = new HealthCheckServiceInstanceListSupplier(delegate,
+ healthCheck, webClient) {
+ @Override
+ protected Mono isAlive(ServiceInstance serviceInstance) {
+ return mock.isAlive(serviceInstance);
+ }
+ };
+
+ return listSupplier.get();
+ })
+ .expectSubscription()
+ .expectNoEvent(Duration.ofMillis(healthCheck.getInitialDelay()))
+ .expectNoEvent(healthCheck.getInterval())
+ .expectNext(Lists.list())
+ .expectNoEvent(healthCheck.getInterval())
+ .expectNext(Lists.list(si1))
+ .expectNoEvent(healthCheck.getInterval())
+ .expectNext(Lists.list(si1))
+ .thenCancel().verify(VERIFY_TIMEOUT);
+ }
+
+ @Test
+ void shouldUpdateInstances() {
+ healthCheck.setInitialDelay(1000);
+ ServiceInstance si1 = new DefaultServiceInstance("ignored-service-1", SERVICE_ID,
+ "127.0.0.1", port, false);
+ ServiceInstance si2 = new DefaultServiceInstance("ignored-service-2", SERVICE_ID,
+ "127.0.0.2", port, false);
+
+ StepVerifier.withVirtualTime(() -> {
+ ServiceInstanceListSupplier delegate = Mockito
+ .mock(ServiceInstanceListSupplier.class);
+ Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID);
+ Flux> instances = Flux.just(Lists.list(si1))
+ .concatWith(Flux.just(Lists.list(si1, si2))
+ .delayElements(healthCheck.getInterval().dividedBy(2)));
+ Mockito.when(delegate.get()).thenReturn(instances);
+
+ listSupplier = new HealthCheckServiceInstanceListSupplier(delegate,
+ healthCheck, webClient) {
+ @Override
+ protected Mono isAlive(ServiceInstance serviceInstance) {
+ return Mono.just(true);
+ }
+ };
+
+ return listSupplier.get();
+ })
+ .expectSubscription()
+ .expectNoEvent(Duration.ofMillis(healthCheck.getInitialDelay()))
+ .expectNext(Lists.list(si1))
+ .thenAwait(healthCheck.getInterval().dividedBy(2))
+ .expectNext(Lists.list(si1))
+ .expectNext(Lists.list(si1, si2))
+ .expectNoEvent(healthCheck.getInterval())
+ .expectNext(Lists.list(si1))
+ .expectNext(Lists.list(si1, si2))
+ .thenCancel()
+ .verify(VERIFY_TIMEOUT);
+ }
+
+ @Test
+ void shouldCacheResultIfAfterPropertiesSetInvoked() {
+ healthCheck.setInitialDelay(1000);
+ ServiceInstance si1 = new DefaultServiceInstance("ignored-service-1", SERVICE_ID,
+ "127.0.0.1", port, false);
+
+ AtomicInteger emitCounter = new AtomicInteger();
+
+ StepVerifier.withVirtualTime(() -> {
+ ServiceInstanceListSupplier delegate = Mockito
+ .mock(ServiceInstanceListSupplier.class);
+ Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID);
+ Mockito.when(delegate.get()).thenReturn(Flux.just(Lists.list(si1)));
+
+ listSupplier = new HealthCheckServiceInstanceListSupplier(delegate,
+ healthCheck, webClient) {
+ @Override
+ protected Mono isAlive(ServiceInstance serviceInstance) {
+ return Mono.just(true);
+ }
+
+ @Override
+ protected Flux> healthCheckFlux(List instances) {
+ return super.healthCheckFlux(instances).doOnNext(it -> emitCounter.incrementAndGet());
+ }
+ };
+
+ listSupplier.afterPropertiesSet();
+
+ return listSupplier.get().take(1).concatWith(listSupplier.get().take(1));
+ })
+ .expectSubscription()
+ .expectNoEvent(Duration.ofMillis(healthCheck.getInitialDelay()))
+ .expectNext(Lists.list(si1))
+ .expectNext(Lists.list(si1))
+ .thenCancel()
+ .verify(VERIFY_TIMEOUT);
+
+ Assertions.assertThat(emitCounter).hasValue(1);
+ }
+
+ @Test
+ void shouldCancelSubscription() {
+
+ final AtomicInteger instancesCanceled = new AtomicInteger();
+
+ ServiceInstanceListSupplier delegate = Mockito
+ .mock(ServiceInstanceListSupplier.class);
+ Mockito.when(delegate.get())
+ .thenReturn(Flux.>never()
+ .log("test")
+ .doOnCancel(instancesCanceled::incrementAndGet));
+
+ listSupplier = new HealthCheckServiceInstanceListSupplier(delegate, healthCheck, webClient);
+
+ listSupplier.afterPropertiesSet();
+
+ Assertions.assertThat(instancesCanceled).hasValue(0);
+
+ listSupplier.destroy();
+ Awaitility.await()
+ .pollDelay(Duration.ofMillis(100)).atMost(VERIFY_TIMEOUT).untilAsserted(
+ () -> Assertions.assertThat(instancesCanceled).hasValue(1));
+ }
+
@Configuration(proxyBeanMethods = false)
@EnableAutoConfiguration
@RestController