From 2a3e01399a1fbe4a5374a46de501de1b70aaceb0 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Wed, 28 Dec 2016 11:02:50 -0500 Subject: [PATCH] Polish ReactorNettyTcpClient --- .../tcp/reactor/ReactorNettyTcpClient.java | 159 ++++++++---------- .../reactor/ReactorNettyTcpConnection.java | 25 +-- 2 files changed, 75 insertions(+), 109 deletions(-) diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java index d9549ee933..3617ddb2b2 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java @@ -16,12 +16,12 @@ package org.springframework.messaging.tcp.reactor; -import java.util.Collection; import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; import io.netty.channel.group.ChannelGroup; +import io.netty.channel.group.ChannelGroupFuture; import io.netty.channel.group.DefaultChannelGroup; import io.netty.util.concurrent.ImmediateEventExecutor; import org.reactivestreams.Publisher; @@ -39,20 +39,16 @@ import reactor.ipc.netty.options.ClientOptions; import reactor.ipc.netty.tcp.TcpClient; import reactor.util.concurrent.QueueSupplier; -import org.springframework.messaging.Message; import org.springframework.messaging.tcp.ReconnectStrategy; import org.springframework.messaging.tcp.TcpConnection; import org.springframework.messaging.tcp.TcpConnectionHandler; import org.springframework.messaging.tcp.TcpOperations; import org.springframework.util.Assert; import org.springframework.util.concurrent.ListenableFuture; +import org.springframework.util.concurrent.SettableListenableFuture; /** - * An implementation of {@link org.springframework.messaging.tcp.TcpOperations} - * based on the TCP client support of the Reactor project. - * - *

This implementation wraps N (Reactor) clients for N {@link #connect} calls, - * i.e. a separate (Reactor) client instance for each connection. + * Reactor Netty based implementation of {@link TcpOperations}. * * @author Rossen Stoyanchev * @author Stephane Maldini @@ -64,55 +60,42 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ private final ReactorNettyCodec

codec; - private final Scheduler scheduler = Schedulers.newParallel("ReactorNettyTcpClient"); + private final ChannelGroup channelGroup; - private final ChannelGroup group; + private final Scheduler scheduler = Schedulers.newParallel("ReactorNettyTcpClient"); - private volatile boolean stopping; + private volatile boolean stopping = false; /** - * A constructor that creates a {@link TcpClient TcpClient} factory relying on - * Reactor Netty TCP threads. The number of Netty threads can be tweaked with - * the {@code reactor.tcp.ioThreadCount} System property. The network I/O - * threads will be shared amongst the active clients. - *

Also see the constructor accepting a {@link Consumer} of - * {@link ClientOptions} for additional options. - * @param host the host to connect to - * @param port the port to connect to - * @param codec for encoding and decoding messages + * Basic constructor with a host and a port. */ public ReactorNettyTcpClient(String host, int port, ReactorNettyCodec

codec) { this(opts -> opts.connect(host, port), codec); } /** - * A constructor with a configurator {@link Consumer} that will receive default - * {@link ClientOptions} from {@link TcpClient}. This might be used to add SSL - * or specific network parameters to the generated client configuration. - * @param tcpOptions callback for configuring shared {@link ClientOptions} - * @param codec for encoding and decoding messages + * Alternate constructor with a {@link ClientOptions} consumer providing + * additional control beyond a host and a port. */ - public ReactorNettyTcpClient(Consumer tcpOptions, ReactorNettyCodec

codec) { - Assert.notNull(codec, "'codec' is required"); - this.group = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE); - this.tcpClient = TcpClient.create(opts -> tcpOptions.accept(opts.channelGroup(group))); + public ReactorNettyTcpClient(Consumer consumer, ReactorNettyCodec

codec) { + Assert.notNull(consumer, "Consumer is required"); + Assert.notNull(codec, "ReactorNettyCodec is required"); + this.channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE); + this.tcpClient = TcpClient.create(consumer.andThen(opts -> opts.channelGroup(this.channelGroup))); this.codec = codec; } @Override public ListenableFuture connect(final TcpConnectionHandler

handler) { - Assert.notNull(handler, "'handler' is required"); - + Assert.notNull(handler, "TcpConnectionHandler is required"); if (this.stopping) { - IllegalStateException ex = new IllegalStateException("Shutting down."); - handler.afterConnectFailure(ex); - return new MonoToListenableFutureAdapter<>(Mono.error(ex)); + return handleShuttingDownConnectFailure(handler); } Mono connectMono = this.tcpClient - .newHandler(new MessageHandler<>(handler, this.codec, this.scheduler)) + .newHandler(new ReactorNettyHandler(handler)) .doOnError(handler::afterConnectFailure) .then(); @@ -121,99 +104,89 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ @Override public ListenableFuture connect(TcpConnectionHandler

handler, ReconnectStrategy strategy) { - Assert.notNull(handler, "'handler' is required"); - Assert.notNull(strategy, "'reconnectStrategy' is required"); - + Assert.notNull(handler, "TcpConnectionHandler is required"); + Assert.notNull(strategy, "ReconnectStrategy is required"); if (this.stopping) { - IllegalStateException ex = new IllegalStateException("Shutting down."); - handler.afterConnectFailure(ex); - return new MonoToListenableFutureAdapter<>(Mono.error(ex)); + return handleShuttingDownConnectFailure(handler); } MonoProcessor connectMono = MonoProcessor.create(); - - this.tcpClient.newHandler(new MessageHandler<>(handler, this.codec, this.scheduler)) - .doOnNext(item -> { - if (!connectMono.isTerminated()) { - connectMono.onComplete(); - } - }) - .doOnError(ex -> { - if (!connectMono.isTerminated()) { - connectMono.onError(ex); - } - }) + this.tcpClient + .newHandler(new ReactorNettyHandler(handler)) + .doOnNext(connectFailureConsumer(connectMono)) + .doOnError(connectFailureConsumer(connectMono)) .then(NettyContext::onClose) - .retryWhen(new Reconnector<>(strategy)) - .repeatWhen(new Reconnector<>(strategy)) + .retryWhen(reconnectFunction(strategy)) + .repeatWhen(reconnectFunction(strategy)) .subscribe(); return new MonoToListenableFutureAdapter<>(connectMono); } + private ListenableFuture handleShuttingDownConnectFailure(TcpConnectionHandler

handler) { + IllegalStateException ex = new IllegalStateException("Shutting down."); + handler.afterConnectFailure(ex); + return new MonoToListenableFutureAdapter<>(Mono.error(ex)); + } + + private Consumer connectFailureConsumer(MonoProcessor connectMono) { + return o -> { + if (!connectMono.isTerminated()) { + if (o instanceof Throwable) { + connectMono.onError((Throwable) o); + } + else { + connectMono.onComplete(); + } + } + }; + } + + private Function, Publisher> reconnectFunction(ReconnectStrategy reconnectStrategy) { + return flux -> flux.scan(1, (count, e) -> count++) + .flatMap(attempt -> Mono.delayMillis(reconnectStrategy.getTimeToNextAttempt(attempt))); + } + @Override public ListenableFuture shutdown() { if (this.stopping) { - return new MonoToListenableFutureAdapter<>(Mono.empty()); + SettableListenableFuture future = new SettableListenableFuture<>(); + future.set(null); + return future; } - this.stopping = true; - - Mono completion = FutureMono.from(this.group.close()) - .doAfterTerminate((x, e) -> this.scheduler.dispose()); - + ChannelGroupFuture future = this.channelGroup.close(); + Mono completion = FutureMono.from(future).doAfterTerminate((x, e) -> scheduler.dispose()); return new MonoToListenableFutureAdapter<>(completion); } - private static final class MessageHandler

implements BiFunction> { + private class ReactorNettyHandler implements BiFunction> { private final TcpConnectionHandler

connectionHandler; - private final ReactorNettyCodec

codec; - private final Scheduler scheduler; - - MessageHandler(TcpConnectionHandler

handler, ReactorNettyCodec

codec, Scheduler scheduler) { + ReactorNettyHandler(TcpConnectionHandler

handler) { this.connectionHandler = handler; - this.codec = codec; - this.scheduler = scheduler; } @Override - public Publisher apply(NettyInbound in, NettyOutbound out) { - Flux>> inbound = in.receive().map(this.codec.getDecoder()); - DirectProcessor closeProcessor = DirectProcessor.create(); - - TcpConnection

tcpConnection = - new ReactorNettyTcpConnection<>(in, out, this.codec.getEncoder(), closeProcessor); + public Publisher apply(NettyInbound inbound, NettyOutbound outbound) { - this.scheduler.schedule(() -> connectionHandler.afterConnected(tcpConnection)); - inbound = inbound.publishOn(this.scheduler, QueueSupplier.SMALL_BUFFER_SIZE); + DirectProcessor completion = DirectProcessor.create(); + TcpConnection

connection = new ReactorNettyTcpConnection<>(inbound, outbound, codec, completion); + scheduler.schedule(() -> connectionHandler.afterConnected(connection)); - inbound.flatMapIterable(Function.identity()) + inbound.receive() + .map(codec.getDecoder()) + .publishOn(scheduler, QueueSupplier.SMALL_BUFFER_SIZE) + .flatMapIterable(Function.identity()) .subscribe( connectionHandler::handleMessage, connectionHandler::handleFailure, connectionHandler::afterConnectionClosed); - return closeProcessor; - } - } - - - private static final class Reconnector implements Function, Publisher> { - - private final ReconnectStrategy strategy; - - Reconnector(ReconnectStrategy strategy) { - this.strategy = strategy; - } - - @Override - public Publisher apply(Flux flux) { - return flux.scan(1, (p, e) -> p++) - .flatMap(attempt -> Mono.delayMillis(strategy.getTimeToNextAttempt(attempt))); + return completion; } } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java index 99f03866b5..0a2793de7b 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java @@ -16,8 +16,6 @@ package org.springframework.messaging.tcp.reactor; -import java.util.function.BiConsumer; - import io.netty.buffer.ByteBuf; import reactor.core.publisher.DirectProcessor; import reactor.core.publisher.Mono; @@ -29,10 +27,7 @@ import org.springframework.messaging.tcp.TcpConnection; import org.springframework.util.concurrent.ListenableFuture; /** - * An implementation of {@link org.springframework.messaging.tcp.TcpConnection - * TcpConnection} based on the TCP client support of the Reactor project. - * - * @param

the payload type of messages read or written to the TCP stream. + * Reactor Netty based implementation of {@link TcpConnection}. * * @author Rossen Stoyanchev * @since 5.0 @@ -43,29 +38,27 @@ public class ReactorNettyTcpConnection

implements TcpConnection

{ private final NettyOutbound outbound; - private final DirectProcessor closeProcessor; + private final ReactorNettyCodec

codec; - private final BiConsumer> encoder; + private final DirectProcessor closeProcessor; public ReactorNettyTcpConnection(NettyInbound inbound, NettyOutbound outbound, - BiConsumer> encoder, - DirectProcessor closeProcessor) { + ReactorNettyCodec

codec, DirectProcessor closeProcessor) { this.inbound = inbound; this.outbound = outbound; - this.encoder = encoder; + this.codec = codec; this.closeProcessor = closeProcessor; } @Override public ListenableFuture send(Message

message) { - ByteBuf byteBuf = this.outbound.alloc() - .buffer(); - this.encoder.accept(byteBuf, message); - return new MonoToListenableFutureAdapter<>(this.outbound.send(Mono.just(byteBuf)) - .then()); + ByteBuf byteBuf = this.outbound.alloc().buffer(); + this.codec.getEncoder().accept(byteBuf, message); + Mono sendCompletion = this.outbound.send(Mono.just(byteBuf)).then(); + return new MonoToListenableFutureAdapter<>(sendCompletion); } @Override