Browse Source

Use MonoProcessor instead of FluxIdentityProcessor

We just need to signal completion when close() is called. MonoProcessor
should suffice and we can avoid a hard dependency on Reactor 3.4.

See gh-25085
pull/25297/head
Rossen Stoyanchev 4 years ago
parent
commit
7cf1ccc415
  1. 4
      spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java
  2. 6
      spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java

4
spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java

@ -32,10 +32,8 @@ import io.netty.util.concurrent.ImmediateEventExecutor;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import reactor.core.publisher.FluxIdentityProcessor;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor; import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.Processors;
import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers; import reactor.core.scheduler.Schedulers;
import reactor.netty.Connection; import reactor.netty.Connection;
@ -317,7 +315,7 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
logger.debug("Connected to " + conn.address()); logger.debug("Connected to " + conn.address());
} }
}); });
FluxIdentityProcessor<Void> completion = Processors.more().multicastNoBackpressure(); MonoProcessor<Void> completion = MonoProcessor.create();
TcpConnection<P> connection = new ReactorNettyTcpConnection<>(inbound, outbound, codec, completion); TcpConnection<P> connection = new ReactorNettyTcpConnection<>(inbound, outbound, codec, completion);
scheduler.schedule(() -> this.connectionHandler.afterConnected(connection)); scheduler.schedule(() -> this.connectionHandler.afterConnected(connection));

6
spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java

@ -17,8 +17,8 @@
package org.springframework.messaging.tcp.reactor; package org.springframework.messaging.tcp.reactor;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import reactor.core.publisher.FluxIdentityProcessor;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.netty.NettyInbound; import reactor.netty.NettyInbound;
import reactor.netty.NettyOutbound; import reactor.netty.NettyOutbound;
@ -42,11 +42,11 @@ public class ReactorNettyTcpConnection<P> implements TcpConnection<P> {
private final ReactorNettyCodec<P> codec; private final ReactorNettyCodec<P> codec;
private final FluxIdentityProcessor<Void> closeProcessor; private final MonoProcessor<Void> closeProcessor;
public ReactorNettyTcpConnection(NettyInbound inbound, NettyOutbound outbound, public ReactorNettyTcpConnection(NettyInbound inbound, NettyOutbound outbound,
ReactorNettyCodec<P> codec, FluxIdentityProcessor<Void> closeProcessor) { ReactorNettyCodec<P> codec, MonoProcessor<Void> closeProcessor) {
this.inbound = inbound; this.inbound = inbound;
this.outbound = outbound; this.outbound = outbound;

Loading…
Cancel
Save