Browse Source

Gh 760 health check with cache new (#765)

* Cache first element of service instance list flux.

* Invoke destroy() and afterPropertiesSet() in non-bean ServiceInstanceListSupplier delegates.

* Fix return updated instances.

* Fix return updated instances.
pull/768/head
Olga Maciaszek-Sharma 4 years ago committed by GitHub
parent
commit
88b2f0e869
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/CachingServiceInstanceListSupplier.java
  2. 21
      spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/DelegatingServiceInstanceListSupplier.java
  3. 11
      spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/DiscoveryClientServiceInstanceListSupplier.java
  4. 162
      spring-cloud-loadbalancer/src/test/java/org/springframework/cloud/loadbalancer/core/CachingServiceInstanceListSupplierTests.java
  5. 132
      spring-cloud-loadbalancer/src/test/java/org/springframework/cloud/loadbalancer/core/DiscoveryClientServiceInstanceListSupplierTests.java
  6. 1
      src/checkstyle/checkstyle-suppressions.xml

2
spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/CachingServiceInstanceListSupplier.java

@ -69,7 +69,7 @@ public class CachingServiceInstanceListSupplier @@ -69,7 +69,7 @@ public class CachingServiceInstanceListSupplier
return Mono.empty();
}
return Flux.just(list).materialize().collectList();
}, delegate.getServiceId()).onCacheMissResume(delegate)
}, delegate.getServiceId()).onCacheMissResume(delegate.get().take(1))
.andWriteWith((key, signals) -> Flux.fromIterable(signals).dematerialize()
.doOnNext(instances -> {
Cache cache = cacheManager

21
spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/DelegatingServiceInstanceListSupplier.java

@ -16,6 +16,8 @@ @@ -16,6 +16,8 @@
package org.springframework.cloud.loadbalancer.core;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.util.Assert;
/**
@ -23,11 +25,12 @@ import org.springframework.util.Assert; @@ -23,11 +25,12 @@ import org.springframework.util.Assert;
* {@link ServiceInstanceListSupplier} instance underneath.
*
* @author Spencer Gibb
* @author Olga Maciaszek-Sharma
*/
public abstract class DelegatingServiceInstanceListSupplier
implements ServiceInstanceListSupplier {
implements ServiceInstanceListSupplier, InitializingBean, DisposableBean {
private final ServiceInstanceListSupplier delegate;
protected final ServiceInstanceListSupplier delegate;
public DelegatingServiceInstanceListSupplier(ServiceInstanceListSupplier delegate) {
Assert.notNull(delegate, "delegate may not be null");
@ -43,4 +46,18 @@ public abstract class DelegatingServiceInstanceListSupplier @@ -43,4 +46,18 @@ public abstract class DelegatingServiceInstanceListSupplier
return this.delegate.getServiceId();
}
@Override
public void afterPropertiesSet() throws Exception {
if (delegate instanceof InitializingBean) {
((InitializingBean) delegate).afterPropertiesSet();
}
}
@Override
public void destroy() throws Exception {
if (delegate instanceof DisposableBean) {
((DisposableBean) delegate).destroy();
}
}
}

11
spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/DiscoveryClientServiceInstanceListSupplier.java

@ -41,20 +41,21 @@ public class DiscoveryClientServiceInstanceListSupplier @@ -41,20 +41,21 @@ public class DiscoveryClientServiceInstanceListSupplier
private final String serviceId;
private final Flux<ServiceInstance> serviceInstances;
private final Flux<List<ServiceInstance>> serviceInstances;
public DiscoveryClientServiceInstanceListSupplier(DiscoveryClient delegate,
Environment environment) {
this.serviceId = environment.getProperty(PROPERTY_NAME);
this.serviceInstances = Flux
.defer(() -> Flux.fromIterable(delegate.getInstances(serviceId)))
.subscribeOn(Schedulers.boundedElastic());
.defer(() -> Flux.fromIterable(delegate.getInstances(serviceId))
.collectList().flux().subscribeOn(Schedulers.boundedElastic()));
}
public DiscoveryClientServiceInstanceListSupplier(ReactiveDiscoveryClient delegate,
Environment environment) {
this.serviceId = environment.getProperty(PROPERTY_NAME);
this.serviceInstances = delegate.getInstances(serviceId);
this.serviceInstances = Flux
.defer(() -> delegate.getInstances(serviceId).collectList().flux());
}
@Override
@ -64,7 +65,7 @@ public class DiscoveryClientServiceInstanceListSupplier @@ -64,7 +65,7 @@ public class DiscoveryClientServiceInstanceListSupplier
@Override
public Flux<List<ServiceInstance>> get() {
return serviceInstances.collectList().flux();
return serviceInstances;
}
}

162
spring-cloud-loadbalancer/src/test/java/org/springframework/cloud/loadbalancer/core/CachingServiceInstanceListSupplierTests.java

@ -0,0 +1,162 @@ @@ -0,0 +1,162 @@
/*
* Copyright 2012-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.loadbalancer.core;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.client.DefaultServiceInstance;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.ReactiveDiscoveryClient;
import org.springframework.cloud.client.loadbalancer.reactive.LoadBalancerProperties;
import org.springframework.cloud.loadbalancer.blocking.client.BlockingLoadBalancerClient;
import org.springframework.cloud.loadbalancer.cache.LoadBalancerCacheManager;
import org.springframework.cloud.loadbalancer.config.LoadBalancerCacheAutoConfiguration;
import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import org.springframework.web.reactive.function.client.WebClient;
import static java.time.Duration.ofMillis;
import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
/**
* Tests for {@link CachingServiceInstanceListSupplier}.
*
* @author Olga Maciaszek-Sharma
*/
@SpringBootTest(classes = CachingServiceInstanceListSupplierTests.TestConfig.class)
@ExtendWith(SpringExtension.class)
class CachingServiceInstanceListSupplierTests {
public static final String SERVICE_ID = "test";
static {
System.setProperty("loadbalancer.client.name", SERVICE_ID);
}
@Autowired
BlockingLoadBalancerClient blockingLoadBalancerClient;
private static DefaultServiceInstance instance(String host, boolean secure) {
return new DefaultServiceInstance(SERVICE_ID, SERVICE_ID, host, 80, secure);
}
@Test
void shouldNotHangOnCachingWhenDelegateReturnsInfiniteStream() {
assertTimeoutPreemptively(ofMillis(500), () -> {
blockingLoadBalancerClient.choose(SERVICE_ID);
});
}
@Configuration(proxyBeanMethods = false)
@Import(LoadBalancerCacheAutoConfiguration.class)
protected static class TestConfig {
@Bean
public ReactiveDiscoveryClient reactiveDiscoveryClient() {
return new ReactiveDiscoveryClient() {
@Override
public String description() {
return SERVICE_ID;
}
@Override
public Flux<ServiceInstance> getInstances(String serviceId) {
return Flux.just(instance("1host", false),
instance("2host-secure", true));
}
@Override
public Flux<String> getServices() {
return Flux.just(SERVICE_ID);
}
};
}
@Bean
ReactorLoadBalancer<ServiceInstance> reactorLoadBalancer(
ObjectProvider<ServiceInstanceListSupplier> provider) {
return new RoundRobinLoadBalancer(provider, SERVICE_ID);
}
@Bean
LoadBalancerClientFactory loadBalancerClientFactory() {
return new LoadBalancerClientFactory();
}
@Bean
BlockingLoadBalancerClient blockingLoadBalancerClient(
LoadBalancerClientFactory loadBalancerClientFactory) {
return new BlockingLoadBalancerClient(loadBalancerClientFactory);
}
@Bean
public LoadBalancerProperties loadBalancerProperties() {
return new LoadBalancerProperties();
}
@Bean
public WebClient.Builder webClientBuilder() {
return WebClient.builder();
}
@Bean
ServiceInstanceListSupplier supplier(ConfigurableApplicationContext context,
ReactiveDiscoveryClient discoveryClient,
LoadBalancerProperties loadBalancerProperties,
WebClient.Builder webClientBuilder) {
DiscoveryClientServiceInstanceListSupplier firstDelegate = new DiscoveryClientServiceInstanceListSupplier(
discoveryClient, context.getEnvironment());
HealthCheckServiceInstanceListSupplier delegate = new TestHealthCheckServiceInstanceListSupplier(
firstDelegate, loadBalancerProperties.getHealthCheck(),
webClientBuilder.build());
delegate.afterPropertiesSet();
ObjectProvider<LoadBalancerCacheManager> cacheManagerProvider = context
.getBeanProvider(LoadBalancerCacheManager.class);
return new CachingServiceInstanceListSupplier(delegate,
cacheManagerProvider.getIfAvailable());
}
private static class TestHealthCheckServiceInstanceListSupplier
extends HealthCheckServiceInstanceListSupplier {
TestHealthCheckServiceInstanceListSupplier(
ServiceInstanceListSupplier delegate,
LoadBalancerProperties.HealthCheck healthCheck, WebClient webClient) {
super(delegate, healthCheck, webClient);
}
@Override
protected Mono<Boolean> isAlive(ServiceInstance serviceInstance) {
return Mono.just(true);
}
}
}
}

132
spring-cloud-loadbalancer/src/test/java/org/springframework/cloud/loadbalancer/core/DiscoveryClientServiceInstanceListSupplierTests.java

@ -0,0 +1,132 @@ @@ -0,0 +1,132 @@
/*
* Copyright 2012-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.loadbalancer.core;
import org.assertj.core.util.Lists;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
import org.springframework.cloud.client.DefaultServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.cloud.client.discovery.ReactiveDiscoveryClient;
import org.springframework.mock.env.MockEnvironment;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* Tests for {@link DiscoveryClientServiceInstanceListSupplier}.
*
* @author Olga Maciaszek-Sharma
*/
class DiscoveryClientServiceInstanceListSupplierTests {
private static final String SERVICE_ID = "test";
private final MockEnvironment environment = new MockEnvironment();
private final ReactiveDiscoveryClient reactiveDiscoveryClient = mock(
ReactiveDiscoveryClient.class);
private final DiscoveryClient discoveryClient = mock(DiscoveryClient.class);
private DiscoveryClientServiceInstanceListSupplier supplier;
private static DefaultServiceInstance instance(String host, boolean secure) {
return new DefaultServiceInstance(SERVICE_ID, SERVICE_ID, host, 80, secure);
}
@BeforeEach
void setUp() {
environment.setProperty("loadbalancer.client.name", SERVICE_ID);
}
@Test
void shouldReturnRetrievedInstances() {
when(reactiveDiscoveryClient.getInstances(SERVICE_ID)).thenReturn(
Flux.just(instance("1host", false), instance("2host-secure", true)));
StepVerifier.withVirtualTime(() -> {
supplier = new DiscoveryClientServiceInstanceListSupplier(
reactiveDiscoveryClient, environment);
return supplier.get();
}).expectSubscription().expectNext(
Lists.list(instance("1host", false), instance("2host-secure", true)))
.thenCancel().verify();
}
@Test
void shouldUpdateReturnRetrievedInstances() {
when(reactiveDiscoveryClient.getInstances(SERVICE_ID)).thenReturn(
Flux.just(instance("1host", false), instance("2host-secure", true)));
supplier = new DiscoveryClientServiceInstanceListSupplier(reactiveDiscoveryClient,
environment);
StepVerifier.withVirtualTime(() -> supplier.get()).expectSubscription()
.expectNext(Lists.list(instance("1host", false),
instance("2host-secure", true)))
.thenCancel().verify();
when(reactiveDiscoveryClient.getInstances(SERVICE_ID))
.thenReturn(Flux.just(instance("1host", false),
instance("2host-secure", true), instance("3host", false)));
StepVerifier.withVirtualTime(() -> supplier.get()).expectSubscription()
.expectNext(Lists.list(instance("1host", false),
instance("2host-secure", true), instance("3host", false)))
.thenCancel().verify();
}
@Test
void shouldReturnRetrievedInstancesBlockingClient() {
StepVerifier.withVirtualTime(() -> {
when(discoveryClient.getInstances(SERVICE_ID)).thenReturn(
Lists.list(instance("1host", false), instance("2host-secure", true)));
supplier = new DiscoveryClientServiceInstanceListSupplier(discoveryClient,
environment);
return supplier.get();
}).expectSubscription().expectNext(
Lists.list(instance("1host", false), instance("2host-secure", true)))
.thenCancel().verify();
}
@Test
void shouldUpdateReturnRetrievedInstancesBlockingClient() {
when(discoveryClient.getInstances(SERVICE_ID)).thenReturn(
Lists.list(instance("1host", false), instance("2host-secure", true)));
supplier = new DiscoveryClientServiceInstanceListSupplier(discoveryClient,
environment);
StepVerifier.withVirtualTime(() -> supplier.get()).expectSubscription()
.expectNext(Lists.list(instance("1host", false),
instance("2host-secure", true)))
.thenCancel().verify();
when(discoveryClient.getInstances(SERVICE_ID))
.thenReturn(Lists.list(instance("1host", false),
instance("2host-secure", true), instance("3host", false)));
StepVerifier.withVirtualTime(() -> supplier.get()).expectSubscription()
.expectNext(Lists.list(instance("1host", false),
instance("2host-secure", true), instance("3host", false)))
.thenCancel().verify();
}
}

1
src/checkstyle/checkstyle-suppressions.xml

@ -14,4 +14,5 @@ @@ -14,4 +14,5 @@
<suppress files=".*RefreshAutoConfigurationTests.*" checks="JavadocVariable"/>
<suppress files=".*RefreshAutoConfigurationMoreClassPathTests.*" checks="JavadocVariable"/>
<suppress files=".*EnvironmentDecryptApplicationInitializerTests.*" checks="JavadocVariable"/>
<suppress files=".*CachingServiceInstanceListSupplierTests.*" checks="RegexpSinglelineJava"/>
</suppressions>

Loading…
Cancel
Save