Browse Source

Fixes issues in HealthCheckServiceInstanceListSupplier (#685)

- WebClient response leaks
- potential Scheduled task leak
- rework polling to plain reactor operators
- remove non atomic operations on ServiceInstance lists

related to gh-629
fixes issues from gh-683
pull/705/head
robotmrv 5 years ago committed by GitHub
parent
commit
6db6a81d82
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      spring-cloud-loadbalancer/pom.xml
  2. 119
      spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplier.java
  3. 404
      spring-cloud-loadbalancer/src/test/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplierTests.java

5
spring-cloud-loadbalancer/pom.xml

@ -88,5 +88,10 @@ @@ -88,5 +88,10 @@
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

119
spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplier.java

@ -16,18 +16,19 @@ @@ -16,18 +16,19 @@
package org.springframework.cloud.loadbalancer.core;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.reactive.LoadBalancerProperties;
import org.springframework.http.HttpStatus;
@ -40,10 +41,11 @@ import org.springframework.web.util.UriComponentsBuilder; @@ -40,10 +41,11 @@ import org.springframework.web.util.UriComponentsBuilder;
* {@link WebClient} to ping the <code>health</code> endpoint of the instances.
*
* @author Olga Maciaszek-Sharma
* @author Roman Matiushchenko
* @since 2.2.0
*/
public class HealthCheckServiceInstanceListSupplier
implements ServiceInstanceListSupplier {
implements ServiceInstanceListSupplier, InitializingBean, DisposableBean {
private static final Log LOG = LogFactory
.getLog(HealthCheckServiceInstanceListSupplier.class);
@ -56,50 +58,72 @@ public class HealthCheckServiceInstanceListSupplier @@ -56,50 +58,72 @@ public class HealthCheckServiceInstanceListSupplier
private final String defaultHealthCheckPath;
private List<ServiceInstance> instances = Collections
.synchronizedList(new ArrayList<>());
private final Flux<List<ServiceInstance>> aliveInstancesReplay;
private List<ServiceInstance> healthyInstances = Collections
.synchronizedList(new ArrayList<>());
private Disposable healthCheckDisposable;
public HealthCheckServiceInstanceListSupplier(ServiceInstanceListSupplier delegate,
LoadBalancerProperties.HealthCheck healthCheck, WebClient webClient) {
this.delegate = delegate;
this.healthCheck = healthCheck;
defaultHealthCheckPath = healthCheck.getPath().getOrDefault("default",
this.defaultHealthCheckPath = healthCheck.getPath().getOrDefault("default",
"/actuator/health");
this.webClient = webClient;
initInstances();
this.aliveInstancesReplay = Flux.defer(delegate)
.delaySubscription(Duration.ofMillis(this.healthCheck.getInitialDelay()))
.switchMap(serviceInstances -> healthCheckFlux(serviceInstances)
.map(alive -> Collections.unmodifiableList(new ArrayList<>(alive)))
)
.replay(1)
.refCount(1);
}
private void initInstances() {
delegate.get().subscribe(delegateInstances -> {
instances.clear();
instances.addAll(delegateInstances);
});
Flux<List<ServiceInstance>> healthCheckFlux = healthCheckFlux();
healthCheckFlux.subscribe(verifiedInstances -> {
healthyInstances.clear();
healthyInstances.addAll(verifiedInstances);
});
@Override
public void afterPropertiesSet() {
Disposable healthCheckDisposable = this.healthCheckDisposable;
if (healthCheckDisposable != null) {
healthCheckDisposable.dispose();
}
this.healthCheckDisposable = aliveInstancesReplay.subscribe();
}
protected Flux<List<ServiceInstance>> healthCheckFlux() {
return Flux.create(emitter -> Schedulers
.newSingle("Health Check Verifier: " + getServiceId(), true)
.schedulePeriodically(() -> {
List<ServiceInstance> verifiedInstances = new ArrayList<>();
Flux.fromIterable(instances).filterWhen(this::isAlive)
.subscribe(serviceInstance -> {
verifiedInstances.add(serviceInstance);
emitter.next(verifiedInstances);
});
}, healthCheck.getInitialDelay(), healthCheck.getInterval().toMillis(),
TimeUnit.MILLISECONDS),
FluxSink.OverflowStrategy.LATEST);
protected Flux<List<ServiceInstance>> healthCheckFlux(List<ServiceInstance> instances) {
return Flux.defer(() -> {
List<Mono<ServiceInstance>> checks = new ArrayList<>(instances.size());
for (ServiceInstance instance : instances) {
Mono<ServiceInstance> alive = isAlive(instance).onErrorResume(error -> {
if (LOG.isDebugEnabled()) {
LOG.debug(String.format(
"Exception occurred during health check of the instance for service %s: %s",
instance.getServiceId(), instance.getUri()), error);
}
return Mono.empty();
})
.timeout(this.healthCheck.getInterval(), Mono.defer(() -> {
if (LOG.isDebugEnabled()) {
LOG.debug(String.format(
"The instance for service %s: %s did not respond for %s during health check",
instance.getServiceId(), instance.getUri(),
this.healthCheck.getInterval()));
}
return Mono.empty();
}))
.handle((isHealthy, sink) -> {
if (isHealthy) {
sink.next(instance);
}
});
checks.add(alive);
}
List<ServiceInstance> result = new ArrayList<>();
return Flux.merge(checks).map(alive -> {
result.add(alive);
return result;
})
.defaultIfEmpty(result);
})
.repeatWhen(restart -> restart.delayElements(this.healthCheck.getInterval()));
}
@Override
@ -109,16 +133,7 @@ public class HealthCheckServiceInstanceListSupplier @@ -109,16 +133,7 @@ public class HealthCheckServiceInstanceListSupplier
@Override
public Flux<List<ServiceInstance>> get() {
if (!healthyInstances.isEmpty()) {
return Flux.defer(() -> Flux.fromIterable(healthyInstances).collectList());
}
// If there are no healthy instances, it might be better to still retry on all of
// them
if (LOG.isWarnEnabled()) {
LOG.warn(
"No verified healthy instances were found, returning all listed instances.");
}
return Flux.defer(() -> Flux.fromIterable(instances).collectList());
return aliveInstancesReplay;
}
protected Mono<Boolean> isAlive(ServiceInstance serviceInstance) {
@ -130,7 +145,17 @@ public class HealthCheckServiceInstanceListSupplier @@ -130,7 +145,17 @@ public class HealthCheckServiceInstanceListSupplier
.uri(UriComponentsBuilder.fromUri(serviceInstance.getUri())
.path(healthCheckPath).build().toUri())
.exchange()
.map(clientResponse -> HttpStatus.OK.equals(clientResponse.statusCode()));
.flatMap(clientResponse -> clientResponse.releaseBody()
.thenReturn(HttpStatus.OK.value() == clientResponse.rawStatusCode())
);
}
@Override
public void destroy() {
Disposable healthCheckDisposable = this.healthCheckDisposable;
if (healthCheckDisposable != null) {
healthCheckDisposable.dispose();
}
}
}

404
spring-cloud-loadbalancer/src/test/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplierTests.java

@ -16,8 +16,21 @@ @@ -16,8 +16,21 @@
package org.springframework.cloud.loadbalancer.core;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.assertj.core.api.Assertions;
import org.assertj.core.util.Lists;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mockito;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
@ -39,6 +52,7 @@ import static org.assertj.core.api.Assertions.assertThat; @@ -39,6 +52,7 @@ import static org.assertj.core.api.Assertions.assertThat;
* Tests for {@link HealthCheckServiceInstanceListSupplier}.
*
* @author Olga Maciaszek-Sharma
* @author Roman Matiushchenko
*/
@ExtendWith(SpringExtension.class)
@SpringBootTest(
@ -46,18 +60,37 @@ import static org.assertj.core.api.Assertions.assertThat; @@ -46,18 +60,37 @@ import static org.assertj.core.api.Assertions.assertThat;
webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
class HealthCheckServiceInstanceListSupplierTests {
private static final String SERVICE_ID = "ignored-service";
private static final Duration VERIFY_TIMEOUT = Duration.ofSeconds(5);
@LocalServerPort
private int port;
private final WebClient webClient = WebClient.create();
private LoadBalancerProperties.HealthCheck healthCheck = new LoadBalancerProperties.HealthCheck();
private LoadBalancerProperties.HealthCheck healthCheck;
private HealthCheckServiceInstanceListSupplier listSupplier;
@BeforeEach
void setUp() {
healthCheck = new LoadBalancerProperties.HealthCheck();
}
@AfterEach
void tearDown() throws Exception {
if (listSupplier != null) {
listSupplier.destroy();
listSupplier = null;
}
}
@SuppressWarnings("ConstantConditions")
@Test
void shouldCheckInstanceWithProvidedHealthCheckPath() {
healthCheck.getPath().put("ignored-service", "/health");
HealthCheckServiceInstanceListSupplier listSupplier = new HealthCheckServiceInstanceListSupplier(
listSupplier = new HealthCheckServiceInstanceListSupplier(
ServiceInstanceListSupplier.FixedServiceInstanceListSupplier
.with(new MockEnvironment()).build(),
healthCheck, webClient);
@ -72,7 +105,7 @@ class HealthCheckServiceInstanceListSupplierTests { @@ -72,7 +105,7 @@ class HealthCheckServiceInstanceListSupplierTests {
@SuppressWarnings("ConstantConditions")
@Test
void shouldCheckInstanceWithDefaultHealthCheckPath() {
HealthCheckServiceInstanceListSupplier listSupplier = new HealthCheckServiceInstanceListSupplier(
listSupplier = new HealthCheckServiceInstanceListSupplier(
ServiceInstanceListSupplier.FixedServiceInstanceListSupplier
.with(new MockEnvironment()).build(),
healthCheck, webClient);
@ -88,7 +121,7 @@ class HealthCheckServiceInstanceListSupplierTests { @@ -88,7 +121,7 @@ class HealthCheckServiceInstanceListSupplierTests {
@Test
void shouldReturnFalseIfEndpointNotFound() {
healthCheck.getPath().put("ignored-service", "/test");
HealthCheckServiceInstanceListSupplier listSupplier = new HealthCheckServiceInstanceListSupplier(
listSupplier = new HealthCheckServiceInstanceListSupplier(
ServiceInstanceListSupplier.FixedServiceInstanceListSupplier
.with(new MockEnvironment()).build(),
healthCheck, webClient);
@ -100,6 +133,369 @@ class HealthCheckServiceInstanceListSupplierTests { @@ -100,6 +133,369 @@ class HealthCheckServiceInstanceListSupplierTests {
assertThat(alive).isFalse();
}
@Test
void shouldReturnOnlyAliveService() {
healthCheck.setInitialDelay(1000);
ServiceInstance si1 = new DefaultServiceInstance("ignored-service-1", SERVICE_ID,
"127.0.0.1", port, false);
ServiceInstance si2 = new DefaultServiceInstance("ignored-service-2", SERVICE_ID,
"127.0.0.2", port, false);
StepVerifier.withVirtualTime(() -> {
ServiceInstanceListSupplier delegate = Mockito
.mock(ServiceInstanceListSupplier.class);
Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID);
Mockito.when(delegate.get()).thenReturn(Flux.just(Lists.list(si1, si2)));
HealthCheckServiceInstanceListSupplier mock = Mockito
.mock(HealthCheckServiceInstanceListSupplier.class);
Mockito.doReturn(Mono.just(true)).when(mock).isAlive(si1);
Mockito.doReturn(Mono.just(false)).when(mock).isAlive(si2);
listSupplier = new HealthCheckServiceInstanceListSupplier(delegate,
healthCheck, webClient) {
@Override
protected Mono<Boolean> isAlive(ServiceInstance serviceInstance) {
return mock.isAlive(serviceInstance);
}
};
return listSupplier.get();
})
.expectSubscription()
.expectNoEvent(Duration.ofMillis(healthCheck.getInitialDelay()))
.expectNext(Lists.list(si1))
.expectNoEvent(healthCheck.getInterval())
.thenCancel()
.verify(VERIFY_TIMEOUT);
}
@Test
void shouldEmitOnEachAliveServiceInBatch() {
healthCheck.setInitialDelay(1000);
ServiceInstance si1 = new DefaultServiceInstance("ignored-service-1", SERVICE_ID,
"127.0.0.1", port, false);
ServiceInstance si2 = new DefaultServiceInstance("ignored-service-2", SERVICE_ID,
"127.0.0.2", port, false);
StepVerifier.withVirtualTime(() -> {
ServiceInstanceListSupplier delegate = Mockito
.mock(ServiceInstanceListSupplier.class);
Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID);
Mockito.when(delegate.get()).thenReturn(Flux.just(Lists.list(si1, si2)));
HealthCheckServiceInstanceListSupplier mock = Mockito
.mock(HealthCheckServiceInstanceListSupplier.class);
Mockito.doReturn(Mono.just(true)).when(mock).isAlive(si1);
Mockito.doReturn(Mono.just(true)).when(mock).isAlive(si2);
listSupplier = new HealthCheckServiceInstanceListSupplier(delegate,
healthCheck, webClient) {
@Override
protected Mono<Boolean> isAlive(ServiceInstance serviceInstance) {
return mock.isAlive(serviceInstance);
}
};
return listSupplier.get();
})
.expectSubscription()
.expectNoEvent(Duration.ofMillis(healthCheck.getInitialDelay()))
.expectNext(Lists.list(si1))
.expectNext(Lists.list(si1, si2))
.expectNoEvent(healthCheck.getInterval())
.thenCancel()
.verify(VERIFY_TIMEOUT);
}
@Test
void shouldNotFailIfIsAliveReturnsError() {
healthCheck.setInitialDelay(1000);
ServiceInstance si1 = new DefaultServiceInstance("ignored-service-1", SERVICE_ID,
"127.0.0.1", port, false);
ServiceInstance si2 = new DefaultServiceInstance("ignored-service-2", SERVICE_ID,
"127.0.0.2", port, false);
StepVerifier.withVirtualTime(() -> {
ServiceInstanceListSupplier delegate = Mockito
.mock(ServiceInstanceListSupplier.class);
Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID);
Mockito.when(delegate.get()).thenReturn(Flux.just(Lists.list(si1, si2)));
HealthCheckServiceInstanceListSupplier mock = Mockito
.mock(HealthCheckServiceInstanceListSupplier.class);
Mockito.doReturn(Mono.just(true)).when(mock).isAlive(si1);
Mockito.doReturn(Mono.error(new RuntimeException("boom"))).when(mock)
.isAlive(si2);
listSupplier = new HealthCheckServiceInstanceListSupplier(delegate,
healthCheck, webClient) {
@Override
protected Mono<Boolean> isAlive(ServiceInstance serviceInstance) {
return mock.isAlive(serviceInstance);
}
};
return listSupplier.get();
})
.expectSubscription()
.expectNoEvent(Duration.ofMillis(healthCheck.getInitialDelay()))
.expectNext(Lists.list(si1))
.expectNoEvent(healthCheck.getInterval())
.thenCancel()
.verify(VERIFY_TIMEOUT);
}
@Test
void shouldEmitAllInstancesIfAllIsAliveChecksFailed() {
healthCheck.setInitialDelay(1000);
ServiceInstance si1 = new DefaultServiceInstance("ignored-service-1", SERVICE_ID,
"127.0.0.1", port, false);
ServiceInstance si2 = new DefaultServiceInstance("ignored-service-2", SERVICE_ID,
"127.0.0.2", port, false);
StepVerifier.withVirtualTime(() -> {
ServiceInstanceListSupplier delegate = Mockito
.mock(ServiceInstanceListSupplier.class);
Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID);
Mockito.when(delegate.get()).thenReturn(Flux.just(Lists.list(si1, si2)));
listSupplier = new HealthCheckServiceInstanceListSupplier(delegate,
healthCheck, webClient) {
@Override
protected Mono<Boolean> isAlive(ServiceInstance serviceInstance) {
if (serviceInstance == si1) {
return Mono.just(false);
}
else {
return Mono.error(new RuntimeException("boom"));
}
}
};
return listSupplier.get();
})
.expectSubscription()
.expectNoEvent(Duration.ofMillis(healthCheck.getInitialDelay()))
.expectNext(Lists.list())
.expectNoEvent(healthCheck.getInterval())
.thenCancel()
.verify(VERIFY_TIMEOUT);
}
@Test
void shouldMakeInitialDaleyAfterPropertiesSet() {
healthCheck.setInitialDelay(1000);
ServiceInstance si1 = new DefaultServiceInstance("ignored-service-1", SERVICE_ID,
"127.0.0.1", port, false);
StepVerifier.withVirtualTime(() -> {
ServiceInstanceListSupplier delegate = Mockito
.mock(ServiceInstanceListSupplier.class);
Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID);
Mockito.when(delegate.get()).thenReturn(Flux.just(Lists.list(si1)));
listSupplier = new HealthCheckServiceInstanceListSupplier(delegate,
healthCheck, webClient) {
@Override
protected Mono<Boolean> isAlive(ServiceInstance serviceInstance) {
return Mono.just(true);
}
};
listSupplier.afterPropertiesSet();
return listSupplier.get();
})
.expectSubscription()
.expectNoEvent(Duration.ofMillis(healthCheck.getInitialDelay()))
.expectNext(Lists.list(si1))
.expectNoEvent(healthCheck.getInterval())
.thenCancel()
.verify(VERIFY_TIMEOUT);
}
@Test
void shouldRepeatIsAliveChecksIndefinitely() {
healthCheck.setInitialDelay(1000);
ServiceInstance si1 = new DefaultServiceInstance("ignored-service-1", SERVICE_ID,
"127.0.0.1", port, false);
ServiceInstance si2 = new DefaultServiceInstance("ignored-service-2", SERVICE_ID,
"127.0.0.2", port, false);
StepVerifier.withVirtualTime(() -> {
ServiceInstanceListSupplier delegate = Mockito
.mock(ServiceInstanceListSupplier.class);
Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID);
Mockito.when(delegate.get()).thenReturn(Flux.just(Lists.list(si1, si2)));
HealthCheckServiceInstanceListSupplier mock = Mockito
.mock(HealthCheckServiceInstanceListSupplier.class);
Mockito.doReturn(Mono.just(false), Mono.just(true)).when(mock).isAlive(si1);
Mockito.doReturn(Mono.error(new RuntimeException("boom"))).when(mock)
.isAlive(si2);
listSupplier = new HealthCheckServiceInstanceListSupplier(delegate,
healthCheck, webClient) {
@Override
protected Mono<Boolean> isAlive(ServiceInstance serviceInstance) {
return mock.isAlive(serviceInstance);
}
};
return listSupplier.get();
})
.expectSubscription()
.expectNoEvent(Duration.ofMillis(healthCheck.getInitialDelay()))
.expectNext(Lists.list())
.expectNoEvent(healthCheck.getInterval())
.expectNext(Lists.list(si1))
.expectNoEvent(healthCheck.getInterval())
.expectNext(Lists.list(si1))
.thenCancel().verify(VERIFY_TIMEOUT);
}
@Test
void shouldTimeoutIsAliveCheck() {
healthCheck.setInitialDelay(1000);
ServiceInstance si1 = new DefaultServiceInstance("ignored-service-1", SERVICE_ID,
"127.0.0.1", port, false);
StepVerifier.withVirtualTime(() -> {
ServiceInstanceListSupplier delegate = Mockito
.mock(ServiceInstanceListSupplier.class);
Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID);
Mockito.when(delegate.get()).thenReturn(Flux.just(Lists.list(si1)));
HealthCheckServiceInstanceListSupplier mock = Mockito
.mock(HealthCheckServiceInstanceListSupplier.class);
Mockito.when(mock.isAlive(si1)).thenReturn(Mono.never(), Mono.just(true));
listSupplier = new HealthCheckServiceInstanceListSupplier(delegate,
healthCheck, webClient) {
@Override
protected Mono<Boolean> isAlive(ServiceInstance serviceInstance) {
return mock.isAlive(serviceInstance);
}
};
return listSupplier.get();
})
.expectSubscription()
.expectNoEvent(Duration.ofMillis(healthCheck.getInitialDelay()))
.expectNoEvent(healthCheck.getInterval())
.expectNext(Lists.list())
.expectNoEvent(healthCheck.getInterval())
.expectNext(Lists.list(si1))
.expectNoEvent(healthCheck.getInterval())
.expectNext(Lists.list(si1))
.thenCancel().verify(VERIFY_TIMEOUT);
}
@Test
void shouldUpdateInstances() {
healthCheck.setInitialDelay(1000);
ServiceInstance si1 = new DefaultServiceInstance("ignored-service-1", SERVICE_ID,
"127.0.0.1", port, false);
ServiceInstance si2 = new DefaultServiceInstance("ignored-service-2", SERVICE_ID,
"127.0.0.2", port, false);
StepVerifier.withVirtualTime(() -> {
ServiceInstanceListSupplier delegate = Mockito
.mock(ServiceInstanceListSupplier.class);
Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID);
Flux<List<ServiceInstance>> instances = Flux.just(Lists.list(si1))
.concatWith(Flux.just(Lists.list(si1, si2))
.delayElements(healthCheck.getInterval().dividedBy(2)));
Mockito.when(delegate.get()).thenReturn(instances);
listSupplier = new HealthCheckServiceInstanceListSupplier(delegate,
healthCheck, webClient) {
@Override
protected Mono<Boolean> isAlive(ServiceInstance serviceInstance) {
return Mono.just(true);
}
};
return listSupplier.get();
})
.expectSubscription()
.expectNoEvent(Duration.ofMillis(healthCheck.getInitialDelay()))
.expectNext(Lists.list(si1))
.thenAwait(healthCheck.getInterval().dividedBy(2))
.expectNext(Lists.list(si1))
.expectNext(Lists.list(si1, si2))
.expectNoEvent(healthCheck.getInterval())
.expectNext(Lists.list(si1))
.expectNext(Lists.list(si1, si2))
.thenCancel()
.verify(VERIFY_TIMEOUT);
}
@Test
void shouldCacheResultIfAfterPropertiesSetInvoked() {
healthCheck.setInitialDelay(1000);
ServiceInstance si1 = new DefaultServiceInstance("ignored-service-1", SERVICE_ID,
"127.0.0.1", port, false);
AtomicInteger emitCounter = new AtomicInteger();
StepVerifier.withVirtualTime(() -> {
ServiceInstanceListSupplier delegate = Mockito
.mock(ServiceInstanceListSupplier.class);
Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID);
Mockito.when(delegate.get()).thenReturn(Flux.just(Lists.list(si1)));
listSupplier = new HealthCheckServiceInstanceListSupplier(delegate,
healthCheck, webClient) {
@Override
protected Mono<Boolean> isAlive(ServiceInstance serviceInstance) {
return Mono.just(true);
}
@Override
protected Flux<List<ServiceInstance>> healthCheckFlux(List<ServiceInstance> instances) {
return super.healthCheckFlux(instances).doOnNext(it -> emitCounter.incrementAndGet());
}
};
listSupplier.afterPropertiesSet();
return listSupplier.get().take(1).concatWith(listSupplier.get().take(1));
})
.expectSubscription()
.expectNoEvent(Duration.ofMillis(healthCheck.getInitialDelay()))
.expectNext(Lists.list(si1))
.expectNext(Lists.list(si1))
.thenCancel()
.verify(VERIFY_TIMEOUT);
Assertions.assertThat(emitCounter).hasValue(1);
}
@Test
void shouldCancelSubscription() {
final AtomicInteger instancesCanceled = new AtomicInteger();
ServiceInstanceListSupplier delegate = Mockito
.mock(ServiceInstanceListSupplier.class);
Mockito.when(delegate.get())
.thenReturn(Flux.<List<ServiceInstance>>never()
.log("test")
.doOnCancel(instancesCanceled::incrementAndGet));
listSupplier = new HealthCheckServiceInstanceListSupplier(delegate, healthCheck, webClient);
listSupplier.afterPropertiesSet();
Assertions.assertThat(instancesCanceled).hasValue(0);
listSupplier.destroy();
Awaitility.await()
.pollDelay(Duration.ofMillis(100)).atMost(VERIFY_TIMEOUT).untilAsserted(
() -> Assertions.assertThat(instancesCanceled).hasValue(1));
}
@Configuration(proxyBeanMethods = false)
@EnableAutoConfiguration
@RestController

Loading…
Cancel
Save