From bab7aba004fd3273a899df7bbf29365de9e27015 Mon Sep 17 00:00:00 2001 From: Olga Maciaszek-Sharma Date: Mon, 20 Feb 2023 17:54:11 +0100 Subject: [PATCH] Allow batching the emitted alive instances flux. --- docs/src/main/asciidoc/_configprops.adoc | 1 + .../main/asciidoc/spring-cloud-commons.adoc | 2 + .../loadbalancer/LoadBalancerProperties.java | 17 +++++ ...ealthCheckServiceInstanceListSupplier.java | 13 ++-- ...CheckServiceInstanceListSupplierTests.java | 68 +++++++++++++++++++ 5 files changed, 96 insertions(+), 5 deletions(-) diff --git a/docs/src/main/asciidoc/_configprops.adoc b/docs/src/main/asciidoc/_configprops.adoc index 03e9aaa3..40ec431d 100644 --- a/docs/src/main/asciidoc/_configprops.adoc +++ b/docs/src/main/asciidoc/_configprops.adoc @@ -48,6 +48,7 @@ |spring.cloud.loadbalancer.health-check.refetch-instances | `+++false+++` | Indicates whether the instances should be refetched by the `HealthCheckServiceInstanceListSupplier`. This can be used if the instances can be updated and the underlying delegate does not provide an ongoing flux. |spring.cloud.loadbalancer.health-check.refetch-instances-interval | `+++25s+++` | Interval for refetching available service instances. |spring.cloud.loadbalancer.health-check.repeat-health-check | `+++true+++` | Indicates whether health checks should keep repeating. It might be useful to set it to `false` if periodically refetching the instances, as every refetch will also trigger a healthcheck. +|spring.cloud.loadbalancer.health-check.update-results-list | `+++true+++` | Indicates whether the {@code healthCheckFlux} should emit on each alive {@link ServiceInstance} that has been retrieved. If set to {@code false}, the entire alive instances sequence is first collected into a list and only then emitted. |spring.cloud.loadbalancer.hint | | Allows setting the value of hint that is passed on to the LoadBalancer request and can subsequently be used in {@link ReactiveLoadBalancer} implementations. |spring.cloud.loadbalancer.hint-header-name | `+++X-SC-LB-Hint+++` | Allows setting the name of the header used for passing the hint for hint-based service instance filtering. |spring.cloud.loadbalancer.retry.avoid-previous-instance | `+++true+++` | Enables wrapping ServiceInstanceListSupplier beans with `RetryAwareServiceInstanceListSupplier` if Spring-Retry is in the classpath. diff --git a/docs/src/main/asciidoc/spring-cloud-commons.adoc b/docs/src/main/asciidoc/spring-cloud-commons.adoc index ffcdea4d..ecb3daf2 100644 --- a/docs/src/main/asciidoc/spring-cloud-commons.adoc +++ b/docs/src/main/asciidoc/spring-cloud-commons.adoc @@ -1045,6 +1045,8 @@ the value of the `spring.cloud.loadbalancer.health-check.path.default` property. TIP: If you rely on the default path (`/actuator/health`), make sure you add `spring-boot-starter-actuator` to your collaborator's dependencies, unless you are planning to add such an endpoint on your own. +TIP: By default, the `healthCheckFlux` will emit on each alive `ServiceInstance` that has been retrieved. You can modify this behaviour by setting the value of `spring.cloud.loadbalancer.health-check.update-results-list` to `false`. If this property is set to `false`, the entire alive instances sequence is first collected into a list and only then emitted, which ensures the flux does not emit values in between the health-check intervals set in properties. + In order to use the health-check scheduler approach, you will have to instantiate a `HealthCheckServiceInstanceListSupplier` bean in a <>. We use delegates to work with `ServiceInstanceListSupplier` beans. diff --git a/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/LoadBalancerProperties.java b/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/LoadBalancerProperties.java index f0f01bff..c7ab68fd 100644 --- a/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/LoadBalancerProperties.java +++ b/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/LoadBalancerProperties.java @@ -27,6 +27,7 @@ import java.util.concurrent.TimeoutException; import reactor.util.retry.RetryBackoffSpec; import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.cloud.client.ServiceInstance; import org.springframework.cloud.client.loadbalancer.reactive.ReactiveLoadBalancer; import org.springframework.http.HttpMethod; import org.springframework.util.LinkedCaseInsensitiveMap; @@ -217,6 +218,14 @@ public class LoadBalancerProperties { */ private boolean repeatHealthCheck = true; + /** + * Indicates whether the {@code healthCheckFlux} should emit on each alive + * {@link ServiceInstance} that has been retrieved. If set to {@code false}, the + * entire alive instances sequence is first collected into a list and only then + * emitted. + */ + private boolean updateResultsList = true; + public boolean getRefetchInstances() { return refetchInstances; } @@ -273,6 +282,14 @@ public class LoadBalancerProperties { this.port = port; } + public boolean isUpdateResultsList() { + return updateResultsList; + } + + public void setUpdateResultsList(boolean updateResultsList) { + this.updateResultsList = updateResultsList; + } + } public static class Retry { diff --git a/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplier.java b/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplier.java index d2086259..a3e57c00 100644 --- a/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplier.java +++ b/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplier.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2021 the original author or authors. + * Copyright 2012-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -114,10 +114,13 @@ public class HealthCheckServiceInstanceListSupplier extends DelegatingServiceIns checks.add(alive); } List result = new ArrayList<>(); - return Flux.merge(checks).map(alive -> { - result.add(alive); - return result; - }).defaultIfEmpty(result); + if (healthCheck.isUpdateResultsList()) { + return Flux.merge(checks).map(alive -> { + result.add(alive); + return result; + }).defaultIfEmpty(result); + } + return Flux.merge(checks).collectList(); }).repeatWhen(healthCheckFluxRepeat); } diff --git a/spring-cloud-loadbalancer/src/test/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplierTests.java b/spring-cloud-loadbalancer/src/test/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplierTests.java index dc0d4235..d2bb0eea 100644 --- a/spring-cloud-loadbalancer/src/test/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplierTests.java +++ b/spring-cloud-loadbalancer/src/test/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplierTests.java @@ -276,6 +276,39 @@ class HealthCheckServiceInstanceListSupplierTests { .expectNoEvent(properties.getHealthCheck().getInterval()).thenCancel().verify(VERIFY_TIMEOUT); } + @Test + void shouldEmitOnEntireBatchOfInstancesWhenUpdateDisabled() { + LoadBalancerProperties.HealthCheck healthCheck = properties.getHealthCheck(); + healthCheck.setInitialDelay(Duration.ofSeconds(1)); + healthCheck.setUpdateResultsList(false); + ServiceInstance serviceInstance1 = new DefaultServiceInstance("ignored-service-1", SERVICE_ID, "127.0.0.1", + port, false); + ServiceInstance serviceInstance2 = new DefaultServiceInstance("ignored-service-2", SERVICE_ID, "127.0.0.2", + port, false); + + StepVerifier.withVirtualTime(() -> { + ServiceInstanceListSupplier delegate = mock(ServiceInstanceListSupplier.class); + Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID); + Mockito.when(delegate.get()).thenReturn(Flux.just(Lists.list(serviceInstance1, serviceInstance2))); + + HealthCheckServiceInstanceListSupplier mock = mock(HealthCheckServiceInstanceListSupplier.class); + Mockito.doReturn(Mono.just(true)).when(mock).isAlive(serviceInstance1); + Mockito.doReturn(Mono.just(true)).when(mock).isAlive(serviceInstance2); + + listSupplier = new HealthCheckServiceInstanceListSupplier(delegate, + buildLoadBalancerClientFactory(SERVICE_ID, properties), healthCheckFunction(webClient)) { + @Override + protected Mono isAlive(ServiceInstance serviceInstance) { + return mock.isAlive(serviceInstance); + } + }; + + return listSupplier.get(); + }).expectSubscription().expectNoEvent(properties.getHealthCheck().getInitialDelay()) + .expectNext(Lists.list(serviceInstance1, serviceInstance2)) + .expectNoEvent(properties.getHealthCheck().getInterval()).thenCancel().verify(VERIFY_TIMEOUT); + } + @Test void shouldNotFailIfIsAliveReturnsError() { properties.getHealthCheck().setInitialDelay(Duration.ofSeconds(1)); @@ -458,6 +491,41 @@ class HealthCheckServiceInstanceListSupplierTests { .expectNext(Lists.list(serviceInstance1, serviceInstance2)).thenCancel().verify(VERIFY_TIMEOUT); } + @Test + void shouldReturnAllInstancesWhenUpdateDisabled() { + LoadBalancerProperties.HealthCheck healthCheck = properties.getHealthCheck(); + healthCheck.setInitialDelay(Duration.ofSeconds(1)); + healthCheck.setUpdateResultsList(false); + ServiceInstance serviceInstance1 = new DefaultServiceInstance("ignored-service-1", SERVICE_ID, "127.0.0.1", + port, false); + ServiceInstance serviceInstance2 = new DefaultServiceInstance("ignored-service-2", SERVICE_ID, "127.0.0.2", + port, false); + + StepVerifier.withVirtualTime(() -> { + ServiceInstanceListSupplier delegate = mock(ServiceInstanceListSupplier.class); + Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID); + Flux> instances = Flux.just(Lists.list(serviceInstance1)) + .concatWith(Flux.just(Lists.list(serviceInstance1, serviceInstance2)) + .delayElements(properties.getHealthCheck().getInterval().dividedBy(2))); + Mockito.when(delegate.get()).thenReturn(instances); + + listSupplier = new HealthCheckServiceInstanceListSupplier(delegate, + buildLoadBalancerClientFactory(SERVICE_ID, properties), healthCheckFunction(webClient)) { + @Override + protected Mono isAlive(ServiceInstance serviceInstance) { + return Mono.just(true); + } + }; + + return listSupplier.get(); + }).expectSubscription().expectNoEvent(properties.getHealthCheck().getInitialDelay()) + .expectNext(Lists.list(serviceInstance1)) + .thenAwait(properties.getHealthCheck().getInterval().dividedBy(2)) + .expectNext(Lists.list(serviceInstance1, serviceInstance2)) + .expectNoEvent(properties.getHealthCheck().getInterval()) + .expectNext(Lists.list(serviceInstance1, serviceInstance2)).thenCancel().verify(VERIFY_TIMEOUT); + } + @Test void shouldRefetchInstances() { properties.getHealthCheck().setInitialDelay(Duration.ofSeconds(1));