diff --git a/spring-cloud-netflix-hystrix-amqp/pom.xml b/spring-cloud-netflix-hystrix-amqp/pom.xml index 531c7911..4530c8c4 100644 --- a/spring-cloud-netflix-hystrix-amqp/pom.xml +++ b/spring-cloud-netflix-hystrix-amqp/pom.xml @@ -47,6 +47,10 @@ spring-integration-java-dsl true + + com.fasterxml.jackson.core + jackson-databind + com.netflix.hystrix hystrix-core diff --git a/spring-cloud-netflix-hystrix-amqp/src/main/java/org/springframework/netflix/hystrix/amqp/HystrixStreamAutoConfiguration.java b/spring-cloud-netflix-hystrix-amqp/src/main/java/org/springframework/netflix/hystrix/amqp/HystrixStreamAutoConfiguration.java index 304c246d..9d2cb054 100644 --- a/spring-cloud-netflix-hystrix-amqp/src/main/java/org/springframework/netflix/hystrix/amqp/HystrixStreamAutoConfiguration.java +++ b/spring-cloud-netflix-hystrix-amqp/src/main/java/org/springframework/netflix/hystrix/amqp/HystrixStreamAutoConfiguration.java @@ -1,8 +1,10 @@ package org.springframework.netflix.hystrix.amqp; -import com.netflix.hystrix.HystrixCircuitBreaker; -import org.springframework.amqp.core.AmqpTemplate; +import javax.annotation.PostConstruct; + import org.springframework.amqp.core.DirectExchange; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; @@ -16,50 +18,68 @@ import org.springframework.integration.dsl.IntegrationFlows; import org.springframework.integration.dsl.amqp.Amqp; import org.springframework.scheduling.annotation.EnableScheduling; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.netflix.hystrix.HystrixCircuitBreaker; + /** * @author Spencer Gibb */ @Configuration -@ConditionalOnClass({HystrixCircuitBreaker.class, AmqpTemplate.class}) +@ConditionalOnClass({ HystrixCircuitBreaker.class, RabbitTemplate.class }) +@ConditionalOnExpression("${hystrix.stream.amqp.enabled:true}") +@IntegrationComponentScan(basePackageClasses = HystrixStreamChannel.class) +@EnableScheduling public class HystrixStreamAutoConfiguration { - @Configuration - @ConditionalOnExpression("${hystrix.stream.amqp.enabled:true}") - @IntegrationComponentScan(basePackageClasses = HystrixStreamChannel.class) - @EnableScheduling - protected static class HystrixStreamAmqpAutoConfiguration { - @Autowired - private AmqpTemplate amqpTemplate; + @Autowired + private RabbitTemplate amqpTemplate; + + @Autowired(required = false) + private ObjectMapper objectMapper; + + @PostConstruct + public void init() { + Jackson2JsonMessageConverter converter = messageConverter(); + amqpTemplate.setMessageConverter(converter); + } + + @Bean + public HystrixStreamTask hystrixStreamTask() { + return new HystrixStreamTask(); + } + + @Bean + public DirectChannel hystrixStream() { + return new DirectChannel(); + } - @Bean - public HystrixStreamTask hystrixStreamTask() { - return new HystrixStreamTask(); - } + @Bean + public DirectExchange hystrixStreamExchange() { + DirectExchange exchange = new DirectExchange(Constants.HYSTRIX_STREAM_NAME); + return exchange; + } - @Bean - public DirectChannel hystrixStream() { - return new DirectChannel(); - } - - @Bean - public DirectExchange hystrixStreamExchange() { - DirectExchange exchange = new DirectExchange(Constants.HYSTRIX_STREAM_NAME); - return exchange; - } + @Bean + public IntegrationFlow hystrixStreamOutboundFlow() { + return IntegrationFlows + .from("hystrixStream") + // TODO: set content type + /* + * .enrichHeaders(new ComponentConfigurer() { + * + * @Override public void configure(HeaderEnricherSpec spec) { + * spec.header("content-type", "application/json", true); } }) + */ + .handle(Amqp.outboundAdapter(this.amqpTemplate).exchangeName( + Constants.HYSTRIX_STREAM_NAME)).get(); + } - @Bean - public IntegrationFlow hystrixStreamOutboundFlow() { - return IntegrationFlows.from("hystrixStream") - //TODO: set content type - /*.enrichHeaders(new ComponentConfigurer() { - @Override - public void configure(HeaderEnricherSpec spec) { - spec.header("content-type", "application/json", true); - } - })*/ - .handle(Amqp.outboundAdapter(this.amqpTemplate).exchangeName(Constants.HYSTRIX_STREAM_NAME)) - .get(); - } - } + private Jackson2JsonMessageConverter messageConverter() { + Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter(); + if (objectMapper != null) { + converter.setJsonObjectMapper(objectMapper); + } + return converter; + } } diff --git a/spring-cloud-netflix-turbine-amqp/src/main/java/org/springframework/netflix/turbine/amqp/Aggregator.java b/spring-cloud-netflix-turbine-amqp/src/main/java/org/springframework/cloud/netflix/turbine/amqp/Aggregator.java similarity index 97% rename from spring-cloud-netflix-turbine-amqp/src/main/java/org/springframework/netflix/turbine/amqp/Aggregator.java rename to spring-cloud-netflix-turbine-amqp/src/main/java/org/springframework/cloud/netflix/turbine/amqp/Aggregator.java index 505961d2..b71b1d4e 100644 --- a/spring-cloud-netflix-turbine-amqp/src/main/java/org/springframework/netflix/turbine/amqp/Aggregator.java +++ b/spring-cloud-netflix-turbine-amqp/src/main/java/org/springframework/cloud/netflix/turbine/amqp/Aggregator.java @@ -1,4 +1,4 @@ -package org.springframework.netflix.turbine.amqp; +package org.springframework.cloud.netflix.turbine.amqp; import com.fasterxml.jackson.databind.ObjectMapper; diff --git a/spring-cloud-netflix-turbine-amqp/src/main/java/org/springframework/netflix/turbine/amqp/EnableTurbineAmqp.java b/spring-cloud-netflix-turbine-amqp/src/main/java/org/springframework/cloud/netflix/turbine/amqp/EnableTurbineAmqp.java similarity index 86% rename from spring-cloud-netflix-turbine-amqp/src/main/java/org/springframework/netflix/turbine/amqp/EnableTurbineAmqp.java rename to spring-cloud-netflix-turbine-amqp/src/main/java/org/springframework/cloud/netflix/turbine/amqp/EnableTurbineAmqp.java index d05ffa1c..d9af015c 100644 --- a/spring-cloud-netflix-turbine-amqp/src/main/java/org/springframework/netflix/turbine/amqp/EnableTurbineAmqp.java +++ b/spring-cloud-netflix-turbine-amqp/src/main/java/org/springframework/cloud/netflix/turbine/amqp/EnableTurbineAmqp.java @@ -1,4 +1,4 @@ -package org.springframework.netflix.turbine.amqp; +package org.springframework.cloud.netflix.turbine.amqp; import org.springframework.context.annotation.Import; diff --git a/spring-cloud-netflix-turbine-amqp/src/main/java/org/springframework/cloud/netflix/turbine/amqp/TurbineAmqpAutoConfiguration.java b/spring-cloud-netflix-turbine-amqp/src/main/java/org/springframework/cloud/netflix/turbine/amqp/TurbineAmqpAutoConfiguration.java new file mode 100644 index 00000000..c83b8dae --- /dev/null +++ b/spring-cloud-netflix-turbine-amqp/src/main/java/org/springframework/cloud/netflix/turbine/amqp/TurbineAmqpAutoConfiguration.java @@ -0,0 +1,92 @@ +package org.springframework.cloud.netflix.turbine.amqp; + +import java.util.HashMap; +import java.util.Map; + +import javax.annotation.PostConstruct; + +import org.springframework.amqp.core.AmqpTemplate; +import org.springframework.amqp.core.Binding; +import org.springframework.amqp.core.BindingBuilder; +import org.springframework.amqp.core.DirectExchange; +import org.springframework.amqp.core.Queue; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; +import org.springframework.cloud.netflix.Constants; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.dsl.IntegrationFlow; +import org.springframework.integration.dsl.IntegrationFlows; +import org.springframework.integration.dsl.amqp.Amqp; + +import com.fasterxml.jackson.databind.ObjectMapper; + +/** + * @author Spencer Gibb + */ +@Configuration +@ConditionalOnClass(AmqpTemplate.class) +@ConditionalOnExpression("${turbine.amqp.enabled:true}") +public class TurbineAmqpAutoConfiguration { + + @Autowired + private ConnectionFactory connectionFactory; + + @Autowired + private RabbitTemplate amqpTemplate; + + @Autowired(required = false) + private ObjectMapper objectMapper; + + @PostConstruct + public void init() { + Jackson2JsonMessageConverter converter = messageConverter(); + amqpTemplate.setMessageConverter(converter); + } + + @Bean + public DirectExchange hystrixStreamExchange() { + DirectExchange exchange = new DirectExchange(Constants.HYSTRIX_STREAM_NAME); + return exchange; + } + + @Bean + protected Binding localTurbineAmqpQueueBinding() { + return BindingBuilder.bind(hystrixStreamQueue()).to(hystrixStreamExchange()) + .with(""); + } + + @Bean + public Queue hystrixStreamQueue() { + Map args = new HashMap<>(); + args.put("x-message-ttl", 60000); // TODO: configure TTL + Queue queue = new Queue(Constants.HYSTRIX_STREAM_NAME, false, false, false, args); + return queue; + } + + @Bean + public IntegrationFlow hystrixStreamAggregatorInboundFlow() { + return IntegrationFlows + .from(Amqp.inboundAdapter(connectionFactory, hystrixStreamQueue()) + .messageConverter(messageConverter())) + .channel("hystrixStreamAggregator").get(); + } + + @Bean + public Aggregator hystrixStreamAggregator() { + return new Aggregator(); + } + + private Jackson2JsonMessageConverter messageConverter() { + Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter(); + if (objectMapper != null) { + converter.setJsonObjectMapper(objectMapper); + } + return converter; + } + +} diff --git a/spring-cloud-netflix-turbine-amqp/src/main/java/org/springframework/netflix/turbine/amqp/TurbineAmqpConfiguration.java b/spring-cloud-netflix-turbine-amqp/src/main/java/org/springframework/cloud/netflix/turbine/amqp/TurbineAmqpConfiguration.java similarity index 60% rename from spring-cloud-netflix-turbine-amqp/src/main/java/org/springframework/netflix/turbine/amqp/TurbineAmqpConfiguration.java rename to spring-cloud-netflix-turbine-amqp/src/main/java/org/springframework/cloud/netflix/turbine/amqp/TurbineAmqpConfiguration.java index b9b1a777..38514c0c 100644 --- a/spring-cloud-netflix-turbine-amqp/src/main/java/org/springframework/netflix/turbine/amqp/TurbineAmqpConfiguration.java +++ b/spring-cloud-netflix-turbine-amqp/src/main/java/org/springframework/cloud/netflix/turbine/amqp/TurbineAmqpConfiguration.java @@ -1,35 +1,40 @@ -package org.springframework.netflix.turbine.amqp; +package org.springframework.cloud.netflix.turbine.amqp; -import com.netflix.turbine.aggregator.InstanceKey; -import com.netflix.turbine.aggregator.StreamAggregator; -import com.netflix.turbine.internal.JsonUtility; +import static io.reactivex.netty.pipeline.PipelineConfigurators.sseServerConfigurator; import io.netty.buffer.ByteBuf; import io.reactivex.netty.RxNetty; import io.reactivex.netty.protocol.http.server.HttpServer; import io.reactivex.netty.protocol.text.sse.ServerSentEvent; + +import java.util.Map; + import lombok.extern.slf4j.Slf4j; -import org.springframework.boot.context.properties.ConfigurationProperties; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.SmartLifecycle; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; + import rx.Observable; import rx.subjects.PublishSubject; -import java.util.Map; - -import static io.reactivex.netty.pipeline.PipelineConfigurators.sseServerConfigurator; +import com.netflix.turbine.aggregator.InstanceKey; +import com.netflix.turbine.aggregator.StreamAggregator; +import com.netflix.turbine.internal.JsonUtility; /** * @author Spencer Gibb */ @Configuration @Slf4j -@ConfigurationProperties("turbine.amqp") +@EnableConfigurationProperties(TurbineAmqpProperties.class) public class TurbineAmqpConfiguration implements SmartLifecycle { private boolean running = false; - private int port = 8989; + @Autowired + private TurbineAmqpProperties turbine; @Bean public PublishSubject> hystrixSubject() { @@ -39,19 +44,26 @@ public class TurbineAmqpConfiguration implements SmartLifecycle { @Bean public HttpServer aggregatorServer() { // multicast so multiple concurrent subscribers get the same stream - Observable> publishedStreams = StreamAggregator.aggregateGroupedStreams(hystrixSubject() - .groupBy(data -> InstanceKey.create((String) data.get("instanceId")))) - .doOnUnsubscribe(() -> log.info("AmqpTurbine => Unsubscribing aggregation.")) - .doOnSubscribe(() -> log.info("AmqpTurbine => Starting aggregation")) - .flatMap(o -> o).publish().refCount(); - - HttpServer httpServer = RxNetty.createHttpServer(port, (request, response) -> { - log.info("AmqpTurbine => SSE Request Received"); - response.getHeaders().setHeader("Content-Type", "text/event-stream"); - return publishedStreams - .doOnUnsubscribe(() -> log.info("AmqpTurbine => Unsubscribing RxNetty server connection")) - .flatMap(data -> response.writeAndFlush(new ServerSentEvent(null, null, JsonUtility.mapToJson(data)))); - }, sseServerConfigurator()); + Observable> publishedStreams = StreamAggregator + .aggregateGroupedStreams( + hystrixSubject().groupBy( + data -> InstanceKey.create((String) data + .get("instanceId")))) + .doOnUnsubscribe(() -> log.info("Unsubscribing aggregation.")) + .doOnSubscribe(() -> log.info("Starting aggregation")).flatMap(o -> o) + .publish().refCount(); + + HttpServer httpServer = RxNetty.createHttpServer( + turbine.getPort(), + (request, response) -> { + log.info("SSE Request Received"); + response.getHeaders().setHeader("Content-Type", "text/event-stream"); + return publishedStreams.doOnUnsubscribe( + () -> log.info("Unsubscribing RxNetty server connection")) + .flatMap( + data -> response.writeAndFlush(new ServerSentEvent( + null, null, JsonUtility.mapToJson(data)))); + }, sseServerConfigurator()); return httpServer; } @@ -75,7 +87,8 @@ public class TurbineAmqpConfiguration implements SmartLifecycle { public void stop() { try { aggregatorServer().shutdown(); - } catch (InterruptedException e) { + } + catch (InterruptedException e) { log.error("Error shutting down", e); } running = false; diff --git a/spring-cloud-netflix-turbine-amqp/src/main/java/org/springframework/cloud/netflix/turbine/amqp/TurbineAmqpProperties.java b/spring-cloud-netflix-turbine-amqp/src/main/java/org/springframework/cloud/netflix/turbine/amqp/TurbineAmqpProperties.java new file mode 100644 index 00000000..caa0ba78 --- /dev/null +++ b/spring-cloud-netflix-turbine-amqp/src/main/java/org/springframework/cloud/netflix/turbine/amqp/TurbineAmqpProperties.java @@ -0,0 +1,16 @@ +package org.springframework.cloud.netflix.turbine.amqp; + +import lombok.Data; + +import org.springframework.boot.context.properties.ConfigurationProperties; + +/** + * @author Dave Syer + */ +@ConfigurationProperties("turbine.amqp") +@Data +public class TurbineAmqpProperties { + + private int port = 8989; + +} diff --git a/spring-cloud-netflix-turbine-amqp/src/main/java/org/springframework/netflix/turbine/amqp/TurbineAmqpAutoConfiguration.java b/spring-cloud-netflix-turbine-amqp/src/main/java/org/springframework/netflix/turbine/amqp/TurbineAmqpAutoConfiguration.java deleted file mode 100644 index eb33a35d..00000000 --- a/spring-cloud-netflix-turbine-amqp/src/main/java/org/springframework/netflix/turbine/amqp/TurbineAmqpAutoConfiguration.java +++ /dev/null @@ -1,70 +0,0 @@ -package org.springframework.netflix.turbine.amqp; - -import java.util.HashMap; -import java.util.Map; - -import org.springframework.amqp.core.AmqpTemplate; -import org.springframework.amqp.core.Binding; -import org.springframework.amqp.core.BindingBuilder; -import org.springframework.amqp.core.DirectExchange; -import org.springframework.amqp.core.Queue; -import org.springframework.amqp.rabbit.connection.ConnectionFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; -import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; -import org.springframework.cloud.netflix.Constants; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.integration.dsl.IntegrationFlow; -import org.springframework.integration.dsl.IntegrationFlows; -import org.springframework.integration.dsl.amqp.Amqp; - -/** - * @author Spencer Gibb - */ -@Configuration -@ConditionalOnClass(AmqpTemplate.class) -public class TurbineAmqpAutoConfiguration { - - @Configuration - @ConditionalOnExpression("${turbine.amqp.enabled:true}") - protected static class HystrixStreamAggregatorAutoConfiguration { - - @Autowired - private ConnectionFactory connectionFactory; - - //TODO: how to fail gracefully if no rabbit? - @Bean - public DirectExchange hystrixStreamExchange() { - DirectExchange exchange = new DirectExchange(Constants.HYSTRIX_STREAM_NAME); - return exchange; - } - - @Bean - protected Binding localTurbineAmqpQueueBinding() { - return BindingBuilder.bind(hystrixStreamQueue()).to(hystrixStreamExchange()).with(""); - } - - @Bean - public Queue hystrixStreamQueue() { - Map args = new HashMap<>(); - args.put("x-message-ttl", 60000); //TODO: configure TTL - Queue queue = new Queue(Constants.HYSTRIX_STREAM_NAME, false, false, false, args); - return queue; - } - - @Bean - public IntegrationFlow hystrixStreamAggregatorInboundFlow() { - return IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, hystrixStreamQueue())) - .channel("hystrixStreamAggregator") - .get(); - } - - @Bean - public Aggregator hystrixStreamAggregator() { - return new Aggregator(); - } - } - - -} diff --git a/spring-cloud-netflix-turbine-amqp/src/main/resources/META-INF/spring.factories b/spring-cloud-netflix-turbine-amqp/src/main/resources/META-INF/spring.factories index 612b8fb5..ee13e692 100644 --- a/spring-cloud-netflix-turbine-amqp/src/main/resources/META-INF/spring.factories +++ b/spring-cloud-netflix-turbine-amqp/src/main/resources/META-INF/spring.factories @@ -1,2 +1,2 @@ org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ -org.springframework.netflix.turbine.amqp.TurbineAmqpAutoConfiguration +org.springframework.cloud.netflix.turbine.amqp.TurbineAmqpAutoConfiguration diff --git a/spring-cloud-netflix-turbine-amqp/src/test/java/org/springframework/netflix/turbine/amqp/AggregatorTest.java b/spring-cloud-netflix-turbine-amqp/src/test/java/org/springframework/netflix/turbine/amqp/AggregatorTest.java index 3e6f05b6..09269330 100644 --- a/spring-cloud-netflix-turbine-amqp/src/test/java/org/springframework/netflix/turbine/amqp/AggregatorTest.java +++ b/spring-cloud-netflix-turbine-amqp/src/test/java/org/springframework/netflix/turbine/amqp/AggregatorTest.java @@ -12,8 +12,7 @@ import com.netflix.turbine.internal.JsonUtility; import rx.Observable; import rx.observables.GroupedObservable; - -import static org.springframework.netflix.turbine.amqp.Aggregator.getPayloadData; +import static org.springframework.cloud.netflix.turbine.amqp.Aggregator.getPayloadData; public class AggregatorTest { diff --git a/spring-cloud-netflix-turbine-amqp/src/test/java/org/springframework/netflix/turbine/amqp/SampleTurbineAmqpApplication.java b/spring-cloud-netflix-turbine-amqp/src/test/java/org/springframework/netflix/turbine/amqp/SampleTurbineAmqpApplication.java index e87353d6..3fe86183 100644 --- a/spring-cloud-netflix-turbine-amqp/src/test/java/org/springframework/netflix/turbine/amqp/SampleTurbineAmqpApplication.java +++ b/spring-cloud-netflix-turbine-amqp/src/test/java/org/springframework/netflix/turbine/amqp/SampleTurbineAmqpApplication.java @@ -2,6 +2,7 @@ package org.springframework.netflix.turbine.amqp; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.builder.SpringApplicationBuilder; +import org.springframework.cloud.netflix.turbine.amqp.EnableTurbineAmqp; /** * @author Spencer Gibb