Compare commits

...

1 Commits

Author SHA1 Message Date
Olga Maciaszek-Sharma bab7aba004 Allow batching the emitted alive instances flux. 2 years ago
  1. 1
      docs/src/main/asciidoc/_configprops.adoc
  2. 2
      docs/src/main/asciidoc/spring-cloud-commons.adoc
  3. 17
      spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/LoadBalancerProperties.java
  4. 13
      spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplier.java
  5. 68
      spring-cloud-loadbalancer/src/test/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplierTests.java

1
docs/src/main/asciidoc/_configprops.adoc

@ -48,6 +48,7 @@ @@ -48,6 +48,7 @@
|spring.cloud.loadbalancer.health-check.refetch-instances | `+++false+++` | Indicates whether the instances should be refetched by the `HealthCheckServiceInstanceListSupplier`. This can be used if the instances can be updated and the underlying delegate does not provide an ongoing flux.
|spring.cloud.loadbalancer.health-check.refetch-instances-interval | `+++25s+++` | Interval for refetching available service instances.
|spring.cloud.loadbalancer.health-check.repeat-health-check | `+++true+++` | Indicates whether health checks should keep repeating. It might be useful to set it to `false` if periodically refetching the instances, as every refetch will also trigger a healthcheck.
|spring.cloud.loadbalancer.health-check.update-results-list | `+++true+++` | Indicates whether the {@code healthCheckFlux} should emit on each alive {@link ServiceInstance} that has been retrieved. If set to {@code false}, the entire alive instances sequence is first collected into a list and only then emitted.
|spring.cloud.loadbalancer.hint | | Allows setting the value of <code>hint</code> that is passed on to the LoadBalancer request and can subsequently be used in {@link ReactiveLoadBalancer} implementations.
|spring.cloud.loadbalancer.hint-header-name | `+++X-SC-LB-Hint+++` | Allows setting the name of the header used for passing the hint for hint-based service instance filtering.
|spring.cloud.loadbalancer.retry.avoid-previous-instance | `+++true+++` | Enables wrapping ServiceInstanceListSupplier beans with `RetryAwareServiceInstanceListSupplier` if Spring-Retry is in the classpath.

2
docs/src/main/asciidoc/spring-cloud-commons.adoc

@ -1045,6 +1045,8 @@ the value of the `spring.cloud.loadbalancer.health-check.path.default` property. @@ -1045,6 +1045,8 @@ the value of the `spring.cloud.loadbalancer.health-check.path.default` property.
TIP: If you rely on the default path (`/actuator/health`), make sure you add `spring-boot-starter-actuator` to your collaborator's dependencies, unless you are planning to add such an endpoint on your own.
TIP: By default, the `healthCheckFlux` will emit on each alive `ServiceInstance` that has been retrieved. You can modify this behaviour by setting the value of `spring.cloud.loadbalancer.health-check.update-results-list` to `false`. If this property is set to `false`, the entire alive instances sequence is first collected into a list and only then emitted, which ensures the flux does not emit values in between the health-check intervals set in properties.
In order to use the health-check scheduler approach, you will have to instantiate a `HealthCheckServiceInstanceListSupplier` bean in a <<custom-loadbalancer-configuration,custom configuration>>.
We use delegates to work with `ServiceInstanceListSupplier` beans.

17
spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/LoadBalancerProperties.java

@ -27,6 +27,7 @@ import java.util.concurrent.TimeoutException; @@ -27,6 +27,7 @@ import java.util.concurrent.TimeoutException;
import reactor.util.retry.RetryBackoffSpec;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.reactive.ReactiveLoadBalancer;
import org.springframework.http.HttpMethod;
import org.springframework.util.LinkedCaseInsensitiveMap;
@ -217,6 +218,14 @@ public class LoadBalancerProperties { @@ -217,6 +218,14 @@ public class LoadBalancerProperties {
*/
private boolean repeatHealthCheck = true;
/**
* Indicates whether the {@code healthCheckFlux} should emit on each alive
* {@link ServiceInstance} that has been retrieved. If set to {@code false}, the
* entire alive instances sequence is first collected into a list and only then
* emitted.
*/
private boolean updateResultsList = true;
public boolean getRefetchInstances() {
return refetchInstances;
}
@ -273,6 +282,14 @@ public class LoadBalancerProperties { @@ -273,6 +282,14 @@ public class LoadBalancerProperties {
this.port = port;
}
public boolean isUpdateResultsList() {
return updateResultsList;
}
public void setUpdateResultsList(boolean updateResultsList) {
this.updateResultsList = updateResultsList;
}
}
public static class Retry {

13
spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplier.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2012-2021 the original author or authors.
* Copyright 2012-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -114,10 +114,13 @@ public class HealthCheckServiceInstanceListSupplier extends DelegatingServiceIns @@ -114,10 +114,13 @@ public class HealthCheckServiceInstanceListSupplier extends DelegatingServiceIns
checks.add(alive);
}
List<ServiceInstance> result = new ArrayList<>();
return Flux.merge(checks).map(alive -> {
result.add(alive);
return result;
}).defaultIfEmpty(result);
if (healthCheck.isUpdateResultsList()) {
return Flux.merge(checks).map(alive -> {
result.add(alive);
return result;
}).defaultIfEmpty(result);
}
return Flux.merge(checks).collectList();
}).repeatWhen(healthCheckFluxRepeat);
}

