From 698c885e0601bb03dd5a763c5a24e1703433a174 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Wed, 28 Dec 2016 16:35:12 -0500 Subject: [PATCH] Reconnect failures delegated to TcpConnectionHandler When connecting with a ReconnectStrategy we can only report the outcome of the first connect to the ListenableFuture return value. Failures for all subsequent attempts to reconnect however must be channeled to TcpConnectHandler#afterConnectFailure which is used in the STOMP broker relay for example to publish BroadcastAvailability(true/false) events. --- .../messaging/tcp/reactor/ReactorNettyTcpClient.java | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java index b65890048a..95a92bc5f9 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java @@ -116,12 +116,15 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ return handleShuttingDownConnectFailure(handler); } + // Report first connect to the ListenableFuture MonoProcessor connectMono = MonoProcessor.create(); + this.tcpClient .newHandler(new ReactorNettyHandler(handler)) - .doOnNext(connectFailureConsumer(connectMono)) - .doOnError(connectFailureConsumer(connectMono)) - .then(NettyContext::onClose) + .doOnNext(updateConnectMono(connectMono)) + .doOnError(updateConnectMono(connectMono)) + .doOnError(handler::afterConnectFailure) // report all connect failures to the handler + .then(NettyContext::onClose) // post-connect issues .retryWhen(reconnectFunction(strategy)) .repeatWhen(reconnectFunction(strategy)) .subscribe(); @@ -135,7 +138,7 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ return new MonoToListenableFutureAdapter<>(Mono.error(ex)); } - private Consumer connectFailureConsumer(MonoProcessor connectMono) { + private Consumer updateConnectMono(MonoProcessor connectMono) { return o -> { if (!connectMono.isTerminated()) { if (o instanceof Throwable) {