@ -15,13 +15,11 @@
@@ -15,13 +15,11 @@
* /
package org.springframework.web.reactive.socket.server ;
import java.net.URISyntaxException ;
import java.util.HashMap ;
import java.util.Map ;
import java.util.concurrent.atomic.AtomicReference ;
import org.hamcrest.Matchers ;
import org.junit.Ignore ;
import org.junit.Test ;
import org.reactivestreams.Publisher ;
import reactor.core.publisher.Flux ;
@ -32,7 +30,6 @@ import reactor.core.publisher.ReplayProcessor;
@@ -32,7 +30,6 @@ import reactor.core.publisher.ReplayProcessor;
import org.springframework.context.annotation.Bean ;
import org.springframework.context.annotation.Configuration ;
import org.springframework.http.HttpHeaders ;
import org.springframework.http.server.reactive.bootstrap.RxNettyHttpServer ;
import org.springframework.web.reactive.HandlerMapping ;
import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping ;
import org.springframework.web.reactive.socket.HandshakeInfo ;
@ -41,13 +38,10 @@ import org.springframework.web.reactive.socket.WebSocketMessage;
@@ -41,13 +38,10 @@ import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession ;
import org.springframework.web.reactive.socket.client.JettyWebSocketClient ;
import org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient ;
import org.springframework.web.reactive.socket.client.RxNettyWebSocketClient ;
import org.springframework.web.reactive.socket.client.StandardWebSocketClient ;
import org.springframework.web.reactive.socket.client.UndertowWebSocketClient ;
import org.springframework.web.reactive.socket.client.WebSocketClient ;
import static org.junit.Assert.assertEquals ;
import static org.junit.Assert.assertThat ;
import static org.junit.Assume.assumeFalse ;
/ * *
* Integration tests with server - side { @link WebSocketHandler } s .
@ -64,44 +58,7 @@ public class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests
@@ -64,44 +58,7 @@ public class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests
@Test
public void echoReactorClient ( ) throws Exception {
testEcho ( new ReactorNettyWebSocketClient ( ) ) ;
}
@Test
public void echoRxNettyClient ( ) throws Exception {
testEcho ( new RxNettyWebSocketClient ( ) ) ;
}
@Test
public void echoJettyClient ( ) throws Exception {
JettyWebSocketClient client = new JettyWebSocketClient ( ) ;
client . start ( ) ;
testEcho ( client ) ;
client . stop ( ) ;
}
@Test
public void echoStandardClient ( ) throws Exception {
testEcho ( new StandardWebSocketClient ( ) ) ;
}
@Test
public void echoUndertowClient ( ) throws Exception {
if ( server instanceof RxNettyHttpServer ) {
// Caused by: java.io.IOException: Upgrade responses cannot have a transfer coding
// at org.xnio.http.HttpUpgrade$HttpUpgradeState.handleUpgrade(HttpUpgrade.java:490)
// at org.xnio.http.HttpUpgrade$HttpUpgradeState.access$1200(HttpUpgrade.java:165)
// at org.xnio.http.HttpUpgrade$HttpUpgradeState$UpgradeResultListener.handleEvent(HttpUpgrade.java:461)
// at org.xnio.http.HttpUpgrade$HttpUpgradeState$UpgradeResultListener.handleEvent(HttpUpgrade.java:400)
// at org.xnio.ChannelListeners.invokeChannelListener(ChannelListeners.java:92)
return ;
}
testEcho ( new UndertowWebSocketClient ( ) ) ;
}
private void testEcho ( WebSocketClient client ) throws URISyntaxException {
public void echo ( ) throws Exception {
int count = 100 ;
Flux < String > input = Flux . range ( 1 , count ) . map ( index - > "msg-" + index ) ;
ReplayProcessor < Object > output = ReplayProcessor . create ( count ) ;
@ -118,45 +75,12 @@ public class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests
@@ -118,45 +75,12 @@ public class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests
}
@Test
@Ignore ( "https://github.com/reactor/reactor-netty/issues/20" )
public void subProtocolReactorClient ( ) throws Exception {
testSubProtocol ( new ReactorNettyWebSocketClient ( ) ) ;
}
@Test
public void subProtocolRxNettyClient ( ) throws Exception {
testSubProtocol ( new RxNettyWebSocketClient ( ) ) ;
}
@Test
public void subProtocolJettyClient ( ) throws Exception {
JettyWebSocketClient client = new JettyWebSocketClient ( ) ;
client . start ( ) ;
testSubProtocol ( client ) ;
client . stop ( ) ;
}
public void subProtocol ( ) throws Exception {
@Test
public void subProtocolStandardClient ( ) throws Exception {
testSubProtocol ( new StandardWebSocketClient ( ) ) ;
}
// TODO
// https://github.com/reactor/reactor-netty/issues/20
assumeFalse ( client instanceof ReactorNettyWebSocketClient ) ;
@Test
public void subProtocolUndertowClient ( ) throws Exception {
if ( server instanceof RxNettyHttpServer ) {
// Caused by: java.io.IOException: Upgrade responses cannot have a transfer coding
// at org.xnio.http.HttpUpgrade$HttpUpgradeState.handleUpgrade(HttpUpgrade.java:490)
// at org.xnio.http.HttpUpgrade$HttpUpgradeState.access$1200(HttpUpgrade.java:165)
// at org.xnio.http.HttpUpgrade$HttpUpgradeState$UpgradeResultListener.handleEvent(HttpUpgrade.java:461)
// at org.xnio.http.HttpUpgrade$HttpUpgradeState$UpgradeResultListener.handleEvent(HttpUpgrade.java:400)
// at org.xnio.ChannelListeners.invokeChannelListener(ChannelListeners.java:92)
return ;
}
testSubProtocol ( new UndertowWebSocketClient ( ) ) ;
}
private void testSubProtocol ( WebSocketClient client ) throws URISyntaxException {
String protocol = "echo-v1" ;
AtomicReference < HandshakeInfo > infoRef = new AtomicReference < > ( ) ;
MonoProcessor < Object > output = MonoProcessor . create ( ) ;
@ -188,45 +112,10 @@ public class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests
@@ -188,45 +112,10 @@ public class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests
}
@Test
public void customHeaderReactorClient ( ) throws Exception {
testCustomHeader ( new ReactorNettyWebSocketClient ( ) ) ;
}
@Test
public void customHeaderRxNettyClient ( ) throws Exception {
testCustomHeader ( new RxNettyWebSocketClient ( ) ) ;
}
@Test
@Ignore
public void customHeaderJettyClient ( ) throws Exception {
JettyWebSocketClient client = new JettyWebSocketClient ( ) ;
client . start ( ) ;
testCustomHeader ( client ) ;
client . stop ( ) ;
}
@Test
public void customHeaderStandardClient ( ) throws Exception {
testCustomHeader ( new StandardWebSocketClient ( ) ) ;
}
@Test
public void customHeaderUndertowClient ( ) throws Exception {
if ( server instanceof RxNettyHttpServer ) {
// Caused by: java.io.IOException: Upgrade responses cannot have a transfer coding
// at org.xnio.http.HttpUpgrade$HttpUpgradeState.handleUpgrade(HttpUpgrade.java:490)
// at org.xnio.http.HttpUpgrade$HttpUpgradeState.access$1200(HttpUpgrade.java:165)
// at org.xnio.http.HttpUpgrade$HttpUpgradeState$UpgradeResultListener.handleEvent(HttpUpgrade.java:461)
// at org.xnio.http.HttpUpgrade$HttpUpgradeState$UpgradeResultListener.handleEvent(HttpUpgrade.java:400)
// at org.xnio.ChannelListeners.invokeChannelListener(ChannelListeners.java:92)
return ;
}
testCustomHeader ( new UndertowWebSocketClient ( ) ) ;
}
public void customHeader ( ) throws Exception {
private void testCustomHeader ( WebSocketClient client ) throws Exception {
// TODO
assumeFalse ( client instanceof JettyWebSocketClient ) ;
HttpHeaders headers = new HttpHeaders ( ) ;
headers . add ( "my-header" , "my-value" ) ;