diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/CloseStatus.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/CloseStatus.java index ea4523aee4..63ffbf09f3 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/CloseStatus.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/CloseStatus.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2017 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,6 +19,7 @@ package org.springframework.web.reactive.socket; import org.springframework.lang.Nullable; import org.springframework.util.Assert; import org.springframework.util.ObjectUtils; +import org.springframework.util.StringUtils; /** * Representation of WebSocket "close" status codes and reasons. Status codes @@ -184,11 +185,55 @@ public final class CloseStatus { return new CloseStatus(this.code, reason); } - + /** + * @deprecated as of 5.3 in favor of comparing codes directly + */ + @Deprecated public boolean equalsCode(CloseStatus other) { return (this.code == other.code); } + + /** + * Return a constant for the given code, or create a new instance if the + * code does not match or there is a reason. + * @since 5.3 + */ + public static CloseStatus create(int code, @Nullable String reason) { + if (StringUtils.isEmpty(reason)) { + switch (code) { + case 1000: + return NORMAL; + case 1001: + return GOING_AWAY; + case 1002: + return PROTOCOL_ERROR; + case 1003: + return NOT_ACCEPTABLE; + case 1005: + return NO_STATUS_CODE; + case 1006: + return NO_CLOSE_FRAME; + case 1007: + return BAD_DATA; + case 1008: + return POLICY_VIOLATION; + case 1009: + return TOO_BIG_TO_PROCESS; + case 1010: + return REQUIRED_EXTENSION; + case 1011: + return SERVER_ERROR; + case 1012: + return SERVICE_RESTARTED; + case 1013: + return SERVICE_OVERLOAD; + } + } + return new CloseStatus(code, reason); + } + + @Override public boolean equals(@Nullable Object other) { if (this == other) { diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/WebSocketSession.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/WebSocketSession.java index 9b08577ed8..4fac80ee4d 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/WebSocketSession.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/WebSocketSession.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -95,6 +95,13 @@ public interface WebSocketSession { */ Mono close(CloseStatus status); + /** + * Provides access to the {@code CloseStatus} with which the session is + * closed either locally or remotely, or completes empty if the session ended + * without a status. + * @since 5.3 + */ + Mono closeStatus(); // WebSocketMessage factory methods diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java index 5e2bdfbd55..52c29ecc72 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -35,6 +35,7 @@ import org.springframework.lang.Nullable; import org.springframework.util.Assert; import org.springframework.web.reactive.socket.CloseStatus; import org.springframework.web.reactive.socket.HandshakeInfo; +import org.springframework.web.reactive.socket.WebSocketHandler; import org.springframework.web.reactive.socket.WebSocketMessage; import org.springframework.web.reactive.socket.WebSocketMessage.Type; import org.springframework.web.reactive.socket.WebSocketSession; @@ -44,8 +45,8 @@ import org.springframework.web.reactive.socket.WebSocketSession; * event-listener WebSocket APIs (e.g. Java WebSocket API JSR-356, Jetty, * Undertow) and Reactive Streams. * - *

Also an implementation of {@code Subscriber<Void>} so it can be used as - * the completion subscriber for session handling + *

Also implements {@code Subscriber} so it can be used to subscribe to + * the completion of {@link WebSocketHandler#handle(WebSocketSession)}. * * @author Violeta Georgieva * @author Rossen Stoyanchev @@ -63,7 +64,7 @@ public abstract class AbstractListenerWebSocketSession extends AbstractWebSoc @Nullable - private final MonoProcessor completionMono; + private final MonoProcessor handlerCompletion; private final WebSocketReceivePublisher receivePublisher; @@ -72,6 +73,8 @@ public abstract class AbstractListenerWebSocketSession extends AbstractWebSoc private final AtomicBoolean sendCalled = new AtomicBoolean(); + private final MonoProcessor closeStatusProcessor = MonoProcessor.create(); + /** * Base constructor. @@ -87,15 +90,16 @@ public abstract class AbstractListenerWebSocketSession extends AbstractWebSoc } /** - * Alternative constructor with completion {@code Mono<Void>} to propagate - * the session completion (success or error) (for client-side use). + * Alternative constructor with completion {@code Mono} to propagate + * session completion (success or error). This is primarily for use with the + * {@code WebSocketClient} to be able to report the end of execution. */ public AbstractListenerWebSocketSession(T delegate, String id, HandshakeInfo info, - DataBufferFactory bufferFactory, @Nullable MonoProcessor completionMono) { + DataBufferFactory bufferFactory, @Nullable MonoProcessor handlerCompletion) { super(delegate, id, info, bufferFactory); this.receivePublisher = new WebSocketReceivePublisher(); - this.completionMono = completionMono; + this.handlerCompletion = handlerCompletion; } @@ -126,6 +130,11 @@ public abstract class AbstractListenerWebSocketSession extends AbstractWebSoc } } + @Override + public Mono closeStatus() { + return this.closeStatusProcessor; + } + /** * Whether the underlying WebSocket API has flow control and can suspend and * resume the receiving of messages. @@ -170,6 +179,7 @@ public abstract class AbstractListenerWebSocketSession extends AbstractWebSoc /** Handle an error callback from the WebSocketHandler adapter. */ void handleError(Throwable ex) { + this.closeStatusProcessor.onComplete(); this.receivePublisher.onError(ex); WebSocketSendProcessor sendProcessor = this.sendProcessor; if (sendProcessor != null) { @@ -179,7 +189,8 @@ public abstract class AbstractListenerWebSocketSession extends AbstractWebSoc } /** Handle a close callback from the WebSocketHandler adapter. */ - void handleClose(CloseStatus reason) { + void handleClose(CloseStatus closeStatus) { + this.closeStatusProcessor.onNext(closeStatus); this.receivePublisher.onAllDataRead(); WebSocketSendProcessor sendProcessor = this.sendProcessor; if (sendProcessor != null) { @@ -189,7 +200,7 @@ public abstract class AbstractListenerWebSocketSession extends AbstractWebSoc } - // Subscriber implementation + // Subscriber implementation tracking WebSocketHandler#handle completion @Override public void onSubscribe(Subscription subscription) { @@ -203,17 +214,16 @@ public abstract class AbstractListenerWebSocketSession extends AbstractWebSoc @Override public void onError(Throwable ex) { - if (this.completionMono != null) { - this.completionMono.onError(ex); + if (this.handlerCompletion != null) { + this.handlerCompletion.onError(ex); } - int code = CloseStatus.SERVER_ERROR.getCode(); - close(new CloseStatus(code, ex.getMessage())); + close(CloseStatus.SERVER_ERROR.withReason(ex.getMessage())); } @Override public void onComplete() { - if (this.completionMono != null) { - this.completionMono.onComplete(); + if (this.handlerCompletion != null) { + this.handlerCompletion.onComplete(); } close(); } diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketHandlerAdapter.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketHandlerAdapter.java index 0d8c30de0c..baa8e14537 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketHandlerAdapter.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketHandlerAdapter.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -131,7 +131,7 @@ public class JettyWebSocketHandlerAdapter { @OnWebSocketClose public void onWebSocketClose(int statusCode, String reason) { if (this.delegateSession != null) { - this.delegateSession.handleClose(new CloseStatus(statusCode, reason)); + this.delegateSession.handleClose(CloseStatus.create(statusCode, reason)); } } diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java index 23376edbec..f91b7674d4 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -95,9 +95,15 @@ public class ReactorNettyWebSocketSession @Override public Mono close(CloseStatus status) { + // this will notify WebSocketInbound.receiveCloseStatus() return getDelegate().getOutbound().sendClose(status.getCode(), status.getReason()); } + @Override + public Mono closeStatus() { + return getDelegate().getInbound().receiveCloseStatus() + .map(status -> CloseStatus.create(status.code(), status.reasonText())); + } /** * Simple container for {@link NettyInbound} and {@link NettyOutbound}. diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/StandardWebSocketHandlerAdapter.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/StandardWebSocketHandlerAdapter.java index c587611b86..a503d9d76c 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/StandardWebSocketHandlerAdapter.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/StandardWebSocketHandlerAdapter.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -110,7 +110,7 @@ public class StandardWebSocketHandlerAdapter extends Endpoint { public void onClose(Session session, CloseReason reason) { if (this.delegateSession != null) { int code = reason.getCloseCode().getCode(); - this.delegateSession.handleClose(new CloseStatus(code, reason.getReasonPhrase())); + this.delegateSession.handleClose(CloseStatus.create(code, reason.getReasonPhrase())); } } diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketHandlerAdapter.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketHandlerAdapter.java index 2b5203b523..e2b0323a61 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketHandlerAdapter.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketHandlerAdapter.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2018 the original author or authors. + * Copyright 2002-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -72,7 +72,7 @@ public class UndertowWebSocketHandlerAdapter extends AbstractReceiveListener { @Override protected void onFullCloseMessage(WebSocketChannel channel, BufferedBinaryMessage message) { CloseMessage closeMessage = new CloseMessage(message.getData().getResource()); - this.session.handleClose(new CloseStatus(closeMessage.getCode(), closeMessage.getReason())); + this.session.handleClose(CloseStatus.create(closeMessage.getCode(), closeMessage.getReason())); message.getData().free(); } diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/socket/WebSocketIntegrationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/socket/WebSocketIntegrationTests.java index 9be00e9d35..f85d5ea39c 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/socket/WebSocketIntegrationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/socket/WebSocketIntegrationTests.java @@ -147,9 +147,11 @@ class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests { void sessionClosing(WebSocketClient client, HttpServer server, Class serverConfigClass) throws Exception { startServer(client, server, serverConfigClass); + MonoProcessor statusProcessor = MonoProcessor.create(); this.client.execute(getUrl("/close"), session -> { logger.debug("Starting.."); + session.closeStatus().subscribe(statusProcessor); return session.receive() .doOnNext(s -> logger.debug("inbound " + s)) .then() @@ -158,6 +160,8 @@ class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests { ); }) .block(TIMEOUT); + + assertThat(statusProcessor.block()).isEqualTo(CloseStatus.GOING_AWAY); } @ParameterizedWebSocketTest