diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java index a5cd143b7f..dcf00451c7 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java @@ -20,7 +20,6 @@ import io.netty.handler.codec.http.websocketx.WebSocketFrame; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.publisher.MonoProcessor; import reactor.ipc.netty.NettyInbound; import reactor.ipc.netty.NettyOutbound; import reactor.ipc.netty.NettyPipeline; @@ -35,8 +34,8 @@ import org.springframework.web.reactive.socket.WebSocketSession; /** - * Spring {@link WebSocketSession} implementation that adapts to Reactor Netty's - * WebSocket {@link NettyInbound} and {@link NettyOutbound}. + * {@link WebSocketSession} implementation for use with the Reactor Netty's + * {@link NettyInbound} and {@link NettyOutbound}. * * @author Rossen Stoyanchev * @since 5.0 @@ -44,8 +43,6 @@ import org.springframework.web.reactive.socket.WebSocketSession; public class ReactorNettyWebSocketSession extends NettyWebSocketSessionSupport { - private final MonoProcessor closeMono = MonoProcessor.create(); - public ReactorNettyWebSocketSession(WebsocketInbound inbound, WebsocketOutbound outbound, HandshakeInfo info, NettyDataBufferFactory bufferFactory) { diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/socket/WebSocketIntegrationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/socket/WebSocketIntegrationTests.java index b53c4536ac..4bfcdd6d63 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/socket/WebSocketIntegrationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/socket/WebSocketIntegrationTests.java @@ -43,13 +43,14 @@ import static org.junit.Assert.assertThat; /** * Integration tests with server-side {@link WebSocketHandler}s. - * * @author Rossen Stoyanchev */ public class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests { private static final Log logger = LogFactory.getLog(WebSocketIntegrationTests.class); + private static final Duration TIMEOUT = Duration.ofMillis(5000); + @Override protected Class getWebConfigClass() { @@ -71,14 +72,12 @@ public class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests .thenMany(session.receive().take(count).map(WebSocketMessage::getPayloadAsText)) .subscribeWith(output) .doOnNext(s -> logger.debug("inbound " + s)) - .then() - .doOnSuccessOrError((aVoid, ex) -> - logger.debug("Done with " + (ex != null ? ex.getMessage() : "success"))); + .then(); }) - .block(Duration.ofMillis(5000)); + .doOnSuccessOrError((aVoid, ex) -> logger.debug("Done: " + (ex != null ? ex.getMessage() : "success"))) + .block(TIMEOUT); - assertEquals(input.collectList().block(Duration.ofMillis(5000)), - output.collectList().block(Duration.ofMillis(5000))); + assertEquals(input.collectList().block(TIMEOUT), output.collectList().block(TIMEOUT)); } @Test @@ -102,13 +101,13 @@ public class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests .then(); } }) - .block(Duration.ofMillis(5000)); + .block(TIMEOUT); HandshakeInfo info = infoRef.get(); assertThat(info.getHeaders().getFirst("Upgrade"), Matchers.equalToIgnoringCase("websocket")); assertEquals(protocol, info.getHeaders().getFirst("Sec-WebSocket-Protocol")); assertEquals("Wrong protocol accepted", protocol, info.getSubProtocol()); - assertEquals("Wrong protocol detected on the server side", protocol, output.block(Duration.ofMillis(5000))); + assertEquals("Wrong protocol detected on the server side", protocol, output.block(TIMEOUT)); } @Test @@ -122,9 +121,9 @@ public class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests .map(WebSocketMessage::getPayloadAsText) .subscribeWith(output) .then()) - .block(Duration.ofMillis(5000)); + .block(TIMEOUT); - assertEquals("my-header:my-value", output.block(Duration.ofMillis(5000))); + assertEquals("my-header:my-value", output.block(TIMEOUT)); } @Test @@ -139,7 +138,7 @@ public class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests logger.debug("Completed with: " + signalType); }); }) - .block(Duration.ofMillis(5000)); + .block(TIMEOUT); }