From a372b683cde47651fcec6ef0cec9004d79c4975e Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Mon, 17 Mar 2014 21:11:13 -0400 Subject: [PATCH] Use custom config read in ReactorTcpClient Use a custom ConfigReader to enforce the use of SynchronousDispatcher and no other dispatchers otherwise created by default. Avoids the creation thread pools never to be used. --- .../stomp/StompBrokerRelayMessageHandler.java | 1 + .../tcp/reactor/ReactorTcpClient.java | 25 +++++++++++++++++-- 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java index b0cfa55c06..41736a241e 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java @@ -16,6 +16,7 @@ package org.springframework.messaging.simp.stomp; +import java.io.IOException; import java.util.Collection; import java.util.Map; import java.util.concurrent.Callable; diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorTcpClient.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorTcpClient.java index 61c5d49f83..c58bbffe3d 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorTcpClient.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorTcpClient.java @@ -18,6 +18,9 @@ package org.springframework.messaging.tcp.reactor; import java.lang.reflect.Modifier; import java.net.InetSocketAddress; +import java.util.Arrays; +import java.util.Collections; +import java.util.Properties; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -34,6 +37,9 @@ import reactor.core.composable.Deferred; import reactor.core.composable.Promise; import reactor.core.composable.Stream; import reactor.core.composable.spec.Promises; +import reactor.core.configuration.ConfigurationReader; +import reactor.core.configuration.DispatcherConfiguration; +import reactor.core.configuration.ReactorConfiguration; import reactor.function.Consumer; import reactor.function.support.SingleUseConsumer; import reactor.io.Buffer; @@ -81,13 +87,12 @@ public class ReactorTcpClient

implements TcpOperations

{ public ReactorTcpClient(String host, int port, Codec, Message

> codec) { // Revisit in 1.1: is Environment still required w/ sync dispatcher? - this.environment = new Environment(); + this.environment = new Environment(new SynchronousDispatcherConfigReader()); this.tcpClient = new TcpClientSpec, Message

>(REACTOR_TCP_CLIENT_TYPE) .env(this.environment) .codec(codec) .connect(host, port) - .synchronousDispatcher() .get(); checkReactorVersion(); @@ -228,4 +233,20 @@ public class ReactorTcpClient

implements TcpOperations

{ } } + + /** + * A ConfigurationReader that enforces the use of a SynchronousDispatcher. + * + *

The {@link reactor.core.configuration.PropertiesConfigurationReader} used by + * default automatically creates other dispatchers with thread pools that are + * not needed. + */ + private static class SynchronousDispatcherConfigReader implements ConfigurationReader { + + @Override + public ReactorConfiguration read() { + return new ReactorConfiguration(Arrays.asList(), "sync", new Properties()); + } + } + } \ No newline at end of file