From 12a9a2e4851756d5610c93c25fb3ea52c2fbd85e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20D=C3=B6rnbrack?= Date: Fri, 9 Jun 2017 10:12:24 +0200 Subject: [PATCH 1/3] code cleanup --- .../ribbon/RibbonLoadBalancedRetryPolicy.java | 14 +++++++------- .../RibbonLoadBalancedRetryPolicyFactoryTest.java | 5 +++-- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/spring-cloud-netflix-core/src/main/java/org/springframework/cloud/netflix/ribbon/RibbonLoadBalancedRetryPolicy.java b/spring-cloud-netflix-core/src/main/java/org/springframework/cloud/netflix/ribbon/RibbonLoadBalancedRetryPolicy.java index 4e062ae5..874215f8 100644 --- a/spring-cloud-netflix-core/src/main/java/org/springframework/cloud/netflix/ribbon/RibbonLoadBalancedRetryPolicy.java +++ b/spring-cloud-netflix-core/src/main/java/org/springframework/cloud/netflix/ribbon/RibbonLoadBalancedRetryPolicy.java @@ -18,17 +18,17 @@ package org.springframework.cloud.netflix.ribbon; -import java.util.ArrayList; -import java.util.List; +import com.netflix.client.config.CommonClientConfigKey; +import com.netflix.client.config.IClientConfig; +import com.netflix.client.config.IClientConfigKey; import org.springframework.cloud.client.loadbalancer.LoadBalancedRetryContext; import org.springframework.cloud.client.loadbalancer.LoadBalancedRetryPolicy; import org.springframework.cloud.client.loadbalancer.ServiceInstanceChooser; -import org.springframework.core.env.Environment; import org.springframework.http.HttpMethod; import org.springframework.util.StringUtils; -import com.netflix.client.config.CommonClientConfigKey; -import com.netflix.client.config.IClientConfig; -import com.netflix.client.config.IClientConfigKey; + +import java.util.ArrayList; +import java.util.List; /** * {@link LoadBalancedRetryPolicy} for Ribbon clients. @@ -60,7 +60,7 @@ public class RibbonLoadBalancedRetryPolicy implements LoadBalancedRetryPolicy { for(String code : retryableStatusCodesArray) { if(!StringUtils.isEmpty(code)) { try { - retryableStatusCodes.add(Integer.valueOf(code)); + retryableStatusCodes.add(Integer.valueOf(code.trim())); } catch (NumberFormatException e) { //TODO log } diff --git a/spring-cloud-netflix-core/src/test/java/org/springframework/cloud/netflix/ribbon/RibbonLoadBalancedRetryPolicyFactoryTest.java b/spring-cloud-netflix-core/src/test/java/org/springframework/cloud/netflix/ribbon/RibbonLoadBalancedRetryPolicyFactoryTest.java index 3fedccea..90c5739c 100644 --- a/spring-cloud-netflix-core/src/test/java/org/springframework/cloud/netflix/ribbon/RibbonLoadBalancedRetryPolicyFactoryTest.java +++ b/spring-cloud-netflix-core/src/test/java/org/springframework/cloud/netflix/ribbon/RibbonLoadBalancedRetryPolicyFactoryTest.java @@ -201,7 +201,7 @@ public class RibbonLoadBalancedRetryPolicyFactoryTest { } @Test - public void testRetryableStatusCodest() throws Exception { + public void testRetryableStatusCodes() throws Exception { int sameServer = 3; int nextServer = 3; RibbonServer server = getRibbonServer(); @@ -210,7 +210,7 @@ public class RibbonLoadBalancedRetryPolicyFactoryTest { doReturn(nextServer).when(config).get(eq(CommonClientConfigKey.MaxAutoRetriesNextServer), anyInt()); doReturn(false).when(config).get(eq(CommonClientConfigKey.OkToRetryOnAllOperations), eq(false)); doReturn(config).when(clientFactory).getClientConfig(eq(server.getServiceId())); - doReturn("404,502,foo, ,").when(config).getPropertyAsString(eq(RibbonLoadBalancedRetryPolicy.RETRYABLE_STATUS_CODES),eq("")); + doReturn("404, 418,502,foo, ,").when(config).getPropertyAsString(eq(RibbonLoadBalancedRetryPolicy.RETRYABLE_STATUS_CODES),eq("")); clientFactory.getLoadBalancerContext(server.getServiceId()).setRetryHandler(new DefaultLoadBalancerRetryHandler(config)); RibbonLoadBalancerClient client = getRibbonLoadBalancerClient(server); RibbonLoadBalancedRetryPolicyFactory factory = new RibbonLoadBalancedRetryPolicyFactory(clientFactory); @@ -219,6 +219,7 @@ public class RibbonLoadBalancedRetryPolicyFactoryTest { doReturn(HttpMethod.GET).when(request).getMethod(); assertThat(policy.retryableStatusCode(400), is(false)); assertThat(policy.retryableStatusCode(404), is(true)); + assertThat(policy.retryableStatusCode(418), is(true)); assertThat(policy.retryableStatusCode(502), is(true)); } From 0334c5a2897bec3c31b2d86b45fae132807072c9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20D=C3=B6rnbrack?= Date: Fri, 9 Jun 2017 10:12:24 +0200 Subject: [PATCH 2/3] Cleanup. Cherry picking #2031. --- .../ribbon/RibbonLoadBalancedRetryPolicyFactoryTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/spring-cloud-netflix-core/src/test/java/org/springframework/cloud/netflix/ribbon/RibbonLoadBalancedRetryPolicyFactoryTest.java b/spring-cloud-netflix-core/src/test/java/org/springframework/cloud/netflix/ribbon/RibbonLoadBalancedRetryPolicyFactoryTest.java index 90c5739c..a888bde9 100644 --- a/spring-cloud-netflix-core/src/test/java/org/springframework/cloud/netflix/ribbon/RibbonLoadBalancedRetryPolicyFactoryTest.java +++ b/spring-cloud-netflix-core/src/test/java/org/springframework/cloud/netflix/ribbon/RibbonLoadBalancedRetryPolicyFactoryTest.java @@ -210,7 +210,7 @@ public class RibbonLoadBalancedRetryPolicyFactoryTest { doReturn(nextServer).when(config).get(eq(CommonClientConfigKey.MaxAutoRetriesNextServer), anyInt()); doReturn(false).when(config).get(eq(CommonClientConfigKey.OkToRetryOnAllOperations), eq(false)); doReturn(config).when(clientFactory).getClientConfig(eq(server.getServiceId())); - doReturn("404, 418,502,foo, ,").when(config).getPropertyAsString(eq(RibbonLoadBalancedRetryPolicy.RETRYABLE_STATUS_CODES),eq("")); + doReturn("404,502, 418,foo, ,").when(config).getPropertyAsString(eq(RibbonLoadBalancedRetryPolicy.RETRYABLE_STATUS_CODES),eq("")); clientFactory.getLoadBalancerContext(server.getServiceId()).setRetryHandler(new DefaultLoadBalancerRetryHandler(config)); RibbonLoadBalancerClient client = getRibbonLoadBalancerClient(server); RibbonLoadBalancedRetryPolicyFactory factory = new RibbonLoadBalancedRetryPolicyFactory(clientFactory); @@ -221,6 +221,7 @@ public class RibbonLoadBalancedRetryPolicyFactoryTest { assertThat(policy.retryableStatusCode(404), is(true)); assertThat(policy.retryableStatusCode(418), is(true)); assertThat(policy.retryableStatusCode(502), is(true)); + assertThat(policy.retryableStatusCode(418), is(true)); } protected RibbonLoadBalancerClient getRibbonLoadBalancerClient( From 4783b07407554bd3d5a727ef18fe498623b85acc Mon Sep 17 00:00:00 2001 From: Dave Syer Date: Mon, 12 Jun 2017 15:43:11 +0100 Subject: [PATCH 3/3] Support for multi-valued (batched) events in stream aggregator This allows clients to send batched up events in the same format as before (or to continue to send single events). We can switch the default format to an array in 1.4.x. --- .../stream/HystrixStreamAggregator.java | 28 +++++++++++++++---- .../stream/HystrixStreamAggregatorTests.java | 22 +++++++++++---- 2 files changed, 38 insertions(+), 12 deletions(-) diff --git a/spring-cloud-netflix-turbine-stream/src/main/java/org/springframework/cloud/netflix/turbine/stream/HystrixStreamAggregator.java b/spring-cloud-netflix-turbine-stream/src/main/java/org/springframework/cloud/netflix/turbine/stream/HystrixStreamAggregator.java index ac50a54a..ca5b5f42 100644 --- a/spring-cloud-netflix-turbine-stream/src/main/java/org/springframework/cloud/netflix/turbine/stream/HystrixStreamAggregator.java +++ b/spring-cloud-netflix-turbine-stream/src/main/java/org/springframework/cloud/netflix/turbine/stream/HystrixStreamAggregator.java @@ -17,6 +17,7 @@ package org.springframework.cloud.netflix.turbine.stream; import java.io.IOException; +import java.util.List; import java.util.Map; import org.springframework.beans.factory.annotation.Autowired; @@ -56,18 +57,33 @@ public class HystrixStreamAggregator { payload = payload.replace("\\\"", "\""); } try { - @SuppressWarnings("unchecked") - Map map = this.objectMapper.readValue(payload, Map.class); - Map data = getPayloadData(map); - - log.debug("Received hystrix stream payload: " + data); - this.subject.onNext(data); + if (payload.startsWith("[")) { + @SuppressWarnings("unchecked") + List> list = this.objectMapper.readValue(payload, + List.class); + for (Map map : list) { + sendMap(map); + } + } + else { + @SuppressWarnings("unchecked") + Map map = this.objectMapper.readValue(payload, Map.class); + sendMap(map); + } } catch (IOException ex) { log.error("Error receiving hystrix stream payload: " + payload, ex); } } + private void sendMap(Map map) { + Map data = getPayloadData(map); + if (log.isDebugEnabled()) { + log.debug("Received hystrix stream payload: " + data); + } + this.subject.onNext(data); + } + public static Map getPayloadData(Map jsonMap) { @SuppressWarnings("unchecked") Map origin = (Map) jsonMap.get("origin"); diff --git a/spring-cloud-netflix-turbine-stream/src/test/java/org/springframework/cloud/netflix/turbine/stream/HystrixStreamAggregatorTests.java b/spring-cloud-netflix-turbine-stream/src/test/java/org/springframework/cloud/netflix/turbine/stream/HystrixStreamAggregatorTests.java index 998bb1ca..5d9352c3 100644 --- a/spring-cloud-netflix-turbine-stream/src/test/java/org/springframework/cloud/netflix/turbine/stream/HystrixStreamAggregatorTests.java +++ b/spring-cloud-netflix-turbine-stream/src/test/java/org/springframework/cloud/netflix/turbine/stream/HystrixStreamAggregatorTests.java @@ -16,18 +16,19 @@ package org.springframework.cloud.netflix.turbine.stream; -import static org.hamcrest.CoreMatchers.containsString; -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.CoreMatchers.not; -import static org.junit.Assert.assertThat; - import java.util.Map; +import com.fasterxml.jackson.databind.ObjectMapper; + import org.junit.Rule; import org.junit.Test; + import org.springframework.boot.test.rule.OutputCapture; -import com.fasterxml.jackson.databind.ObjectMapper; +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.not; +import static org.junit.Assert.assertThat; import rx.subjects.PublishSubject; @@ -52,6 +53,15 @@ public class HystrixStreamAggregatorTests { this.output.expect(not(containsString("ERROR"))); } + @Test + public void messageWrappedInArray() throws Exception { + this.publisher.subscribe(map -> { + assertThat(map.get("type"), equalTo("HystrixCommand")); + }); + this.aggregator.sendToSubject("[" + PAYLOAD + "]"); + this.output.expect(not(containsString("ERROR"))); + } + @Test public void doubleEncodedMessage() throws Exception { this.publisher.subscribe(map -> {