From 6bb3ad793eedd2f5eda98d5897ab207f081c87ec Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Mon, 9 Nov 2020 14:06:18 +0000 Subject: [PATCH] Add isOpen to WebSocketSession in WebFlux Closes gh-26043 --- .../web/reactive/socket/WebSocketSession.java | 6 +++++ .../socket/adapter/JettyWebSocketSession.java | 5 ++++ .../adapter/ReactorNettyWebSocketSession.java | 25 +++++++++++++++++++ .../adapter/StandardWebSocketSession.java | 5 ++++ .../adapter/UndertowWebSocketSession.java | 5 ++++ .../web/socket/WebSocketSession.java | 2 +- 6 files changed, 47 insertions(+), 1 deletion(-) diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/WebSocketSession.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/WebSocketSession.java index 6dee9e3899..b3e3053ab4 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/WebSocketSession.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/WebSocketSession.java @@ -80,6 +80,12 @@ public interface WebSocketSession { */ Mono send(Publisher messages); + /** + * Whether the underlying connection is open. + * @since 5.3.1 + */ + boolean isOpen(); + /** * Close the WebSocket session with {@link CloseStatus#NORMAL}. */ diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java index e39d48a39d..a8a9240a61 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java @@ -113,6 +113,11 @@ public class JettyWebSocketSession extends AbstractListenerWebSocketSession close(CloseStatus status) { getDelegate().close(status.getCode(), status.getReason()); diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java index f91b7674d4..ab58f6f1af 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java @@ -15,10 +15,13 @@ */ package org.springframework.web.reactive.socket.adapter; +import java.util.function.Consumer; + import io.netty.handler.codec.http.websocketx.WebSocketFrame; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.netty.Connection; import reactor.netty.NettyInbound; import reactor.netty.NettyOutbound; import reactor.netty.http.websocket.WebsocketInbound; @@ -93,6 +96,13 @@ public class ReactorNettyWebSocketSession .then(); } + @Override + public boolean isOpen() { + DisposedCallback callback = new DisposedCallback(); + getDelegate().getInbound().withConnection(callback); + return callback.isDisposed(); + } + @Override public Mono close(CloseStatus status) { // this will notify WebSocketInbound.receiveCloseStatus() @@ -129,4 +139,19 @@ public class ReactorNettyWebSocketSession } } + + private static class DisposedCallback implements Consumer { + + private boolean disposed; + + public boolean isDisposed() { + return this.disposed; + } + + @Override + public void accept(Connection connection) { + this.disposed = connection.isDisposed(); + } + } + } diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/StandardWebSocketSession.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/StandardWebSocketSession.java index 7c84f2bc9d..3679f36f0e 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/StandardWebSocketSession.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/StandardWebSocketSession.java @@ -103,6 +103,11 @@ public class StandardWebSocketSession extends AbstractListenerWebSocketSession close(CloseStatus status) { try { diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java index ea88778e4a..3d4e99ab85 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java @@ -108,6 +108,11 @@ public class UndertowWebSocketSession extends AbstractListenerWebSocketSession close(CloseStatus status) { CloseMessage cm = new CloseMessage(status.getCode(), status.getReason()); diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/WebSocketSession.java b/spring-websocket/src/main/java/org/springframework/web/socket/WebSocketSession.java index 308eb30ce8..56388d9b47 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/WebSocketSession.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/WebSocketSession.java @@ -130,7 +130,7 @@ public interface WebSocketSession extends Closeable { void sendMessage(WebSocketMessage message) throws IOException; /** - * Return whether the connection is still open. + * Whether the underlying connection is open. */ boolean isOpen();