From 1c0855b495b83747f43bcaa2b09053532b1508a3 Mon Sep 17 00:00:00 2001 From: lly5044 <504496614@qq.com> Date: Tue, 28 Nov 2017 19:25:14 -0600 Subject: [PATCH] add response and uri field to RetryableStatusCodeException (#272) * add response and uri field to RetryableStatusCodeException * fix the bug in RetryLoadBalancerInterceptor * wrap throwable regardless --- .../RetryLoadBalancerInterceptor.java | 114 ++++++++++++++---- .../RetryableStatusCodeException.java | 19 +++ .../RetryLoadBalancerInterceptorTest.java | 37 +++++- 3 files changed, 146 insertions(+), 24 deletions(-) diff --git a/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/RetryLoadBalancerInterceptor.java b/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/RetryLoadBalancerInterceptor.java index f33ab29f..d59f1103 100644 --- a/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/RetryLoadBalancerInterceptor.java +++ b/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/RetryLoadBalancerInterceptor.java @@ -16,16 +16,23 @@ package org.springframework.cloud.client.loadbalancer; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.InputStream; import java.net.URI; import org.springframework.cloud.client.ServiceInstance; +import org.springframework.http.HttpHeaders; import org.springframework.http.HttpRequest; +import org.springframework.http.HttpStatus; import org.springframework.http.client.ClientHttpRequestExecution; import org.springframework.http.client.ClientHttpRequestInterceptor; import org.springframework.http.client.ClientHttpResponse; +import org.springframework.retry.RecoveryCallback; import org.springframework.retry.RetryCallback; import org.springframework.retry.RetryContext; +import org.springframework.retry.RetryException; import org.springframework.retry.backoff.BackOffPolicy; import org.springframework.retry.backoff.NoBackOffPolicy; import org.springframework.retry.policy.NeverRetryPolicy; @@ -89,27 +96,90 @@ public class RetryLoadBalancerInterceptor implements ClientHttpRequestIntercepto serviceName)); return template .execute(new RetryCallback() { - @Override - public ClientHttpResponse doWithRetry(RetryContext context) - throws IOException { - ServiceInstance serviceInstance = null; - if (context instanceof LoadBalancedRetryContext) { - LoadBalancedRetryContext lbContext = (LoadBalancedRetryContext) context; - serviceInstance = lbContext.getServiceInstance(); - } - if (serviceInstance == null) { - serviceInstance = loadBalancer.choose(serviceName); - } - ClientHttpResponse response = RetryLoadBalancerInterceptor.this.loadBalancer.execute( - serviceName, serviceInstance, - requestFactory.createRequest(request, body, execution)); - int statusCode = response.getRawStatusCode(); - if(retryPolicy != null && retryPolicy.retryableStatusCode(statusCode)) { - response.close(); - throw new RetryableStatusCodeException(serviceName, statusCode); - } - return response; - } - }); + @Override + public ClientHttpResponse doWithRetry(RetryContext context) + throws IOException { + ServiceInstance serviceInstance = null; + if (context instanceof LoadBalancedRetryContext) { + LoadBalancedRetryContext lbContext = (LoadBalancedRetryContext) context; + serviceInstance = lbContext.getServiceInstance(); + } + if (serviceInstance == null) { + serviceInstance = loadBalancer.choose(serviceName); + } + ClientHttpResponse response = RetryLoadBalancerInterceptor.this.loadBalancer.execute( + serviceName, serviceInstance, + requestFactory.createRequest(request, body, execution)); + int statusCode = response.getRawStatusCode(); + if (retryPolicy != null && retryPolicy.retryableStatusCode(statusCode)) { + ClientHttpResponseWrapper wrapper = new ClientHttpResponseWrapper(response); + wrapper.init(); + throw new RetryableStatusCodeException(serviceName, statusCode, wrapper, null); + } + return response; + } + }, new RecoveryCallback() { + @Override + public ClientHttpResponse recover(RetryContext retryContext) throws Exception { + Throwable lastThrowable = retryContext.getLastThrowable(); + if (lastThrowable != null && lastThrowable instanceof RetryableStatusCodeException) { + RetryableStatusCodeException ex = (RetryableStatusCodeException) lastThrowable; + return (ClientHttpResponse) ex.getResponse(); + } + throw new RetryException("Could not recover", lastThrowable); + } + }); } + + public static class ClientHttpResponseWrapper implements ClientHttpResponse { + + private ClientHttpResponse response; + private InputStream content; + + public ClientHttpResponseWrapper(ClientHttpResponse response) { + this.response = response; + } + + public void init() throws IOException { + InputStream body = response.getBody(); + ByteArrayOutputStream temp = new ByteArrayOutputStream(); + byte[] buffer = new byte[4096]; + int length = 0; + while ((length = body.read(buffer)) != -1) { + temp.write(buffer, 0, length); + } + content = new ByteArrayInputStream(temp.toByteArray()); + response.close(); + } + + @Override + public HttpStatus getStatusCode() throws IOException { + return response.getStatusCode(); + } + + @Override + public int getRawStatusCode() throws IOException { + return response.getRawStatusCode(); + } + + @Override + public String getStatusText() throws IOException { + return response.getStatusText(); + } + + @Override + public void close() { + response.close(); + } + + @Override + public InputStream getBody() throws IOException { + return content; + } + + @Override + public HttpHeaders getHeaders() { + return response.getHeaders(); + } + } } diff --git a/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/RetryableStatusCodeException.java b/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/RetryableStatusCodeException.java index d5d4b2db..ddce92d8 100644 --- a/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/RetryableStatusCodeException.java +++ b/spring-cloud-commons/src/main/java/org/springframework/cloud/client/loadbalancer/RetryableStatusCodeException.java @@ -1,6 +1,7 @@ package org.springframework.cloud.client.loadbalancer; import java.io.IOException; +import java.net.URI; /** * Exception to be thrown when the status code is deemed to be retryable. @@ -10,7 +11,25 @@ public class RetryableStatusCodeException extends IOException { private static final String MESSAGE = "Service %s returned a status code of %d"; + private Object response; + + private URI uri; + public RetryableStatusCodeException(String serviceId, int statusCode) { super(String.format(MESSAGE, serviceId, statusCode)); } + + public RetryableStatusCodeException(String serviceId, int statusCode, Object response, URI uri) { + super(String.format(MESSAGE, serviceId, statusCode)); + this.response = response; + this.uri = uri; + } + + public Object getResponse() { + return response; + } + + public URI getUri() { + return uri; + } } diff --git a/spring-cloud-commons/src/test/java/org/springframework/cloud/client/loadbalancer/RetryLoadBalancerInterceptorTest.java b/spring-cloud-commons/src/test/java/org/springframework/cloud/client/loadbalancer/RetryLoadBalancerInterceptorTest.java index e749c225..8e913df3 100644 --- a/spring-cloud-commons/src/test/java/org/springframework/cloud/client/loadbalancer/RetryLoadBalancerInterceptorTest.java +++ b/spring-cloud-commons/src/test/java/org/springframework/cloud/client/loadbalancer/RetryLoadBalancerInterceptorTest.java @@ -1,5 +1,6 @@ package org.springframework.cloud.client.loadbalancer; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.net.URI; @@ -15,6 +16,7 @@ import org.springframework.http.client.ClientHttpRequestExecution; import org.springframework.http.client.ClientHttpResponse; import org.springframework.mock.http.client.MockClientHttpResponse; import org.springframework.retry.RetryContext; +import org.springframework.retry.RetryException; import org.springframework.retry.backoff.BackOffContext; import org.springframework.retry.backoff.BackOffInterruptedException; import org.springframework.retry.backoff.BackOffPolicy; @@ -57,7 +59,7 @@ public class RetryLoadBalancerInterceptorTest { lbProperties = null; } - @Test(expected = IOException.class) + @Test(expected = RetryException.class) public void interceptDisableRetry() throws Throwable { HttpRequest request = mock(HttpRequest.class); when(request.getURI()).thenReturn(new URI("http://foo")); @@ -142,6 +144,7 @@ public class RetryLoadBalancerInterceptorTest { HttpRequest request = mock(HttpRequest.class); when(request.getURI()).thenReturn(new URI("http://foo")); InputStream notFoundStream = mock(InputStream.class); + when(notFoundStream.read(any(byte[].class))).thenReturn(-1); ClientHttpResponse clientHttpResponseNotFound = new MockClientHttpResponse(notFoundStream, HttpStatus.NOT_FOUND); ClientHttpResponse clientHttpResponseOk = new MockClientHttpResponse(new byte[]{}, HttpStatus.OK); LoadBalancedRetryPolicy policy = mock(LoadBalancedRetryPolicy.class); @@ -166,6 +169,36 @@ public class RetryLoadBalancerInterceptorTest { verify(lbRequestFactory, times(2)).createRequest(request, body, execution); } + @Test + public void interceptRetryFailOnStatusCode() throws Throwable { + HttpRequest request = mock(HttpRequest.class); + when(request.getURI()).thenReturn(new URI("http://foo")); + InputStream notFoundStream = new ByteArrayInputStream("foo".getBytes()); + ClientHttpResponse clientHttpResponseNotFound = new MockClientHttpResponse(notFoundStream, HttpStatus.NOT_FOUND); + LoadBalancedRetryPolicy policy = mock(LoadBalancedRetryPolicy.class); + when(policy.retryableStatusCode(eq(HttpStatus.NOT_FOUND.value()))).thenReturn(true); + when(policy.canRetryNextServer(any(LoadBalancedRetryContext.class))).thenReturn(false); + LoadBalancedRetryPolicyFactory lbRetryPolicyFactory = mock(LoadBalancedRetryPolicyFactory.class); + when(lbRetryPolicyFactory.create(eq("foo"), any(ServiceInstanceChooser.class))).thenReturn(policy); + ServiceInstance serviceInstance = mock(ServiceInstance.class); + when(client.choose(eq("foo"))).thenReturn(serviceInstance); + when(client.execute(eq("foo"), eq(serviceInstance), any(LoadBalancerRequest.class))). + thenReturn(clientHttpResponseNotFound); + lbProperties.setEnabled(true); + RetryLoadBalancerInterceptor interceptor = new RetryLoadBalancerInterceptor(client, lbProperties, lbRetryPolicyFactory, lbRequestFactory); + byte[] body = new byte[]{}; + ClientHttpRequestExecution execution = mock(ClientHttpRequestExecution.class); + ClientHttpResponse rsp = interceptor.intercept(request, body, execution); + verify(client, times(1)).execute(eq("foo"), eq(serviceInstance), any(LoadBalancerRequest.class)); + verify(lbRequestFactory, times(1)).createRequest(request, body, execution); + verify(policy, times(2)).canRetryNextServer(any(LoadBalancedRetryContext.class)); + //call twice in a retry attempt + byte[] content = new byte[1024]; + int length = rsp.getBody().read(content); + assertThat(length, is("foo".getBytes().length)); + assertThat(new String(content, 0, length), is("foo")); + } + @Test public void interceptRetry() throws Throwable { HttpRequest request = mock(HttpRequest.class); @@ -193,7 +226,7 @@ public class RetryLoadBalancerInterceptorTest { assertThat(backOffPolicy.getBackoffAttempts(), is(1)); } - @Test(expected = IOException.class) + @Test(expected = RetryException.class) public void interceptFailedRetry() throws Exception { HttpRequest request = mock(HttpRequest.class); when(request.getURI()).thenReturn(new URI("http://foo"));