@ -16,12 +16,12 @@
@@ -16,12 +16,12 @@
package org.springframework.messaging.tcp.reactor ;
import java.util.Collection ;
import java.util.function.BiFunction ;
import java.util.function.Consumer ;
import java.util.function.Function ;
import io.netty.channel.group.ChannelGroup ;
import io.netty.channel.group.ChannelGroupFuture ;
import io.netty.channel.group.DefaultChannelGroup ;
import io.netty.util.concurrent.ImmediateEventExecutor ;
import org.reactivestreams.Publisher ;
@ -39,20 +39,16 @@ import reactor.ipc.netty.options.ClientOptions;
@@ -39,20 +39,16 @@ import reactor.ipc.netty.options.ClientOptions;
import reactor.ipc.netty.tcp.TcpClient ;
import reactor.util.concurrent.QueueSupplier ;
import org.springframework.messaging.Message ;
import org.springframework.messaging.tcp.ReconnectStrategy ;
import org.springframework.messaging.tcp.TcpConnection ;
import org.springframework.messaging.tcp.TcpConnectionHandler ;
import org.springframework.messaging.tcp.TcpOperations ;
import org.springframework.util.Assert ;
import org.springframework.util.concurrent.ListenableFuture ;
import org.springframework.util.concurrent.SettableListenableFuture ;
/ * *
* An implementation of { @link org . springframework . messaging . tcp . TcpOperations }
* based on the TCP client support of the Reactor project .
*
* < p > This implementation wraps N ( Reactor ) clients for N { @link # connect } calls ,
* i . e . a separate ( Reactor ) client instance for each connection .
* Reactor Netty based implementation of { @link TcpOperations } .
*
* @author Rossen Stoyanchev
* @author Stephane Maldini
@ -64,55 +60,42 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
@@ -64,55 +60,42 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
private final ReactorNettyCodec < P > codec ;
private final Scheduler scheduler = Schedulers . newParallel ( "ReactorNettyTcpClient" ) ;
private final ChannelGroup channelGroup ;
private final ChannelGroup group ;
private final Scheduler scheduler = Schedulers . newParallel ( "ReactorNettyTcpClient" ) ;
private volatile boolean stopping ;
private volatile boolean stopping = false ;
/ * *
* A constructor that creates a { @link TcpClient TcpClient } factory relying on
* Reactor Netty TCP threads . The number of Netty threads can be tweaked with
* the { @code reactor . tcp . ioThreadCount } System property . The network I / O
* threads will be shared amongst the active clients .
* < p > Also see the constructor accepting a { @link Consumer } of
* { @link ClientOptions } for additional options .
* @param host the host to connect to
* @param port the port to connect to
* @param codec for encoding and decoding messages
* Basic constructor with a host and a port .
* /
public ReactorNettyTcpClient ( String host , int port , ReactorNettyCodec < P > codec ) {
this ( opts - > opts . connect ( host , port ) , codec ) ;
}
/ * *
* A constructor with a configurator { @link Consumer } that will receive default
* { @link ClientOptions } from { @link TcpClient } . This might be used to add SSL
* or specific network parameters to the generated client configuration .
* @param tcpOptions callback for configuring shared { @link ClientOptions }
* @param codec for encoding and decoding messages
* Alternate constructor with a { @link ClientOptions } consumer providing
* additional control beyond a host and a port .
* /
public ReactorNettyTcpClient ( Consumer < ? super ClientOptions > tcpOptions , ReactorNettyCodec < P > codec ) {
Assert . notNull ( codec , "'codec' is required" ) ;
this . group = new DefaultChannelGroup ( ImmediateEventExecutor . INSTANCE ) ;
this . tcpClient = TcpClient . create ( opts - > tcpOptions . accept ( opts . channelGroup ( group ) ) ) ;
public ReactorNettyTcpClient ( Consumer < ClientOptions > consumer , ReactorNettyCodec < P > codec ) {
Assert . notNull ( consumer , "Consumer<ClientOptions> is required" ) ;
Assert . notNull ( codec , "ReactorNettyCodec is required" ) ;
this . channelGroup = new DefaultChannelGroup ( ImmediateEventExecutor . INSTANCE ) ;
this . tcpClient = TcpClient . create ( consumer . andThen ( opts - > opts . channelGroup ( this . channelGroup ) ) ) ;
this . codec = codec ;
}
@Override
public ListenableFuture < Void > connect ( final TcpConnectionHandler < P > handler ) {
Assert . notNull ( handler , "'handler' is required" ) ;
Assert . notNull ( handler , "TcpConnectionHandler is required" ) ;
if ( this . stopping ) {
IllegalStateException ex = new IllegalStateException ( "Shutting down." ) ;
handler . afterConnectFailure ( ex ) ;
return new MonoToListenableFutureAdapter < > ( Mono . < Void > error ( ex ) ) ;
return handleShuttingDownConnectFailure ( handler ) ;
}
Mono < Void > connectMono = this . tcpClient
. newHandler ( new MessageHandler < > ( handler , this . codec , this . schedu ler) )
. newHandler ( new ReactorNettyHandler ( hand ler) )
. doOnError ( handler : : afterConnectFailure )
. then ( ) ;
@ -121,99 +104,89 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
@@ -121,99 +104,89 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
@Override
public ListenableFuture < Void > connect ( TcpConnectionHandler < P > handler , ReconnectStrategy strategy ) {
Assert . notNull ( handler , "'handler' is required" ) ;
Assert . notNull ( strategy , "'reconnectStrategy' is required" ) ;
Assert . notNull ( handler , "TcpConnectionHandler is required" ) ;
Assert . notNull ( strategy , "ReconnectStrategy is required" ) ;
if ( this . stopping ) {
IllegalStateException ex = new IllegalStateException ( "Shutting down." ) ;
handler . afterConnectFailure ( ex ) ;
return new MonoToListenableFutureAdapter < > ( Mono . < Void > error ( ex ) ) ;
return handleShuttingDownConnectFailure ( handler ) ;
}
MonoProcessor < Void > connectMono = MonoProcessor . create ( ) ;
this . tcpClient . newHandler ( new MessageHandler < > ( handler , this . codec , this . scheduler ) )
. doOnNext ( item - > {
if ( ! connectMono . isTerminated ( ) ) {
connectMono . onComplete ( ) ;
}
} )
. doOnError ( ex - > {
if ( ! connectMono . isTerminated ( ) ) {
connectMono . onError ( ex ) ;
}
} )
this . tcpClient
. newHandler ( new ReactorNettyHandler ( handler ) )
. doOnNext ( connectFailureConsumer ( connectMono ) )
. doOnError ( connectFailureConsumer ( connectMono ) )
. then ( NettyContext : : onClose )
. retryWhen ( new Reconnector < > ( strategy ) )
. repeatWhen ( new Reconnector < > ( strategy ) )
. retryWhen ( reconnectFunction ( strategy ) )
. repeatWhen ( reconnectFunction ( strategy ) )
. subscribe ( ) ;
return new MonoToListenableFutureAdapter < > ( connectMono ) ;
}
private ListenableFuture < Void > handleShuttingDownConnectFailure ( TcpConnectionHandler < P > handler ) {
IllegalStateException ex = new IllegalStateException ( "Shutting down." ) ;
handler . afterConnectFailure ( ex ) ;
return new MonoToListenableFutureAdapter < > ( Mono . error ( ex ) ) ;
}
private < T > Consumer < T > connectFailureConsumer ( MonoProcessor < Void > connectMono ) {
return o - > {
if ( ! connectMono . isTerminated ( ) ) {
if ( o instanceof Throwable ) {
connectMono . onError ( ( Throwable ) o ) ;
}
else {
connectMono . onComplete ( ) ;
}
}
} ;
}
private < T > Function < Flux < T > , Publisher < ? > > reconnectFunction ( ReconnectStrategy reconnectStrategy ) {
return flux - > flux . scan ( 1 , ( count , e ) - > count + + )
. flatMap ( attempt - > Mono . delayMillis ( reconnectStrategy . getTimeToNextAttempt ( attempt ) ) ) ;
}
@Override
public ListenableFuture < Void > shutdown ( ) {
if ( this . stopping ) {
return new MonoToListenableFutureAdapter < > ( Mono . empty ( ) ) ;
SettableListenableFuture < Void > future = new SettableListenableFuture < > ( ) ;
future . set ( null ) ;
return future ;
}
this . stopping = true ;
Mono < Void > completion = FutureMono . from ( this . group . close ( ) )
. doAfterTerminate ( ( x , e ) - > this . scheduler . dispose ( ) ) ;
ChannelGroupFuture future = this . channelGroup . close ( ) ;
Mono < Void > completion = FutureMono . from ( future ) . doAfterTerminate ( ( x , e ) - > scheduler . dispose ( ) ) ;
return new MonoToListenableFutureAdapter < > ( completion ) ;
}
private static final class MessageHandler < P > implements BiFunction < NettyInbound , NettyOutbound , Publisher < Void > > {
private class ReactorNettyHandler implements BiFunction < NettyInbound , NettyOutbound , Publisher < Void > > {
private final TcpConnectionHandler < P > connectionHandler ;
private final ReactorNettyCodec < P > codec ;
private final Scheduler scheduler ;
MessageHandler ( TcpConnectionHandler < P > handler , ReactorNettyCodec < P > codec , Scheduler scheduler ) {
ReactorNettyHandler ( TcpConnectionHandler < P > handler ) {
this . connectionHandler = handler ;
this . codec = codec ;
this . scheduler = scheduler ;
}
@Override
public Publisher < Void > apply ( NettyInbound in , NettyOutbound out ) {
Flux < Collection < Message < P > > > inbound = in . receive ( ) . map ( this . codec . getDecoder ( ) ) ;
DirectProcessor < Void > closeProcessor = DirectProcessor . create ( ) ;
TcpConnection < P > tcpConnection =
new ReactorNettyTcpConnection < > ( in , out , this . codec . getEncoder ( ) , closeProcessor ) ;
public Publisher < Void > apply ( NettyInbound inbound , NettyOutbound outbound ) {
this . scheduler . schedule ( ( ) - > connectionHandler . afterConnected ( tcpConnection ) ) ;
inbound = inbound . publishOn ( this . scheduler , QueueSupplier . SMALL_BUFFER_SIZE ) ;
DirectProcessor < Void > completion = DirectProcessor . create ( ) ;
TcpConnection < P > connection = new ReactorNettyTcpConnection < > ( inbound , outbound , codec , completion ) ;
scheduler . schedule ( ( ) - > connectionHandler . afterConnected ( connection ) ) ;
inbound . flatMapIterable ( Function . identity ( ) )
inbound . receive ( )
. map ( codec . getDecoder ( ) )
. publishOn ( scheduler , QueueSupplier . SMALL_BUFFER_SIZE )
. flatMapIterable ( Function . identity ( ) )
. subscribe (
connectionHandler : : handleMessage ,
connectionHandler : : handleFailure ,
connectionHandler : : afterConnectionClosed ) ;
return closeProcessor ;
}
}
private static final class Reconnector < T > implements Function < Flux < T > , Publisher < ? > > {
private final ReconnectStrategy strategy ;
Reconnector ( ReconnectStrategy strategy ) {
this . strategy = strategy ;
}
@Override
public Publisher < ? > apply ( Flux < T > flux ) {
return flux . scan ( 1 , ( p , e ) - > p + + )
. flatMap ( attempt - > Mono . delayMillis ( strategy . getTimeToNextAttempt ( attempt ) ) ) ;
return completion ;
}
}