diff --git a/spring-cloud-netflix-turbine-stream/src/main/java/org/springframework/cloud/netflix/turbine/stream/HystrixStreamAggregator.java b/spring-cloud-netflix-turbine-stream/src/main/java/org/springframework/cloud/netflix/turbine/stream/HystrixStreamAggregator.java index ac50a54a..ca5b5f42 100644 --- a/spring-cloud-netflix-turbine-stream/src/main/java/org/springframework/cloud/netflix/turbine/stream/HystrixStreamAggregator.java +++ b/spring-cloud-netflix-turbine-stream/src/main/java/org/springframework/cloud/netflix/turbine/stream/HystrixStreamAggregator.java @@ -17,6 +17,7 @@ package org.springframework.cloud.netflix.turbine.stream; import java.io.IOException; +import java.util.List; import java.util.Map; import org.springframework.beans.factory.annotation.Autowired; @@ -56,18 +57,33 @@ public class HystrixStreamAggregator { payload = payload.replace("\\\"", "\""); } try { - @SuppressWarnings("unchecked") - Map map = this.objectMapper.readValue(payload, Map.class); - Map data = getPayloadData(map); - - log.debug("Received hystrix stream payload: " + data); - this.subject.onNext(data); + if (payload.startsWith("[")) { + @SuppressWarnings("unchecked") + List> list = this.objectMapper.readValue(payload, + List.class); + for (Map map : list) { + sendMap(map); + } + } + else { + @SuppressWarnings("unchecked") + Map map = this.objectMapper.readValue(payload, Map.class); + sendMap(map); + } } catch (IOException ex) { log.error("Error receiving hystrix stream payload: " + payload, ex); } } + private void sendMap(Map map) { + Map data = getPayloadData(map); + if (log.isDebugEnabled()) { + log.debug("Received hystrix stream payload: " + data); + } + this.subject.onNext(data); + } + public static Map getPayloadData(Map jsonMap) { @SuppressWarnings("unchecked") Map origin = (Map) jsonMap.get("origin"); diff --git a/spring-cloud-netflix-turbine-stream/src/test/java/org/springframework/cloud/netflix/turbine/stream/HystrixStreamAggregatorTests.java b/spring-cloud-netflix-turbine-stream/src/test/java/org/springframework/cloud/netflix/turbine/stream/HystrixStreamAggregatorTests.java index 998bb1ca..5d9352c3 100644 --- a/spring-cloud-netflix-turbine-stream/src/test/java/org/springframework/cloud/netflix/turbine/stream/HystrixStreamAggregatorTests.java +++ b/spring-cloud-netflix-turbine-stream/src/test/java/org/springframework/cloud/netflix/turbine/stream/HystrixStreamAggregatorTests.java @@ -16,18 +16,19 @@ package org.springframework.cloud.netflix.turbine.stream; -import static org.hamcrest.CoreMatchers.containsString; -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.CoreMatchers.not; -import static org.junit.Assert.assertThat; - import java.util.Map; +import com.fasterxml.jackson.databind.ObjectMapper; + import org.junit.Rule; import org.junit.Test; + import org.springframework.boot.test.rule.OutputCapture; -import com.fasterxml.jackson.databind.ObjectMapper; +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.not; +import static org.junit.Assert.assertThat; import rx.subjects.PublishSubject; @@ -52,6 +53,15 @@ public class HystrixStreamAggregatorTests { this.output.expect(not(containsString("ERROR"))); } + @Test + public void messageWrappedInArray() throws Exception { + this.publisher.subscribe(map -> { + assertThat(map.get("type"), equalTo("HystrixCommand")); + }); + this.aggregator.sendToSubject("[" + PAYLOAD + "]"); + this.output.expect(not(containsString("ERROR"))); + } + @Test public void doubleEncodedMessage() throws Exception { this.publisher.subscribe(map -> {