@ -18,6 +18,9 @@ package org.springframework.messaging.tcp.reactor;
@@ -18,6 +18,9 @@ package org.springframework.messaging.tcp.reactor;
import java.lang.reflect.Modifier ;
import java.net.InetSocketAddress ;
import java.util.Arrays ;
import java.util.Collections ;
import java.util.Properties ;
import org.apache.commons.logging.Log ;
import org.apache.commons.logging.LogFactory ;
@ -34,6 +37,9 @@ import reactor.core.composable.Deferred;
@@ -34,6 +37,9 @@ import reactor.core.composable.Deferred;
import reactor.core.composable.Promise ;
import reactor.core.composable.Stream ;
import reactor.core.composable.spec.Promises ;
import reactor.core.configuration.ConfigurationReader ;
import reactor.core.configuration.DispatcherConfiguration ;
import reactor.core.configuration.ReactorConfiguration ;
import reactor.function.Consumer ;
import reactor.function.support.SingleUseConsumer ;
import reactor.io.Buffer ;
@ -81,13 +87,12 @@ public class ReactorTcpClient<P> implements TcpOperations<P> {
@@ -81,13 +87,12 @@ public class ReactorTcpClient<P> implements TcpOperations<P> {
public ReactorTcpClient ( String host , int port , Codec < Buffer , Message < P > , Message < P > > codec ) {
// Revisit in 1.1: is Environment still required w/ sync dispatcher?
this . environment = new Environment ( ) ;
this . environment = new Environment ( new SynchronousDispatcherConfigReader ( ) ) ;
this . tcpClient = new TcpClientSpec < Message < P > , Message < P > > ( REACTOR_TCP_CLIENT_TYPE )
. env ( this . environment )
. codec ( codec )
. connect ( host , port )
. synchronousDispatcher ( )
. get ( ) ;
checkReactorVersion ( ) ;
@ -228,4 +233,20 @@ public class ReactorTcpClient<P> implements TcpOperations<P> {
@@ -228,4 +233,20 @@ public class ReactorTcpClient<P> implements TcpOperations<P> {
}
}
/ * *
* A ConfigurationReader that enforces the use of a SynchronousDispatcher .
*
* < p > The { @link reactor . core . configuration . PropertiesConfigurationReader } used by
* default automatically creates other dispatchers with thread pools that are
* not needed .
* /
private static class SynchronousDispatcherConfigReader implements ConfigurationReader {
@Override
public ReactorConfiguration read ( ) {
return new ReactorConfiguration ( Arrays . < DispatcherConfiguration > asList ( ) , "sync" , new Properties ( ) ) ;
}
}
}