diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java index d9549ee933..3617ddb2b2 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java @@ -16,12 +16,12 @@ package org.springframework.messaging.tcp.reactor; -import java.util.Collection; import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; import io.netty.channel.group.ChannelGroup; +import io.netty.channel.group.ChannelGroupFuture; import io.netty.channel.group.DefaultChannelGroup; import io.netty.util.concurrent.ImmediateEventExecutor; import org.reactivestreams.Publisher; @@ -39,20 +39,16 @@ import reactor.ipc.netty.options.ClientOptions; import reactor.ipc.netty.tcp.TcpClient; import reactor.util.concurrent.QueueSupplier; -import org.springframework.messaging.Message; import org.springframework.messaging.tcp.ReconnectStrategy; import org.springframework.messaging.tcp.TcpConnection; import org.springframework.messaging.tcp.TcpConnectionHandler; import org.springframework.messaging.tcp.TcpOperations; import org.springframework.util.Assert; import org.springframework.util.concurrent.ListenableFuture; +import org.springframework.util.concurrent.SettableListenableFuture; /** - * An implementation of {@link org.springframework.messaging.tcp.TcpOperations} - * based on the TCP client support of the Reactor project. - * - *
This implementation wraps N (Reactor) clients for N {@link #connect} calls, - * i.e. a separate (Reactor) client instance for each connection. + * Reactor Netty based implementation of {@link TcpOperations}. * * @author Rossen Stoyanchev * @author Stephane Maldini @@ -64,55 +60,42 @@ public class ReactorNettyTcpClient
implements TcpOperations
{ private final ReactorNettyCodec
codec; - private final Scheduler scheduler = Schedulers.newParallel("ReactorNettyTcpClient"); + private final ChannelGroup channelGroup; - private final ChannelGroup group; + private final Scheduler scheduler = Schedulers.newParallel("ReactorNettyTcpClient"); - private volatile boolean stopping; + private volatile boolean stopping = false; /** - * A constructor that creates a {@link TcpClient TcpClient} factory relying on - * Reactor Netty TCP threads. The number of Netty threads can be tweaked with - * the {@code reactor.tcp.ioThreadCount} System property. The network I/O - * threads will be shared amongst the active clients. - *
Also see the constructor accepting a {@link Consumer} of - * {@link ClientOptions} for additional options. - * @param host the host to connect to - * @param port the port to connect to - * @param codec for encoding and decoding messages + * Basic constructor with a host and a port. */ public ReactorNettyTcpClient(String host, int port, ReactorNettyCodec
codec) { this(opts -> opts.connect(host, port), codec); } /** - * A constructor with a configurator {@link Consumer} that will receive default - * {@link ClientOptions} from {@link TcpClient}. This might be used to add SSL - * or specific network parameters to the generated client configuration. - * @param tcpOptions callback for configuring shared {@link ClientOptions} - * @param codec for encoding and decoding messages + * Alternate constructor with a {@link ClientOptions} consumer providing + * additional control beyond a host and a port. */ - public ReactorNettyTcpClient(Consumer super ClientOptions> tcpOptions, ReactorNettyCodec
codec) {
- Assert.notNull(codec, "'codec' is required");
- this.group = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
- this.tcpClient = TcpClient.create(opts -> tcpOptions.accept(opts.channelGroup(group)));
+ public ReactorNettyTcpClient(Consumer codec) {
+ Assert.notNull(consumer, "Consumer handler) {
- Assert.notNull(handler, "'handler' is required");
-
+ Assert.notNull(handler, "TcpConnectionHandler is required");
if (this.stopping) {
- IllegalStateException ex = new IllegalStateException("Shutting down.");
- handler.afterConnectFailure(ex);
- return new MonoToListenableFutureAdapter<>(Mono. implements TcpOperations {
@Override
public ListenableFuture handler, ReconnectStrategy strategy) {
- Assert.notNull(handler, "'handler' is required");
- Assert.notNull(strategy, "'reconnectStrategy' is required");
-
+ Assert.notNull(handler, "TcpConnectionHandler is required");
+ Assert.notNull(strategy, "ReconnectStrategy is required");
if (this.stopping) {
- IllegalStateException ex = new IllegalStateException("Shutting down.");
- handler.afterConnectFailure(ex);
- return new MonoToListenableFutureAdapter<>(Mono. handler) {
+ IllegalStateException ex = new IllegalStateException("Shutting down.");
+ handler.afterConnectFailure(ex);
+ return new MonoToListenableFutureAdapter<>(Mono.error(ex));
+ }
+
+ private implements BiFunction connectionHandler;
- private final ReactorNettyCodec codec;
- private final Scheduler scheduler;
-
- MessageHandler(TcpConnectionHandler handler, ReactorNettyCodec codec, Scheduler scheduler) {
+ ReactorNettyHandler(TcpConnectionHandler handler) {
this.connectionHandler = handler;
- this.codec = codec;
- this.scheduler = scheduler;
}
@Override
- public Publisher tcpConnection =
- new ReactorNettyTcpConnection<>(in, out, this.codec.getEncoder(), closeProcessor);
+ public Publisher connection = new ReactorNettyTcpConnection<>(inbound, outbound, codec, completion);
+ scheduler.schedule(() -> connectionHandler.afterConnected(connection));
- inbound.flatMapIterable(Function.identity())
+ inbound.receive()
+ .map(codec.getDecoder())
+ .publishOn(scheduler, QueueSupplier.SMALL_BUFFER_SIZE)
+ .flatMapIterable(Function.identity())
.subscribe(
connectionHandler::handleMessage,
connectionHandler::handleFailure,
connectionHandler::afterConnectionClosed);
- return closeProcessor;
- }
- }
-
-
- private static final class Reconnector the payload type of messages read or written to the TCP stream.
+ * Reactor Netty based implementation of {@link TcpConnection}.
*
* @author Rossen Stoyanchev
* @since 5.0
@@ -43,29 +38,27 @@ public class ReactorNettyTcpConnection implements TcpConnection {
private final NettyOutbound outbound;
- private final DirectProcessor codec;
- private final BiConsumer super ByteBuf, ? super Message > encoder;
+ private final DirectProcessor > encoder,
- DirectProcessor codec, DirectProcessor message) {
- ByteBuf byteBuf = this.outbound.alloc()
- .buffer();
- this.encoder.accept(byteBuf, message);
- return new MonoToListenableFutureAdapter<>(this.outbound.send(Mono.just(byteBuf))
- .then());
+ ByteBuf byteBuf = this.outbound.alloc().buffer();
+ this.codec.getEncoder().accept(byteBuf, message);
+ Mono