From 7cf1ccc41540752a16e370b5dd353b94108ebf22 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Sun, 21 Jun 2020 21:18:20 +0100 Subject: [PATCH] Use MonoProcessor instead of FluxIdentityProcessor We just need to signal completion when close() is called. MonoProcessor should suffice and we can avoid a hard dependency on Reactor 3.4. See gh-25085 --- .../messaging/tcp/reactor/ReactorNettyTcpClient.java | 4 +--- .../messaging/tcp/reactor/ReactorNettyTcpConnection.java | 6 +++--- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java index 82d31bc78a..ca42a1ecd3 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java @@ -32,10 +32,8 @@ import io.netty.util.concurrent.ImmediateEventExecutor; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.reactivestreams.Publisher; -import reactor.core.publisher.FluxIdentityProcessor; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoProcessor; -import reactor.core.publisher.Processors; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; import reactor.netty.Connection; @@ -317,7 +315,7 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ logger.debug("Connected to " + conn.address()); } }); - FluxIdentityProcessor completion = Processors.more().multicastNoBackpressure(); + MonoProcessor completion = MonoProcessor.create(); TcpConnection

connection = new ReactorNettyTcpConnection<>(inbound, outbound, codec, completion); scheduler.schedule(() -> this.connectionHandler.afterConnected(connection)); diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java index 54611adcfc..1442f83a42 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java @@ -17,8 +17,8 @@ package org.springframework.messaging.tcp.reactor; import io.netty.buffer.ByteBuf; -import reactor.core.publisher.FluxIdentityProcessor; import reactor.core.publisher.Mono; +import reactor.core.publisher.MonoProcessor; import reactor.netty.NettyInbound; import reactor.netty.NettyOutbound; @@ -42,11 +42,11 @@ public class ReactorNettyTcpConnection

implements TcpConnection

{ private final ReactorNettyCodec

codec; - private final FluxIdentityProcessor closeProcessor; + private final MonoProcessor closeProcessor; public ReactorNettyTcpConnection(NettyInbound inbound, NettyOutbound outbound, - ReactorNettyCodec

codec, FluxIdentityProcessor closeProcessor) { + ReactorNettyCodec

codec, MonoProcessor closeProcessor) { this.inbound = inbound; this.outbound = outbound;