diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java index b65890048a..95a92bc5f9 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java @@ -116,12 +116,15 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ return handleShuttingDownConnectFailure(handler); } + // Report first connect to the ListenableFuture MonoProcessor connectMono = MonoProcessor.create(); + this.tcpClient .newHandler(new ReactorNettyHandler(handler)) - .doOnNext(connectFailureConsumer(connectMono)) - .doOnError(connectFailureConsumer(connectMono)) - .then(NettyContext::onClose) + .doOnNext(updateConnectMono(connectMono)) + .doOnError(updateConnectMono(connectMono)) + .doOnError(handler::afterConnectFailure) // report all connect failures to the handler + .then(NettyContext::onClose) // post-connect issues .retryWhen(reconnectFunction(strategy)) .repeatWhen(reconnectFunction(strategy)) .subscribe(); @@ -135,7 +138,7 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ return new MonoToListenableFutureAdapter<>(Mono.error(ex)); } - private Consumer connectFailureConsumer(MonoProcessor connectMono) { + private Consumer updateConnectMono(MonoProcessor connectMono) { return o -> { if (!connectMono.isTerminated()) { if (o instanceof Throwable) {