Browse Source

Merge branch 'master' into 2.0.x

pull/6/head
Dave Syer 8 years ago
parent
commit
63b2916993
  1. 24
      spring-cloud-netflix-turbine-stream/src/main/java/org/springframework/cloud/netflix/turbine/stream/HystrixStreamAggregator.java
  2. 22
      spring-cloud-netflix-turbine-stream/src/test/java/org/springframework/cloud/netflix/turbine/stream/HystrixStreamAggregatorTests.java

24
spring-cloud-netflix-turbine-stream/src/main/java/org/springframework/cloud/netflix/turbine/stream/HystrixStreamAggregator.java

@ -17,6 +17,7 @@
package org.springframework.cloud.netflix.turbine.stream; package org.springframework.cloud.netflix.turbine.stream;
import java.io.IOException; import java.io.IOException;
import java.util.List;
import java.util.Map; import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -56,18 +57,33 @@ public class HystrixStreamAggregator {
payload = payload.replace("\\\"", "\""); payload = payload.replace("\\\"", "\"");
} }
try { try {
if (payload.startsWith("[")) {
@SuppressWarnings("unchecked")
List<Map<String, Object>> list = this.objectMapper.readValue(payload,
List.class);
for (Map<String, Object> map : list) {
sendMap(map);
}
}
else {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
Map<String, Object> map = this.objectMapper.readValue(payload, Map.class); Map<String, Object> map = this.objectMapper.readValue(payload, Map.class);
Map<String, Object> data = getPayloadData(map); sendMap(map);
}
log.debug("Received hystrix stream payload: " + data);
this.subject.onNext(data);
} }
catch (IOException ex) { catch (IOException ex) {
log.error("Error receiving hystrix stream payload: " + payload, ex); log.error("Error receiving hystrix stream payload: " + payload, ex);
} }
} }
private void sendMap(Map<String, Object> map) {
Map<String, Object> data = getPayloadData(map);
if (log.isDebugEnabled()) {
log.debug("Received hystrix stream payload: " + data);
}
this.subject.onNext(data);
}
public static Map<String, Object> getPayloadData(Map<String, Object> jsonMap) { public static Map<String, Object> getPayloadData(Map<String, Object> jsonMap) {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
Map<String, Object> origin = (Map<String, Object>) jsonMap.get("origin"); Map<String, Object> origin = (Map<String, Object>) jsonMap.get("origin");

22
spring-cloud-netflix-turbine-stream/src/test/java/org/springframework/cloud/netflix/turbine/stream/HystrixStreamAggregatorTests.java

@ -16,18 +16,19 @@
package org.springframework.cloud.netflix.turbine.stream; package org.springframework.cloud.netflix.turbine.stream;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.not;
import static org.junit.Assert.assertThat;
import java.util.Map; import java.util.Map;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.springframework.boot.test.rule.OutputCapture; import org.springframework.boot.test.rule.OutputCapture;
import com.fasterxml.jackson.databind.ObjectMapper; import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.not;
import static org.junit.Assert.assertThat;
import rx.subjects.PublishSubject; import rx.subjects.PublishSubject;
@ -52,6 +53,15 @@ public class HystrixStreamAggregatorTests {
this.output.expect(not(containsString("ERROR"))); this.output.expect(not(containsString("ERROR")));
} }
@Test
public void messageWrappedInArray() throws Exception {
this.publisher.subscribe(map -> {
assertThat(map.get("type"), equalTo("HystrixCommand"));
});
this.aggregator.sendToSubject("[" + PAYLOAD + "]");
this.output.expect(not(containsString("ERROR")));
}
@Test @Test
public void doubleEncodedMessage() throws Exception { public void doubleEncodedMessage() throws Exception {
this.publisher.subscribe(map -> { this.publisher.subscribe(map -> {

Loading…
Cancel
Save