diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java index 82d31bc78a..ca42a1ecd3 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java @@ -32,10 +32,8 @@ import io.netty.util.concurrent.ImmediateEventExecutor; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.reactivestreams.Publisher; -import reactor.core.publisher.FluxIdentityProcessor; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoProcessor; -import reactor.core.publisher.Processors; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; import reactor.netty.Connection; @@ -317,7 +315,7 @@ public class ReactorNettyTcpClient
implements TcpOperations
{
logger.debug("Connected to " + conn.address());
}
});
- FluxIdentityProcessor connection = new ReactorNettyTcpConnection<>(inbound, outbound, codec, completion);
scheduler.schedule(() -> this.connectionHandler.afterConnected(connection));
diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java
index 54611adcfc..1442f83a42 100644
--- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java
+++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java
@@ -17,8 +17,8 @@
package org.springframework.messaging.tcp.reactor;
import io.netty.buffer.ByteBuf;
-import reactor.core.publisher.FluxIdentityProcessor;
import reactor.core.publisher.Mono;
+import reactor.core.publisher.MonoProcessor;
import reactor.netty.NettyInbound;
import reactor.netty.NettyOutbound;
@@ -42,11 +42,11 @@ public class ReactorNettyTcpConnection implements TcpConnection {
private final ReactorNettyCodec codec;
- private final FluxIdentityProcessor codec, FluxIdentityProcessor codec, MonoProcessor