diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java index 0a2793de7b..9cb0caa2a3 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java @@ -17,10 +17,12 @@ package org.springframework.messaging.tcp.reactor; import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelPipeline; import reactor.core.publisher.DirectProcessor; import reactor.core.publisher.Mono; import reactor.ipc.netty.NettyInbound; import reactor.ipc.netty.NettyOutbound; +import reactor.ipc.netty.NettyPipeline; import org.springframework.messaging.Message; import org.springframework.messaging.tcp.TcpConnection; @@ -64,6 +66,14 @@ public class ReactorNettyTcpConnection

implements TcpConnection

{ @Override @SuppressWarnings("deprecation") public void onReadInactivity(Runnable runnable, long inactivityDuration) { + + // TODO: workaround for https://github.com/reactor/reactor-netty/issues/22 + ChannelPipeline pipeline = this.inbound.context().channel().pipeline(); + String name = NettyPipeline.OnChannelReadIdle; + if (pipeline.context(name) != null) { + pipeline.remove(name); + } + this.inbound.onReadIdle(inactivityDuration, runnable); }