Browse Source

Polish WebSocketIntegrationTests

pull/1810/merge
Rossen Stoyanchev 7 years ago
parent
commit
64b8b6e978
  1. 7
      spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java
  2. 23
      spring-webflux/src/test/java/org/springframework/web/reactive/socket/WebSocketIntegrationTests.java

7
spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/ReactorNettyWebSocketSession.java

@ -20,7 +20,6 @@ import io.netty.handler.codec.http.websocketx.WebSocketFrame; @@ -20,7 +20,6 @@ import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.ipc.netty.NettyInbound;
import reactor.ipc.netty.NettyOutbound;
import reactor.ipc.netty.NettyPipeline;
@ -35,8 +34,8 @@ import org.springframework.web.reactive.socket.WebSocketSession; @@ -35,8 +34,8 @@ import org.springframework.web.reactive.socket.WebSocketSession;
/**
* Spring {@link WebSocketSession} implementation that adapts to Reactor Netty's
* WebSocket {@link NettyInbound} and {@link NettyOutbound}.
* {@link WebSocketSession} implementation for use with the Reactor Netty's
* {@link NettyInbound} and {@link NettyOutbound}.
*
* @author Rossen Stoyanchev
* @since 5.0
@ -44,8 +43,6 @@ import org.springframework.web.reactive.socket.WebSocketSession; @@ -44,8 +43,6 @@ import org.springframework.web.reactive.socket.WebSocketSession;
public class ReactorNettyWebSocketSession
extends NettyWebSocketSessionSupport<ReactorNettyWebSocketSession.WebSocketConnection> {
private final MonoProcessor<WebSocketFrame> closeMono = MonoProcessor.create();
public ReactorNettyWebSocketSession(WebsocketInbound inbound, WebsocketOutbound outbound,
HandshakeInfo info, NettyDataBufferFactory bufferFactory) {

23
spring-webflux/src/test/java/org/springframework/web/reactive/socket/WebSocketIntegrationTests.java

@ -43,13 +43,14 @@ import static org.junit.Assert.assertThat; @@ -43,13 +43,14 @@ import static org.junit.Assert.assertThat;
/**
* Integration tests with server-side {@link WebSocketHandler}s.
*
* @author Rossen Stoyanchev
*/
public class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests {
private static final Log logger = LogFactory.getLog(WebSocketIntegrationTests.class);
private static final Duration TIMEOUT = Duration.ofMillis(5000);
@Override
protected Class<?> getWebConfigClass() {
@ -71,14 +72,12 @@ public class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests @@ -71,14 +72,12 @@ public class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests
.thenMany(session.receive().take(count).map(WebSocketMessage::getPayloadAsText))
.subscribeWith(output)
.doOnNext(s -> logger.debug("inbound " + s))
.then()
.doOnSuccessOrError((aVoid, ex) ->
logger.debug("Done with " + (ex != null ? ex.getMessage() : "success")));
.then();
})
.block(Duration.ofMillis(5000));
.doOnSuccessOrError((aVoid, ex) -> logger.debug("Done: " + (ex != null ? ex.getMessage() : "success")))
.block(TIMEOUT);
assertEquals(input.collectList().block(Duration.ofMillis(5000)),
output.collectList().block(Duration.ofMillis(5000)));
assertEquals(input.collectList().block(TIMEOUT), output.collectList().block(TIMEOUT));
}
@Test
@ -102,13 +101,13 @@ public class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests @@ -102,13 +101,13 @@ public class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests
.then();
}
})
.block(Duration.ofMillis(5000));
.block(TIMEOUT);
HandshakeInfo info = infoRef.get();
assertThat(info.getHeaders().getFirst("Upgrade"), Matchers.equalToIgnoringCase("websocket"));
assertEquals(protocol, info.getHeaders().getFirst("Sec-WebSocket-Protocol"));
assertEquals("Wrong protocol accepted", protocol, info.getSubProtocol());
assertEquals("Wrong protocol detected on the server side", protocol, output.block(Duration.ofMillis(5000)));
assertEquals("Wrong protocol detected on the server side", protocol, output.block(TIMEOUT));
}
@Test
@ -122,9 +121,9 @@ public class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests @@ -122,9 +121,9 @@ public class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests
.map(WebSocketMessage::getPayloadAsText)
.subscribeWith(output)
.then())
.block(Duration.ofMillis(5000));
.block(TIMEOUT);
assertEquals("my-header:my-value", output.block(Duration.ofMillis(5000)));
assertEquals("my-header:my-value", output.block(TIMEOUT));
}
@Test
@ -139,7 +138,7 @@ public class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests @@ -139,7 +138,7 @@ public class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests
logger.debug("Completed with: " + signalType);
});
})
.block(Duration.ofMillis(5000));
.block(TIMEOUT);
}

Loading…
Cancel
Save