Browse Source

Add isOpen to WebSocketSession in WebFlux

Closes gh-26043
pull/26058/head
Rossen Stoyanchev 4 years ago
parent
commit
6bb3ad793e
  1. 6
      spring-webflux/src/main/java/org/springframework/web/reactive/socket/WebSocketSession.java
  2. 5
      spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java
  3. 25
      spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java
  4. 5
      spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/StandardWebSocketSession.java
  5. 5
      spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java
  6. 2
      spring-websocket/src/main/java/org/springframework/web/socket/WebSocketSession.java

6
spring-webflux/src/main/java/org/springframework/web/reactive/socket/WebSocketSession.java

@ -80,6 +80,12 @@ public interface WebSocketSession {
*/ */
Mono<Void> send(Publisher<WebSocketMessage> messages); Mono<Void> send(Publisher<WebSocketMessage> messages);
/**
* Whether the underlying connection is open.
* @since 5.3.1
*/
boolean isOpen();
/** /**
* Close the WebSocket session with {@link CloseStatus#NORMAL}. * Close the WebSocket session with {@link CloseStatus#NORMAL}.
*/ */

5
spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java

@ -113,6 +113,11 @@ public class JettyWebSocketSession extends AbstractListenerWebSocketSession<Sess
return true; return true;
} }
@Override
public boolean isOpen() {
return getDelegate().isOpen();
}
@Override @Override
public Mono<Void> close(CloseStatus status) { public Mono<Void> close(CloseStatus status) {
getDelegate().close(status.getCode(), status.getReason()); getDelegate().close(status.getCode(), status.getReason());

25
spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java

@ -15,10 +15,13 @@
*/ */
package org.springframework.web.reactive.socket.adapter; package org.springframework.web.reactive.socket.adapter;
import java.util.function.Consumer;
import io.netty.handler.codec.http.websocketx.WebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.NettyInbound; import reactor.netty.NettyInbound;
import reactor.netty.NettyOutbound; import reactor.netty.NettyOutbound;
import reactor.netty.http.websocket.WebsocketInbound; import reactor.netty.http.websocket.WebsocketInbound;
@ -93,6 +96,13 @@ public class ReactorNettyWebSocketSession
.then(); .then();
} }
@Override
public boolean isOpen() {
DisposedCallback callback = new DisposedCallback();
getDelegate().getInbound().withConnection(callback);
return callback.isDisposed();
}
@Override @Override
public Mono<Void> close(CloseStatus status) { public Mono<Void> close(CloseStatus status) {
// this will notify WebSocketInbound.receiveCloseStatus() // this will notify WebSocketInbound.receiveCloseStatus()
@ -129,4 +139,19 @@ public class ReactorNettyWebSocketSession
} }
} }
private static class DisposedCallback implements Consumer<Connection> {
private boolean disposed;
public boolean isDisposed() {
return this.disposed;
}
@Override
public void accept(Connection connection) {
this.disposed = connection.isDisposed();
}
}
} }

5
spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/StandardWebSocketSession.java

@ -103,6 +103,11 @@ public class StandardWebSocketSession extends AbstractListenerWebSocketSession<S
return true; return true;
} }
@Override
public boolean isOpen() {
return getDelegate().isOpen();
}
@Override @Override
public Mono<Void> close(CloseStatus status) { public Mono<Void> close(CloseStatus status) {
try { try {

5
spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java

@ -108,6 +108,11 @@ public class UndertowWebSocketSession extends AbstractListenerWebSocketSession<W
return true; return true;
} }
@Override
public boolean isOpen() {
return getDelegate().isOpen();
}
@Override @Override
public Mono<Void> close(CloseStatus status) { public Mono<Void> close(CloseStatus status) {
CloseMessage cm = new CloseMessage(status.getCode(), status.getReason()); CloseMessage cm = new CloseMessage(status.getCode(), status.getReason());

2
spring-websocket/src/main/java/org/springframework/web/socket/WebSocketSession.java

@ -130,7 +130,7 @@ public interface WebSocketSession extends Closeable {
void sendMessage(WebSocketMessage<?> message) throws IOException; void sendMessage(WebSocketMessage<?> message) throws IOException;
/** /**
* Return whether the connection is still open. * Whether the underlying connection is open.
*/ */
boolean isOpen(); boolean isOpen();

Loading…
Cancel
Save