Olga Maciaszek-Sharma
4 years ago
committed by
GitHub
14 changed files with 1086 additions and 28 deletions
@ -0,0 +1,79 @@
@@ -0,0 +1,79 @@
|
||||
/* |
||||
* Copyright 2012-2021 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.cloud.client.loadbalancer; |
||||
|
||||
import java.util.Objects; |
||||
|
||||
import org.springframework.cloud.client.ServiceInstance; |
||||
import org.springframework.core.style.ToStringCreator; |
||||
|
||||
/** |
||||
* A request context object that allows storing information on previously used service |
||||
* instances. |
||||
* |
||||
* @author Olga Maciaszek-Sharma |
||||
* @since 2.2.7 |
||||
*/ |
||||
public class RetryableRequestContext extends DefaultRequestContext { |
||||
|
||||
private ServiceInstance previousServiceInstance; |
||||
|
||||
public RetryableRequestContext(ServiceInstance previousServiceInstance) { |
||||
this.previousServiceInstance = previousServiceInstance; |
||||
} |
||||
|
||||
public RetryableRequestContext(ServiceInstance previousServiceInstance, String hint) { |
||||
super(hint); |
||||
this.previousServiceInstance = previousServiceInstance; |
||||
} |
||||
|
||||
public ServiceInstance getPreviousServiceInstance() { |
||||
return previousServiceInstance; |
||||
} |
||||
|
||||
public void setPreviousServiceInstance(ServiceInstance previousServiceInstance) { |
||||
this.previousServiceInstance = previousServiceInstance; |
||||
} |
||||
|
||||
@Override |
||||
public String toString() { |
||||
ToStringCreator to = new ToStringCreator(this); |
||||
to.append("previousServiceInstance", previousServiceInstance); |
||||
return to.toString(); |
||||
} |
||||
|
||||
@Override |
||||
public boolean equals(Object o) { |
||||
if (this == o) { |
||||
return true; |
||||
} |
||||
if (!(o instanceof RetryableRequestContext)) { |
||||
return false; |
||||
} |
||||
if (!super.equals(o)) { |
||||
return false; |
||||
} |
||||
RetryableRequestContext context = (RetryableRequestContext) o; |
||||
return Objects.equals(previousServiceInstance, context.previousServiceInstance); |
||||
} |
||||
|
||||
@Override |
||||
public int hashCode() { |
||||
return Objects.hash(super.hashCode(), previousServiceInstance); |
||||
} |
||||
|
||||
} |
@ -0,0 +1,44 @@
@@ -0,0 +1,44 @@
|
||||
/* |
||||
* Copyright 2012-2021 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.cloud.client.loadbalancer.reactive; |
||||
|
||||
import java.net.URI; |
||||
|
||||
import org.springframework.web.reactive.function.client.ClientRequest; |
||||
import org.springframework.web.reactive.function.client.ExchangeFilterFunction; |
||||
|
||||
/** |
||||
* A utility class for load-balanced {@link ExchangeFilterFunction} instances. |
||||
* |
||||
* @author Olga Maciaszek-Sharma |
||||
* @since 2.2.7 |
||||
*/ |
||||
public final class ExchangeFilterFunctionUtils { |
||||
|
||||
private ExchangeFilterFunctionUtils() { |
||||
throw new IllegalStateException("Can't instantiate a utility class."); |
||||
} |
||||
|
||||
static ClientRequest buildClientRequest(ClientRequest request, URI uri) { |
||||
return ClientRequest.create(request.method(), uri) |
||||
.headers(headers -> headers.addAll(request.headers())) |
||||
.cookies(cookies -> cookies.addAll(request.cookies())) |
||||
.attributes(attributes -> attributes.putAll(request.attributes())) |
||||
.body(request.body()).build(); |
||||
} |
||||
|
||||
} |
@ -0,0 +1,83 @@
@@ -0,0 +1,83 @@
|
||||
/* |
||||
* Copyright 2012-2021 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.cloud.client.loadbalancer.reactive; |
||||
|
||||
import org.springframework.http.HttpMethod; |
||||
import org.springframework.web.reactive.function.client.ClientRequest; |
||||
import org.springframework.web.reactive.function.client.ClientResponse; |
||||
|
||||
/** |
||||
* Stores the data for a load-balanced call that is being retried. |
||||
* |
||||
* @author Olga Maciaszek-Sharma |
||||
* @since 2.2.7 |
||||
*/ |
||||
class LoadBalancerRetryContext { |
||||
|
||||
private final ClientRequest request; |
||||
|
||||
private ClientResponse clientResponse; |
||||
|
||||
private Integer retriesSameServiceInstance = 0; |
||||
|
||||
private Integer retriesNextServiceInstance = 0; |
||||
|
||||
LoadBalancerRetryContext(ClientRequest request) { |
||||
this.request = request; |
||||
} |
||||
|
||||
ClientRequest getRequest() { |
||||
return request; |
||||
} |
||||
|
||||
ClientResponse getClientResponse() { |
||||
return clientResponse; |
||||
} |
||||
|
||||
void setClientResponse(ClientResponse clientResponse) { |
||||
this.clientResponse = clientResponse; |
||||
} |
||||
|
||||
Integer getRetriesSameServiceInstance() { |
||||
return retriesSameServiceInstance; |
||||
} |
||||
|
||||
void incrementRetriesSameServiceInstance() { |
||||
retriesSameServiceInstance++; |
||||
} |
||||
|
||||
void resetRetriesSameServiceInstance() { |
||||
retriesSameServiceInstance = 0; |
||||
} |
||||
|
||||
Integer getRetriesNextServiceInstance() { |
||||
return retriesNextServiceInstance; |
||||
} |
||||
|
||||
void incrementRetriesNextServiceInstance() { |
||||
retriesNextServiceInstance++; |
||||
} |
||||
|
||||
Integer getResponseStatusCode() { |
||||
return clientResponse.statusCode().value(); |
||||
} |
||||
|
||||
HttpMethod getRequestMethod() { |
||||
return request.method(); |
||||
} |
||||
|
||||
} |
@ -0,0 +1,58 @@
@@ -0,0 +1,58 @@
|
||||
/* |
||||
* Copyright 2012-2021 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.cloud.client.loadbalancer.reactive; |
||||
|
||||
import org.springframework.http.HttpMethod; |
||||
|
||||
/** |
||||
* Pluggable policy used to establish whether a given load-balanced call should be |
||||
* retried. |
||||
* |
||||
* @author Olga Maciaszek-Sharma |
||||
* @since 2.2.7 |
||||
*/ |
||||
public interface LoadBalancerRetryPolicy { |
||||
|
||||
/** |
||||
* Return <code>true</code> to retry on the same service instance. |
||||
* @param context the context for the retry operation |
||||
* @return true to retry on the same service instance |
||||
*/ |
||||
boolean canRetrySameServiceInstance(LoadBalancerRetryContext context); |
||||
|
||||
/** |
||||
* Return <code>true</code> to retry on the next service instance. |
||||
* @param context the context for the retry operation |
||||
* @return true to retry on the same service instance |
||||
*/ |
||||
boolean canRetryNextServiceInstance(LoadBalancerRetryContext context); |
||||
|
||||
/** |
||||
* Return <code>true</code> to retry on the provided HTTP status code. |
||||
* @param statusCode the HTTP status code |
||||
* @return true to retry on the provided HTTP status code |
||||
*/ |
||||
boolean retryableStatusCode(int statusCode); |
||||
|
||||
/** |
||||
* Return <code>true</code> to retry on the provided HTTP method. |
||||
* @param method the HTTP request method |
||||
* @return true to retry on the provided HTTP method |
||||
*/ |
||||
boolean canRetryOnMethod(HttpMethod method); |
||||
|
||||
} |
@ -0,0 +1,60 @@
@@ -0,0 +1,60 @@
|
||||
/* |
||||
* Copyright 2012-2021 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.cloud.client.loadbalancer.reactive; |
||||
|
||||
import org.springframework.cloud.client.loadbalancer.LoadBalancerRetryProperties; |
||||
import org.springframework.http.HttpMethod; |
||||
|
||||
/** |
||||
* The default implementation of {@link LoadBalancerRetryPolicy}. |
||||
* |
||||
* @author Olga Maciaszek-Sharma |
||||
* @since 2.2.7 |
||||
*/ |
||||
public class RetryableExchangeFilterFunctionLoadBalancerRetryPolicy |
||||
implements LoadBalancerRetryPolicy { |
||||
|
||||
private final LoadBalancerRetryProperties properties; |
||||
|
||||
public RetryableExchangeFilterFunctionLoadBalancerRetryPolicy( |
||||
LoadBalancerRetryProperties properties) { |
||||
this.properties = properties; |
||||
} |
||||
|
||||
@Override |
||||
public boolean canRetrySameServiceInstance(LoadBalancerRetryContext context) { |
||||
return context.getRetriesSameServiceInstance() < properties |
||||
.getMaxRetriesOnSameServiceInstance(); |
||||
} |
||||
|
||||
@Override |
||||
public boolean canRetryNextServiceInstance(LoadBalancerRetryContext context) { |
||||
return context.getRetriesNextServiceInstance() < properties |
||||
.getMaxRetriesOnNextServiceInstance(); |
||||
} |
||||
|
||||
@Override |
||||
public boolean retryableStatusCode(int statusCode) { |
||||
return properties.getRetryableStatusCodes().contains(statusCode); |
||||
} |
||||
|
||||
@Override |
||||
public boolean canRetryOnMethod(HttpMethod method) { |
||||
return HttpMethod.GET.equals(method) || properties.isRetryOnAllOperations(); |
||||
} |
||||
|
||||
} |
@ -0,0 +1,218 @@
@@ -0,0 +1,218 @@
|
||||
/* |
||||
* Copyright 2012-2021 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.cloud.client.loadbalancer.reactive; |
||||
|
||||
import java.io.IOException; |
||||
import java.net.URI; |
||||
import java.util.Arrays; |
||||
import java.util.List; |
||||
import java.util.concurrent.TimeoutException; |
||||
|
||||
import org.apache.commons.logging.Log; |
||||
import org.apache.commons.logging.LogFactory; |
||||
import reactor.core.Exceptions; |
||||
import reactor.core.publisher.Mono; |
||||
import reactor.util.retry.Retry; |
||||
import reactor.util.retry.RetrySpec; |
||||
|
||||
import org.springframework.cloud.client.ServiceInstance; |
||||
import org.springframework.cloud.client.loadbalancer.LoadBalancerRetryProperties; |
||||
import org.springframework.cloud.client.loadbalancer.RetryableRequestContext; |
||||
import org.springframework.http.HttpStatus; |
||||
import org.springframework.web.reactive.function.client.ClientRequest; |
||||
import org.springframework.web.reactive.function.client.ClientResponse; |
||||
import org.springframework.web.reactive.function.client.ExchangeFilterFunction; |
||||
import org.springframework.web.reactive.function.client.ExchangeFunction; |
||||
|
||||
import static org.springframework.cloud.client.loadbalancer.LoadBalancerUriTools.reconstructURI; |
||||
import static org.springframework.cloud.client.loadbalancer.reactive.ExchangeFilterFunctionUtils.buildClientRequest; |
||||
|
||||
/** |
||||
* An {@link ExchangeFilterFunction} that uses {@link ReactiveLoadBalancer} to execute |
||||
* requests against a correct {@link ServiceInstance} and Reactor Retries to retry the |
||||
* call both against the same and the next service instance, based on the provided |
||||
* {@link LoadBalancerRetryPolicy}. |
||||
* |
||||
* @author Olga Maciaszek-Sharma |
||||
* @since 2.2.7 |
||||
*/ |
||||
public class RetryableLoadBalancerExchangeFilterFunction |
||||
implements ExchangeFilterFunction { |
||||
|
||||
private static final Log LOG = LogFactory |
||||
.getLog(RetryableLoadBalancerExchangeFilterFunction.class); |
||||
|
||||
private static final List<Class<? extends Throwable>> exceptions = Arrays.asList( |
||||
IOException.class, TimeoutException.class, |
||||
RetryableStatusCodeException.class); |
||||
|
||||
private final LoadBalancerRetryPolicy retryPolicy; |
||||
|
||||
private final LoadBalancerRetryProperties retryProperties; |
||||
|
||||
private final ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerFactory; |
||||
|
||||
public RetryableLoadBalancerExchangeFilterFunction( |
||||
LoadBalancerRetryPolicy retryPolicy, |
||||
ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerFactory, |
||||
LoadBalancerRetryProperties retryProperties) { |
||||
this.retryPolicy = retryPolicy; |
||||
this.loadBalancerFactory = loadBalancerFactory; |
||||
this.retryProperties = retryProperties; |
||||
} |
||||
|
||||
public RetryableLoadBalancerExchangeFilterFunction( |
||||
ReactiveLoadBalancer.Factory<ServiceInstance> loadBalancerFactory, |
||||
LoadBalancerRetryProperties retryProperties) { |
||||
this.retryPolicy = new RetryableExchangeFilterFunctionLoadBalancerRetryPolicy( |
||||
retryProperties); |
||||
this.loadBalancerFactory = loadBalancerFactory; |
||||
this.retryProperties = retryProperties; |
||||
} |
||||
|
||||
@SuppressWarnings({ "rawtypes", "unchecked" }) |
||||
@Override |
||||
public Mono<ClientResponse> filter(ClientRequest clientRequest, |
||||
ExchangeFunction next) { |
||||
LoadBalancerRetryContext loadBalancerRetryContext = new LoadBalancerRetryContext( |
||||
clientRequest); |
||||
Retry exchangeRetry = buildRetrySpec( |
||||
retryProperties.getMaxRetriesOnSameServiceInstance(), true); |
||||
Retry filterRetry = buildRetrySpec( |
||||
retryProperties.getMaxRetriesOnNextServiceInstance(), false); |
||||
|
||||
URI originalUrl = clientRequest.url(); |
||||
String serviceId = originalUrl.getHost(); |
||||
if (serviceId == null) { |
||||
String message = String.format( |
||||
"Request URI does not contain a valid hostname: %s", |
||||
originalUrl.toString()); |
||||
if (LOG.isWarnEnabled()) { |
||||
LOG.warn(message); |
||||
} |
||||
return Mono.just( |
||||
ClientResponse.create(HttpStatus.BAD_REQUEST).body(message).build()); |
||||
} |
||||
DefaultRequest<RetryableRequestContext> lbRequest = new DefaultRequest<>( |
||||
new RetryableRequestContext(null)); |
||||
return Mono.defer(() -> choose(serviceId, lbRequest).flatMap(lbResponse -> { |
||||
ServiceInstance instance = lbResponse.getServer(); |
||||
lbRequest.setContext(new RetryableRequestContext(instance)); |
||||
if (instance == null) { |
||||
String message = "LoadBalancer does not contain an instance for the service " |
||||
+ serviceId; |
||||
if (LOG.isWarnEnabled()) { |
||||
LOG.warn("LoadBalancer does not contain an instance for the service " |
||||
+ serviceId); |
||||
} |
||||
return Mono.just(ClientResponse.create(HttpStatus.SERVICE_UNAVAILABLE) |
||||
.body(message).build()); |
||||
} |
||||
|
||||
if (LOG.isDebugEnabled()) { |
||||
LOG.debug(String.format( |
||||
"LoadBalancer has retrieved the instance for service %s: %s", |
||||
serviceId, instance.getUri())); |
||||
} |
||||
ClientRequest newRequest = buildClientRequest(clientRequest, |
||||
reconstructURI(instance, originalUrl)); |
||||
return next.exchange(newRequest).map(clientResponse -> { |
||||
loadBalancerRetryContext.setClientResponse(clientResponse); |
||||
if (shouldRetrySameServiceInstance(loadBalancerRetryContext)) { |
||||
if (LOG.isDebugEnabled()) { |
||||
LOG.debug(String.format("Retrying on status code: %d", |
||||
clientResponse.statusCode().value())); |
||||
} |
||||
throw new RetryableStatusCodeException(); |
||||
} |
||||
return clientResponse; |
||||
|
||||
}); |
||||
}).map(clientResponse -> { |
||||
loadBalancerRetryContext.setClientResponse(clientResponse); |
||||
if (shouldRetryNextServiceInstance(loadBalancerRetryContext)) { |
||||
if (LOG.isDebugEnabled()) { |
||||
LOG.debug(String.format("Retrying on status code: %d", |
||||
clientResponse.statusCode().value())); |
||||
} |
||||
throw new RetryableStatusCodeException(); |
||||
} |
||||
return clientResponse; |
||||
|
||||
}).retryWhen(exchangeRetry)).retryWhen(filterRetry); |
||||
} |
||||
|
||||
private Retry buildRetrySpec(int max, boolean transientErrors) { |
||||
LoadBalancerRetryProperties.Backoff backoffProperties = retryProperties |
||||
.getBackoff(); |
||||
if (backoffProperties.isEnabled()) { |
||||
return RetrySpec.backoff(max, backoffProperties.getMinBackoff()) |
||||
.filter(this::isRetryException) |
||||
.maxBackoff(backoffProperties.getMaxBackoff()) |
||||
.jitter(backoffProperties.getJitter()) |
||||
.transientErrors(transientErrors); |
||||
} |
||||
return RetrySpec.max(max).filter(this::isRetryException) |
||||
.transientErrors(transientErrors); |
||||
} |
||||
|
||||
private boolean shouldRetrySameServiceInstance( |
||||
LoadBalancerRetryContext loadBalancerRetryContext) { |
||||
boolean shouldRetry = retryPolicy |
||||
.retryableStatusCode(loadBalancerRetryContext.getResponseStatusCode()) |
||||
&& retryPolicy |
||||
.canRetryOnMethod(loadBalancerRetryContext.getRequestMethod()) |
||||
&& retryPolicy.canRetrySameServiceInstance(loadBalancerRetryContext); |
||||
if (shouldRetry) { |
||||
loadBalancerRetryContext.incrementRetriesSameServiceInstance(); |
||||
} |
||||
return shouldRetry; |
||||
} |
||||
|
||||
private boolean shouldRetryNextServiceInstance( |
||||
LoadBalancerRetryContext loadBalancerRetryContext) { |
||||
boolean shouldRetry = retryPolicy |
||||
.retryableStatusCode(loadBalancerRetryContext.getResponseStatusCode()) |
||||
&& retryPolicy |
||||
.canRetryOnMethod(loadBalancerRetryContext.getRequestMethod()) |
||||
&& retryPolicy.canRetryNextServiceInstance(loadBalancerRetryContext); |
||||
if (shouldRetry) { |
||||
loadBalancerRetryContext.incrementRetriesNextServiceInstance(); |
||||
loadBalancerRetryContext.resetRetriesSameServiceInstance(); |
||||
} |
||||
return shouldRetry; |
||||
} |
||||
|
||||
private boolean isRetryException(Throwable throwable) { |
||||
return exceptions.stream() |
||||
.anyMatch(exception -> exception.isInstance(throwable) |
||||
|| throwable != null && exception.isInstance(throwable.getCause()) |
||||
|| Exceptions.isRetryExhausted(throwable)); |
||||
} |
||||
|
||||
protected Mono<Response<ServiceInstance>> choose(String serviceId, |
||||
Request<RetryableRequestContext> request) { |
||||
ReactiveLoadBalancer<ServiceInstance> loadBalancer = loadBalancerFactory |
||||
.getInstance(serviceId); |
||||
if (loadBalancer == null) { |
||||
return Mono.just( |
||||
new org.springframework.cloud.client.loadbalancer.reactive.EmptyResponse()); |
||||
} |
||||
return Mono.from(loadBalancer.choose(request)); |
||||
} |
||||
|
||||
} |
@ -0,0 +1,28 @@
@@ -0,0 +1,28 @@
|
||||
/* |
||||
* Copyright 2012-2021 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.cloud.client.loadbalancer.reactive; |
||||
|
||||
/** |
||||
* An {@link IllegalStateException} used to trigger retries based on the returned HTTP |
||||
* status code. |
||||
* |
||||
* @author Olga Maciaszek-Sharma |
||||
* @since 2.2.7 |
||||
*/ |
||||
class RetryableStatusCodeException extends IllegalStateException { |
||||
|
||||
} |
@ -0,0 +1,218 @@
@@ -0,0 +1,218 @@
|
||||
/* |
||||
* Copyright 2012-2021 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.cloud.client.loadbalancer.reactive; |
||||
|
||||
import java.net.URI; |
||||
import java.util.Arrays; |
||||
import java.util.Collections; |
||||
import java.util.concurrent.atomic.AtomicInteger; |
||||
|
||||
import org.junit.jupiter.api.BeforeEach; |
||||
import org.junit.jupiter.api.Disabled; |
||||
import org.junit.jupiter.api.Test; |
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired; |
||||
import org.springframework.boot.SpringBootConfiguration; |
||||
import org.springframework.boot.autoconfigure.EnableAutoConfiguration; |
||||
import org.springframework.boot.test.context.SpringBootTest; |
||||
import org.springframework.boot.web.server.LocalServerPort; |
||||
import org.springframework.cloud.client.DefaultServiceInstance; |
||||
import org.springframework.cloud.client.ServiceInstance; |
||||
import org.springframework.cloud.client.discovery.DiscoveryClient; |
||||
import org.springframework.cloud.client.discovery.EnableDiscoveryClient; |
||||
import org.springframework.cloud.client.discovery.simple.SimpleDiscoveryProperties; |
||||
import org.springframework.cloud.client.loadbalancer.LoadBalancerRetryProperties; |
||||
import org.springframework.context.annotation.Bean; |
||||
import org.springframework.http.HttpStatus; |
||||
import org.springframework.web.bind.annotation.GetMapping; |
||||
import org.springframework.web.bind.annotation.RestController; |
||||
import org.springframework.web.reactive.function.client.ClientResponse; |
||||
import org.springframework.web.reactive.function.client.WebClient; |
||||
|
||||
import static org.assertj.core.api.Assertions.assertThatCode; |
||||
import static org.assertj.core.api.BDDAssertions.then; |
||||
import static org.springframework.boot.test.context.SpringBootTest.WebEnvironment.RANDOM_PORT; |
||||
|
||||
/** |
||||
* Integration tests for {@link RetryableLoadBalancerExchangeFilterFunction}. |
||||
* |
||||
* @author Olga Maciaszek-Sharma |
||||
* @since 2.2.7 |
||||
*/ |
||||
@SpringBootTest(webEnvironment = RANDOM_PORT) |
||||
class RetryableLoadBalancerExchangeFilterFunctionIntegrationTests { |
||||
|
||||
@Autowired |
||||
private RetryableLoadBalancerExchangeFilterFunction loadBalancerFunction; |
||||
|
||||
@Autowired |
||||
private SimpleDiscoveryProperties properties; |
||||
|
||||
@Autowired |
||||
private LoadBalancerRetryProperties retryProperties; |
||||
|
||||
@LocalServerPort |
||||
private int port; |
||||
|
||||
@BeforeEach |
||||
void setUp() { |
||||
DefaultServiceInstance instance = new DefaultServiceInstance(); |
||||
instance.setServiceId("testservice"); |
||||
instance.setUri(URI.create("http://localhost:" + port)); |
||||
DefaultServiceInstance instanceWithNoLifecycleProcessors = new DefaultServiceInstance(); |
||||
instanceWithNoLifecycleProcessors |
||||
.setServiceId("serviceWithNoLifecycleProcessors"); |
||||
instanceWithNoLifecycleProcessors.setUri(URI.create("http://localhost:" + port)); |
||||
properties.getInstances().put("testservice", Collections.singletonList(instance)); |
||||
properties.getInstances().put("serviceWithNoLifecycleProcessors", |
||||
Collections.singletonList(instanceWithNoLifecycleProcessors)); |
||||
} |
||||
|
||||
@Test |
||||
void correctResponseReturnedForExistingHostAndInstancePresent() { |
||||
ClientResponse clientResponse = WebClient.builder().baseUrl("http://testservice") |
||||
.filter(this.loadBalancerFunction).build().get().uri("/hello").exchange() |
||||
.block(); |
||||
|
||||
then(clientResponse.statusCode()).isEqualTo(HttpStatus.OK); |
||||
then(clientResponse.bodyToMono(String.class).block()).isEqualTo("Hello World"); |
||||
} |
||||
|
||||
@Test |
||||
void correctResponseReturnedAfterRetryingOnSameServiceInstance() { |
||||
retryProperties.setMaxRetriesOnSameServiceInstance(1); |
||||
retryProperties.getRetryableStatusCodes().add(500); |
||||
|
||||
ClientResponse clientResponse = WebClient.builder().baseUrl("http://testservice") |
||||
.filter(this.loadBalancerFunction).build().get().uri("/exception") |
||||
.exchange().block(); |
||||
|
||||
then(clientResponse.statusCode()).isEqualTo(HttpStatus.OK); |
||||
then(clientResponse.bodyToMono(String.class).block()).isEqualTo("Hello World!"); |
||||
} |
||||
|
||||
// FIXME - flaky test
|
||||
@Disabled |
||||
@Test |
||||
void correctResponseReturnedAfterRetryingOnNextServiceInstanceWithBackoff() { |
||||
retryProperties.getBackoff().setEnabled(true); |
||||
retryProperties.setMaxRetriesOnSameServiceInstance(1); |
||||
DefaultServiceInstance goodRetryTestInstance = new DefaultServiceInstance(); |
||||
goodRetryTestInstance.setServiceId("retrytest"); |
||||
goodRetryTestInstance.setUri(URI.create("http://localhost:" + port)); |
||||
DefaultServiceInstance badRetryTestInstance = new DefaultServiceInstance(); |
||||
badRetryTestInstance.setServiceId("retrytest"); |
||||
badRetryTestInstance.setUri(URI.create("http://localhost:" + 8080)); |
||||
properties.getInstances().put("retrytest", |
||||
Arrays.asList(badRetryTestInstance, goodRetryTestInstance)); |
||||
retryProperties.getRetryableStatusCodes().add(500); |
||||
|
||||
ClientResponse clientResponse = WebClient.builder().baseUrl("http://retrytest") |
||||
.filter(this.loadBalancerFunction).build().get().uri("/hello").exchange() |
||||
.block(); |
||||
|
||||
then(clientResponse.statusCode()).isEqualTo(HttpStatus.OK); |
||||
then(clientResponse.bodyToMono(String.class).block()).isEqualTo("Hello World"); |
||||
|
||||
ClientResponse secondClientResponse = WebClient.builder() |
||||
.baseUrl("http://retrytest").filter(this.loadBalancerFunction).build() |
||||
.get().uri("/hello").exchange().block(); |
||||
|
||||
then(secondClientResponse.statusCode()).isEqualTo(HttpStatus.OK); |
||||
then(secondClientResponse.bodyToMono(String.class).block()) |
||||
.isEqualTo("Hello World"); |
||||
} |
||||
|
||||
@Test |
||||
void serviceUnavailableReturnedWhenNoInstancePresent() { |
||||
ClientResponse clientResponse = WebClient.builder().baseUrl("http://xxx") |
||||
.filter(this.loadBalancerFunction).build().get().exchange().block(); |
||||
|
||||
then(clientResponse.statusCode()).isEqualTo(HttpStatus.SERVICE_UNAVAILABLE); |
||||
} |
||||
|
||||
@Test |
||||
@Disabled |
||||
// FIXME 3.0.0
|
||||
void badRequestReturnedForIncorrectHost() { |
||||
ClientResponse clientResponse = WebClient.builder().baseUrl("http:///xxx") |
||||
.filter(this.loadBalancerFunction).build().get().exchange().block(); |
||||
|
||||
then(clientResponse.statusCode()).isEqualTo(HttpStatus.BAD_REQUEST); |
||||
} |
||||
|
||||
@Test |
||||
void exceptionNotThrownWhenFactoryReturnsNullLifecycleProcessorsMap() { |
||||
assertThatCode(() -> WebClient.builder() |
||||
.baseUrl("http://serviceWithNoLifecycleProcessors") |
||||
.filter(this.loadBalancerFunction).build().get().uri("/hello").exchange() |
||||
.block()).doesNotThrowAnyException(); |
||||
} |
||||
|
||||
@SuppressWarnings({ "unchecked", "rawtypes" }) |
||||
@EnableDiscoveryClient |
||||
@EnableAutoConfiguration |
||||
@SpringBootConfiguration(proxyBeanMethods = false) |
||||
@RestController |
||||
static class Config { |
||||
|
||||
AtomicInteger exceptionCallsCount = new AtomicInteger(); |
||||
|
||||
@GetMapping("/hello") |
||||
public String hello() { |
||||
return "Hello World"; |
||||
} |
||||
|
||||
@GetMapping("/callback") |
||||
String callbackTestResult() { |
||||
return "callbackTestResult"; |
||||
} |
||||
|
||||
@GetMapping("/exception") |
||||
String exception() { |
||||
int callCount = exceptionCallsCount.incrementAndGet(); |
||||
if (callCount % 2 != 0) { |
||||
throw new IllegalStateException("Test!"); |
||||
} |
||||
return "Hello World!"; |
||||
} |
||||
|
||||
@Bean |
||||
ReactiveLoadBalancer.Factory<ServiceInstance> reactiveLoadBalancerFactory( |
||||
DiscoveryClient discoveryClient) { |
||||
return serviceId -> new DiscoveryClientBasedReactiveLoadBalancer(serviceId, |
||||
discoveryClient); |
||||
} |
||||
|
||||
@Bean |
||||
LoadBalancerRetryProperties loadBalancerRetryProperties() { |
||||
return new LoadBalancerRetryProperties(); |
||||
} |
||||
|
||||
@Bean |
||||
RetryableLoadBalancerExchangeFilterFunction exchangeFilterFunction( |
||||
LoadBalancerRetryProperties properties, |
||||
ReactiveLoadBalancer.Factory<ServiceInstance> factory) { |
||||
return new RetryableLoadBalancerExchangeFilterFunction( |
||||
new RetryableExchangeFilterFunctionLoadBalancerRetryPolicy( |
||||
properties), |
||||
factory, properties); |
||||
} |
||||
|
||||
} |
||||
|
||||
} |
@ -0,0 +1,161 @@
@@ -0,0 +1,161 @@
|
||||
/* |
||||
* Copyright 2012-2021 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.cloud.client.loadbalancer.reactive; |
||||
|
||||
import java.io.IOException; |
||||
import java.net.URI; |
||||
|
||||
import org.junit.jupiter.api.BeforeEach; |
||||
import org.junit.jupiter.api.Test; |
||||
import org.mockito.InOrder; |
||||
import reactor.core.publisher.Mono; |
||||
|
||||
import org.springframework.cloud.client.ServiceInstance; |
||||
import org.springframework.cloud.client.loadbalancer.LoadBalancerRetryProperties; |
||||
import org.springframework.http.HttpHeaders; |
||||
import org.springframework.http.HttpMethod; |
||||
import org.springframework.http.HttpStatus; |
||||
import org.springframework.web.reactive.function.client.ClientRequest; |
||||
import org.springframework.web.reactive.function.client.ClientResponse; |
||||
import org.springframework.web.reactive.function.client.ExchangeFunction; |
||||
|
||||
import static org.mockito.ArgumentMatchers.any; |
||||
import static org.mockito.Mockito.inOrder; |
||||
import static org.mockito.Mockito.mock; |
||||
import static org.mockito.Mockito.times; |
||||
import static org.mockito.Mockito.verify; |
||||
import static org.mockito.Mockito.when; |
||||
|
||||
/** |
||||
* Tests for {@link RetryableLoadBalancerExchangeFilterFunction}. |
||||
* |
||||
* @author Olga Maciaszek-Sharma |
||||
* @since 2.2.7 |
||||
*/ |
||||
@SuppressWarnings("unchecked") |
||||
class RetryableLoadBalancerExchangeFilterFunctionTests { |
||||
|
||||
private final LoadBalancerRetryProperties properties = new LoadBalancerRetryProperties(); |
||||
|
||||
private final LoadBalancerRetryPolicy policy = new RetryableExchangeFilterFunctionLoadBalancerRetryPolicy( |
||||
properties); |
||||
|
||||
private final ReactiveLoadBalancer.Factory<ServiceInstance> factory = mock( |
||||
ReactiveLoadBalancer.Factory.class); |
||||
|
||||
private final RetryableLoadBalancerExchangeFilterFunction filterFunction = new RetryableLoadBalancerExchangeFilterFunction( |
||||
policy, factory, properties); |
||||
|
||||
private final ClientRequest clientRequest = mock(ClientRequest.class); |
||||
|
||||
private final ExchangeFunction next = mock(ExchangeFunction.class); |
||||
|
||||
private final ClientResponse clientResponse = mock(ClientResponse.class); |
||||
|
||||
private final InOrder inOrder = inOrder(next, factory); |
||||
|
||||
@BeforeEach |
||||
void setUp() { |
||||
properties.setMaxRetriesOnSameServiceInstance(1); |
||||
properties.getRetryableStatusCodes().add(404); |
||||
when(clientRequest.url()).thenReturn(URI.create("http://test")); |
||||
when(factory.getInstance("test")).thenReturn(new TestReactiveLoadBalancer()); |
||||
when(clientRequest.headers()).thenReturn(new HttpHeaders()); |
||||
when(clientRequest.cookies()).thenReturn(new HttpHeaders()); |
||||
|
||||
} |
||||
|
||||
@Test |
||||
void shouldRetryOnSameAndNextServiceInstanceOnException() { |
||||
when(clientRequest.method()).thenReturn(HttpMethod.GET); |
||||
when(clientResponse.statusCode()).thenReturn(HttpStatus.OK); |
||||
when(next.exchange(any())) |
||||
.thenThrow(new IllegalStateException(new IOException())); |
||||
|
||||
try { |
||||
filterFunction.filter(clientRequest, next).subscribe(); |
||||
} |
||||
catch (Exception ignored) { |
||||
} |
||||
|
||||
inOrder.verify(factory, times(1)).getInstance(any()); |
||||
inOrder.verify(next, times(2)).exchange(any()); |
||||
inOrder.verify(factory, times(1)).getInstance(any()); |
||||
inOrder.verify(next, times(2)).exchange(any()); |
||||
} |
||||
|
||||
@Test |
||||
void shouldRetryOnSameAndNextServiceInstanceOnRetryableStatusCode() { |
||||
when(clientRequest.method()).thenReturn(HttpMethod.GET); |
||||
when(clientResponse.statusCode()).thenReturn(HttpStatus.NOT_FOUND); |
||||
when(next.exchange(any())).thenReturn(Mono.just(clientResponse)); |
||||
|
||||
filterFunction.filter(clientRequest, next).subscribe(); |
||||
|
||||
inOrder.verify(factory, times(1)).getInstance(any()); |
||||
inOrder.verify(next, times(2)).exchange(any()); |
||||
inOrder.verify(factory, times(1)).getInstance(any()); |
||||
inOrder.verify(next, times(2)).exchange(any()); |
||||
} |
||||
|
||||
@Test |
||||
void shouldNotRetryWhenNoRetryableExceptionOrStatusCode() { |
||||
when(clientRequest.method()).thenReturn(HttpMethod.GET); |
||||
when(clientResponse.statusCode()).thenReturn(HttpStatus.OK); |
||||
when(next.exchange(any())).thenReturn(Mono.just(clientResponse)); |
||||
|
||||
filterFunction.filter(clientRequest, next).subscribe(); |
||||
|
||||
verify(next, times(1)).exchange(any()); |
||||
verify(factory, times(1)).getInstance(any()); |
||||
} |
||||
|
||||
@Test |
||||
void shouldNotRetryOnMethodOtherThanGet() { |
||||
when(clientRequest.method()).thenReturn(HttpMethod.POST); |
||||
when(clientResponse.statusCode()).thenReturn(HttpStatus.NOT_FOUND); |
||||
when(next.exchange(any())).thenReturn(Mono.just(clientResponse)); |
||||
|
||||
filterFunction.filter(clientRequest, next).subscribe(); |
||||
|
||||
verify(next, times(1)).exchange(any()); |
||||
verify(factory, times(1)).getInstance(any()); |
||||
} |
||||
|
||||
@Test |
||||
void shouldRetryOnMethodOtherThanGetWhenEnabled() { |
||||
LoadBalancerRetryProperties properties = new LoadBalancerRetryProperties(); |
||||
properties.setRetryOnAllOperations(true); |
||||
properties.setMaxRetriesOnSameServiceInstance(1); |
||||
properties.getRetryableStatusCodes().add(404); |
||||
LoadBalancerRetryPolicy policy = new RetryableExchangeFilterFunctionLoadBalancerRetryPolicy( |
||||
properties); |
||||
RetryableLoadBalancerExchangeFilterFunction filterFunction = new RetryableLoadBalancerExchangeFilterFunction( |
||||
policy, factory, properties); |
||||
when(clientRequest.method()).thenReturn(HttpMethod.POST); |
||||
when(clientResponse.statusCode()).thenReturn(HttpStatus.NOT_FOUND); |
||||
when(next.exchange(any())).thenReturn(Mono.just(clientResponse)); |
||||
|
||||
filterFunction.filter(clientRequest, next).subscribe(); |
||||
|
||||
inOrder.verify(factory, times(1)).getInstance(any()); |
||||
inOrder.verify(next, times(2)).exchange(any()); |
||||
inOrder.verify(factory, times(1)).getInstance(any()); |
||||
inOrder.verify(next, times(2)).exchange(any()); |
||||
} |
||||
|
||||
} |
Loading…
Reference in new issue