Browse Source

Add WebSocketClient for Jetty

Issue: SPR-14527
pull/1026/merge
Violeta Georgieva 8 years ago committed by Rossen Stoyanchev
parent
commit
bd09a76a1e
  1. 27
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/HandshakeInfo.java
  2. 16
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketHandlerAdapter.java
  3. 171
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/JettyWebSocketClient.java
  4. 17
      spring-web-reactive/src/test/java/org/springframework/web/reactive/socket/server/WebSocketIntegrationTests.java

27
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/HandshakeInfo.java

@ -37,13 +37,17 @@ public class HandshakeInfo { @@ -37,13 +37,17 @@ public class HandshakeInfo {
private final URI uri;
private final HttpHeaders headers;
private final Mono<Principal> principalMono;
private final Optional<String> protocol;
private HttpHeaders headers;
private Optional<String> protocol;
public HandshakeInfo(URI uri, Mono<Principal> principal) {
this(uri, new HttpHeaders(), principal, Optional.empty());
}
public HandshakeInfo(URI uri, HttpHeaders headers, Mono<Principal> principal,
Optional<String> subProtocol) {
@ -73,6 +77,15 @@ public class HandshakeInfo { @@ -73,6 +77,15 @@ public class HandshakeInfo {
return this.headers;
}
/**
* Sets the handshake HTTP headers. Those are the request headers for a
* server session and the response headers for a client session.
* @param headers the handshake HTTP headers.
*/
public void setHeaders(HttpHeaders headers) {
this.headers = headers;
}
/**
* Return the principal associated with the handshake HTTP request.
*/
@ -89,6 +102,14 @@ public class HandshakeInfo { @@ -89,6 +102,14 @@ public class HandshakeInfo {
return this.protocol;
}
/**
* Sets the sub-protocol negotiated at handshake time.
* @param protocol the sub-protocol negotiated at handshake time.
*/
public void setSubProtocol(Optional<String> protocol) {
this.protocol = protocol;
}
@Override
public String toString() {

16
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketHandlerAdapter.java

@ -30,6 +30,7 @@ import org.eclipse.jetty.websocket.api.extensions.Frame; @@ -30,6 +30,7 @@ import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.common.OpCode;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.MonoProcessor;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
@ -54,11 +55,20 @@ public class JettyWebSocketHandlerAdapter extends WebSocketHandlerAdapterSupport @@ -54,11 +55,20 @@ public class JettyWebSocketHandlerAdapter extends WebSocketHandlerAdapterSupport
private JettyWebSocketSession delegateSession;
private final MonoProcessor<Void> completionMono;
public JettyWebSocketHandlerAdapter(WebSocketHandler delegate, HandshakeInfo info,
DataBufferFactory bufferFactory) {
this(delegate, info, bufferFactory, null);
}
public JettyWebSocketHandlerAdapter(WebSocketHandler delegate, HandshakeInfo info,
DataBufferFactory bufferFactory, MonoProcessor<Void> completionMono) {
super(delegate, info, bufferFactory);
this.completionMono = completionMono;
}
@ -145,6 +155,9 @@ public class JettyWebSocketHandlerAdapter extends WebSocketHandlerAdapterSupport @@ -145,6 +155,9 @@ public class JettyWebSocketHandlerAdapter extends WebSocketHandlerAdapterSupport
@Override
public void onError(Throwable ex) {
if (completionMono != null) {
completionMono.onError(ex);
}
if (delegateSession != null) {
int code = CloseStatus.SERVER_ERROR.getCode();
delegateSession.close(new CloseStatus(code, ex.getMessage()));
@ -153,6 +166,9 @@ public class JettyWebSocketHandlerAdapter extends WebSocketHandlerAdapterSupport @@ -153,6 +166,9 @@ public class JettyWebSocketHandlerAdapter extends WebSocketHandlerAdapterSupport
@Override
public void onComplete() {
if (completionMono != null) {
completionMono.onComplete();
}
if (delegateSession != null) {
delegateSession.close();
}

171
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/JettyWebSocketClient.java

@ -0,0 +1,171 @@ @@ -0,0 +1,171 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.reactive.socket.client;
import java.net.URI;
import java.util.Optional;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import org.springframework.context.Lifecycle;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.util.ObjectUtils;
import org.springframework.web.reactive.socket.HandshakeInfo;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.adapter.JettyWebSocketHandlerAdapter;
/**
* A Jetty based implementation of {@link WebSocketClient}.
*
* @author Violeta Georgieva
* @since 5.0
*/
public class JettyWebSocketClient extends WebSocketClientSupport implements WebSocketClient, Lifecycle {
private final DataBufferFactory bufferFactory = new DefaultDataBufferFactory();
private final org.eclipse.jetty.websocket.client.WebSocketClient wsClient;
private final Object lifecycleMonitor = new Object();
/**
* Default constructor that creates an instance of
* {@link org.eclipse.jetty.websocket.client.WebSocketClient}.
*/
public JettyWebSocketClient() {
this(new org.eclipse.jetty.websocket.client.WebSocketClient());
}
/**
* Constructor that accepts an existing
* {@link org.eclipse.jetty.websocket.client.WebSocketClient} instance.
* @param wsClient a web socket client
*/
public JettyWebSocketClient(org.eclipse.jetty.websocket.client.WebSocketClient wsClient) {
this.wsClient = wsClient;
}
@Override
public Mono<Void> execute(URI url, WebSocketHandler handler) {
return execute(url, new HttpHeaders(), handler);
}
@Override
public Mono<Void> execute(URI url, HttpHeaders headers, WebSocketHandler handler) {
return connectInternal(url, headers, handler);
}
private Mono<Void> connectInternal(URI url, HttpHeaders headers, WebSocketHandler handler) {
MonoProcessor<Void> processor = MonoProcessor.create();
return Mono.fromCallable(
() -> {
HandshakeInfo info = new HandshakeInfo(url, Mono.empty());
Object adapter = new JettyClientAdapter(handler, info, this.bufferFactory, processor);
ClientUpgradeRequest request = createRequest(url, headers, handler);
return this.wsClient.connect(adapter, url, request);
})
.then(processor);
}
private ClientUpgradeRequest createRequest(URI url, HttpHeaders headers, WebSocketHandler handler) {
ClientUpgradeRequest request = new ClientUpgradeRequest();
String[] protocols = beforeHandshake(url, headers, handler);
if (!ObjectUtils.isEmpty(protocols)) {
request.setSubProtocols(protocols);
}
headers.forEach((k, v) -> request.setHeader(k, v));
return request;
}
@Override
public void start() {
synchronized (this.lifecycleMonitor) {
if (!isRunning()) {
try {
this.wsClient.start();
}
catch (Exception ex) {
throw new IllegalStateException("Failed to start Jetty WebSocketClient", ex);
}
}
}
}
@Override
public void stop() {
synchronized (this.lifecycleMonitor) {
if (isRunning()) {
try {
this.wsClient.stop();
}
catch (Exception ex) {
throw new IllegalStateException("Error stopping Jetty WebSocketClient", ex);
}
}
}
}
@Override
public boolean isRunning() {
synchronized (this.lifecycleMonitor) {
return this.wsClient.isStarted();
}
}
@WebSocket
private static final class JettyClientAdapter extends JettyWebSocketHandlerAdapter {
public JettyClientAdapter(WebSocketHandler delegate,
HandshakeInfo info, DataBufferFactory bufferFactory, MonoProcessor<Void> processor) {
super(delegate, info, bufferFactory, processor);
}
@Override
public void onWebSocketConnect(Session session) {
UpgradeResponse response = session.getUpgradeResponse();
getHandshakeInfo().setHeaders(getResponseHeaders(response));
getHandshakeInfo().setSubProtocol(
Optional.ofNullable(response.getAcceptedSubProtocol()));
super.onWebSocketConnect(session);
}
private HttpHeaders getResponseHeaders(UpgradeResponse response) {
HttpHeaders responseHeaders = new HttpHeaders();
response.getHeaders().forEach((k, v) -> responseHeaders.put(k, v));
return responseHeaders;
}
}
}

17
spring-web-reactive/src/test/java/org/springframework/web/reactive/socket/server/WebSocketIntegrationTests.java

@ -38,6 +38,7 @@ import org.springframework.web.reactive.socket.HandshakeInfo; @@ -38,6 +38,7 @@ import org.springframework.web.reactive.socket.HandshakeInfo;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import org.springframework.web.reactive.socket.client.JettyWebSocketClient;
import org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient;
import org.springframework.web.reactive.socket.client.RxNettyWebSocketClient;
import org.springframework.web.reactive.socket.client.WebSocketClient;
@ -69,6 +70,14 @@ public class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests @@ -69,6 +70,14 @@ public class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests
testEcho(new RxNettyWebSocketClient());
}
@Test
public void echoJettyClient() throws Exception {
JettyWebSocketClient client = new JettyWebSocketClient();
client.start();
testEcho(client);
client.stop();
}
private void testEcho(WebSocketClient client) throws URISyntaxException {
int count = 100;
Flux<String> input = Flux.range(1, count).map(index -> "msg-" + index);
@ -96,6 +105,14 @@ public class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests @@ -96,6 +105,14 @@ public class WebSocketIntegrationTests extends AbstractWebSocketIntegrationTests
testSubProtocol(new RxNettyWebSocketClient());
}
@Test
public void subProtocolJettyClient() throws Exception {
JettyWebSocketClient client = new JettyWebSocketClient();
client.start();
testSubProtocol(client);
client.stop();
}
private void testSubProtocol(WebSocketClient client) throws URISyntaxException {
String protocol = "echo-v1";
AtomicReference<HandshakeInfo> infoRef = new AtomicReference<>();

Loading…
Cancel
Save