68
spring-cloud-loadbalancer/src/test/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplierTests.java

@ -276,6 +276,39 @@ class HealthCheckServiceInstanceListSupplierTests { @@ -276,6 +276,39 @@ class HealthCheckServiceInstanceListSupplierTests {
.expectNoEvent(properties.getHealthCheck().getInterval()).thenCancel().verify(VERIFY_TIMEOUT);
}
@Test
void shouldEmitOnEntireBatchOfInstancesWhenUpdateDisabled() {
LoadBalancerProperties.HealthCheck healthCheck = properties.getHealthCheck();
healthCheck.setInitialDelay(Duration.ofSeconds(1));
healthCheck.setUpdateResultsList(false);
ServiceInstance serviceInstance1 = new DefaultServiceInstance("ignored-service-1", SERVICE_ID, "127.0.0.1",
port, false);
ServiceInstance serviceInstance2 = new DefaultServiceInstance("ignored-service-2", SERVICE_ID, "127.0.0.2",
port, false);
StepVerifier.withVirtualTime(() -> {
ServiceInstanceListSupplier delegate = mock(ServiceInstanceListSupplier.class);
Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID);
Mockito.when(delegate.get()).thenReturn(Flux.just(Lists.list(serviceInstance1, serviceInstance2)));
HealthCheckServiceInstanceListSupplier mock = mock(HealthCheckServiceInstanceListSupplier.class);
Mockito.doReturn(Mono.just(true)).when(mock).isAlive(serviceInstance1);
Mockito.doReturn(Mono.just(true)).when(mock).isAlive(serviceInstance2);
listSupplier = new HealthCheckServiceInstanceListSupplier(delegate,
buildLoadBalancerClientFactory(SERVICE_ID, properties), healthCheckFunction(webClient)) {
@Override
protected Mono<Boolean> isAlive(ServiceInstance serviceInstance) {
return mock.isAlive(serviceInstance);
}
};
return listSupplier.get();
}).expectSubscription().expectNoEvent(properties.getHealthCheck().getInitialDelay())
.expectNext(Lists.list(serviceInstance1, serviceInstance2))
.expectNoEvent(properties.getHealthCheck().getInterval()).thenCancel().verify(VERIFY_TIMEOUT);
}
@Test
void shouldNotFailIfIsAliveReturnsError() {
properties.getHealthCheck().setInitialDelay(Duration.ofSeconds(1));
@ -458,6 +491,41 @@ class HealthCheckServiceInstanceListSupplierTests { @@ -458,6 +491,41 @@ class HealthCheckServiceInstanceListSupplierTests {
.expectNext(Lists.list(serviceInstance1, serviceInstance2)).thenCancel().verify(VERIFY_TIMEOUT);
}
@Test
void shouldReturnAllInstancesWhenUpdateDisabled() {
LoadBalancerProperties.HealthCheck healthCheck = properties.getHealthCheck();
healthCheck.setInitialDelay(Duration.ofSeconds(1));
healthCheck.setUpdateResultsList(false);
ServiceInstance serviceInstance1 = new DefaultServiceInstance("ignored-service-1", SERVICE_ID, "127.0.0.1",
port, false);
ServiceInstance serviceInstance2 = new DefaultServiceInstance("ignored-service-2", SERVICE_ID, "127.0.0.2",
port, false);
StepVerifier.withVirtualTime(() -> {
ServiceInstanceListSupplier delegate = mock(ServiceInstanceListSupplier.class);
Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID);
Flux<List<ServiceInstance>> instances = Flux.just(Lists.list(serviceInstance1))
.concatWith(Flux.just(Lists.list(serviceInstance1, serviceInstance2))
.delayElements(properties.getHealthCheck().getInterval().dividedBy(2)));
Mockito.when(delegate.get()).thenReturn(instances);
listSupplier = new HealthCheckServiceInstanceListSupplier(delegate,
buildLoadBalancerClientFactory(SERVICE_ID, properties), healthCheckFunction(webClient)) {
@Override
protected Mono<Boolean> isAlive(ServiceInstance serviceInstance) {
return Mono.just(true);
}
};
return listSupplier.get();
}).expectSubscription().expectNoEvent(properties.getHealthCheck().getInitialDelay())
.expectNext(Lists.list(serviceInstance1))
.thenAwait(properties.getHealthCheck().getInterval().dividedBy(2))
.expectNext(Lists.list(serviceInstance1, serviceInstance2))
.expectNoEvent(properties.getHealthCheck().getInterval())
.expectNext(Lists.list(serviceInstance1, serviceInstance2)).thenCancel().verify(VERIFY_TIMEOUT);
}
@Test
void shouldRefetchInstances() {
properties.getHealthCheck().setInitialDelay(Duration.ofSeconds(1));

Loading…
Cancel
Save