Browse Source

Initial reactive, WebSocket Tomcat support

Issue: SPR-14527
pull/1256/merge
Violeta Georgieva 8 years ago committed by Rossen Stoyanchev
parent
commit
46b39f4372
  1. 5
      build.gradle
  2. 169
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketHandlerAdapter.java
  3. 234
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java
  4. 120
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/ServerEndpointRegistration.java
  5. 94
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/TomcatRequestUpgradeStrategy.java
  6. 18
      spring-web-reactive/src/test/java/org/springframework/web/reactive/socket/server/AbstractWebSocketHandlerIntegrationTests.java
  7. 10
      spring-web/src/test/java/org/springframework/http/server/reactive/bootstrap/TomcatHttpServer.java

5
build.gradle

@ -824,6 +824,11 @@ project("spring-web-reactive") { @@ -824,6 +824,11 @@ project("spring-web-reactive") {
}
optional("io.reactivex:rxjava:${rxjavaVersion}")
optional("io.reactivex:rxjava-reactive-streams:${rxjavaAdapterVersion}")
optional("javax.websocket:javax.websocket-api:${websocketVersion}")
optional("org.apache.tomcat:tomcat-websocket:${tomcatVersion}") {
exclude group: "org.apache.tomcat", module: "tomcat-websocket-api"
exclude group: "org.apache.tomcat", module: "tomcat-servlet-api"
}
testCompile("io.projectreactor.addons:reactor-test:${reactorCoreVersion}")
testCompile("javax.validation:validation-api:${beanvalVersion}")
testCompile("org.hibernate:hibernate-validator:${hibval5Version}")

169
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketHandlerAdapter.java

@ -0,0 +1,169 @@ @@ -0,0 +1,169 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.reactive.socket.adapter;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import javax.websocket.CloseReason;
import javax.websocket.Endpoint;
import javax.websocket.EndpointConfig;
import javax.websocket.MessageHandler;
import javax.websocket.PongMessage;
import javax.websocket.Session;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.util.Assert;
import org.springframework.web.reactive.socket.CloseStatus;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketMessage.Type;
/**
* Tomcat {@code WebSocketHandler} implementation adapting and
* delegating to a Spring {@link WebSocketHandler}.
*
* @author Violeta Georgieva
* @since 5.0
*/
public class TomcatWebSocketHandlerAdapter extends Endpoint {
private final DataBufferFactory bufferFactory = new DefaultDataBufferFactory(false);
private final WebSocketHandler handler;
private TomcatWebSocketSession wsSession;
public TomcatWebSocketHandlerAdapter(WebSocketHandler handler) {
Assert.notNull("'handler' is required");
this.handler = handler;
}
@Override
public void onOpen(Session session, EndpointConfig config) {
this.wsSession = new TomcatWebSocketSession(session);
session.addMessageHandler(new MessageHandler.Whole<String>() {
@Override
public void onMessage(String message) {
while (true) {
if (wsSession.canWebSocketMessagePublisherAccept()) {
WebSocketMessage wsMessage = toMessage(message);
wsSession.handleMessage(wsMessage.getType(), wsMessage);
break;
}
}
}
});
session.addMessageHandler(new MessageHandler.Whole<ByteBuffer>() {
@Override
public void onMessage(ByteBuffer message) {
while (true) {
if (wsSession.canWebSocketMessagePublisherAccept()) {
WebSocketMessage wsMessage = toMessage(message);
wsSession.handleMessage(wsMessage.getType(), wsMessage);
break;
}
}
}
});
session.addMessageHandler(new MessageHandler.Whole<PongMessage>() {
@Override
public void onMessage(PongMessage message) {
while (true) {
if (wsSession.canWebSocketMessagePublisherAccept()) {
WebSocketMessage wsMessage = toMessage(message);
wsSession.handleMessage(wsMessage.getType(), wsMessage);
break;
}
}
}
});
HandlerResultSubscriber resultSubscriber = new HandlerResultSubscriber();
this.handler.handle(this.wsSession).subscribe(resultSubscriber);
}
@Override
public void onClose(Session session, CloseReason reason) {
if (this.wsSession != null) {
this.wsSession.handleClose(reason);
}
}
@Override
public void onError(Session session, Throwable exception) {
if (this.wsSession != null) {
this.wsSession.handleError(exception);
}
}
private <T> WebSocketMessage toMessage(T message) {
if (message instanceof String) {
return WebSocketMessage.create(Type.TEXT,
bufferFactory.wrap(((String) message).getBytes(StandardCharsets.UTF_8)));
}
else if (message instanceof ByteBuffer) {
return WebSocketMessage.create(Type.BINARY,
bufferFactory.wrap((ByteBuffer) message));
}
else if (message instanceof PongMessage) {
return WebSocketMessage.create(Type.PONG,
bufferFactory.wrap(((PongMessage) message).getApplicationData()));
}
else {
throw new IllegalArgumentException("Unexpected message type: " + message);
}
}
private final class HandlerResultSubscriber implements Subscriber<Void> {
@Override
public void onSubscribe(Subscription subscription) {
subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(Void aVoid) {
// no op
}
@Override
public void onError(Throwable ex) {
if (wsSession != null) {
wsSession.close(new CloseStatus(CloseStatus.SERVER_ERROR.getCode(), ex.getMessage()));
}
}
@Override
public void onComplete() {
if (wsSession != null) {
wsSession.close();
}
}
}
}

234
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java

@ -0,0 +1,234 @@ @@ -0,0 +1,234 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.reactive.socket.adapter;
import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.websocket.CloseReason;
import javax.websocket.SendHandler;
import javax.websocket.SendResult;
import javax.websocket.Session;
import javax.websocket.CloseReason.CloseCodes;
import org.reactivestreams.Publisher;
import org.springframework.http.server.reactive.AbstractRequestBodyPublisher;
import org.springframework.http.server.reactive.AbstractResponseBodyProcessor;
import org.springframework.web.reactive.socket.CloseStatus;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import org.springframework.web.reactive.socket.WebSocketMessage.Type;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
* Spring {@link WebSocketSession} adapter for Tomcat's
* {@link javax.websocket.Session}.
*
* @author Violeta Georgieva
* @since 5.0
*/
public class TomcatWebSocketSession extends WebSocketSessionSupport<Session> {
private final AtomicBoolean sendCalled = new AtomicBoolean();
private final WebSocketMessagePublisher webSocketMessagePublisher =
new WebSocketMessagePublisher();
private final String id;
private final URI uri;
private volatile WebSocketMessageProcessor webSocketMessageProcessor;
public TomcatWebSocketSession(Session session) {
super(session);
this.id = session.getId();
this.uri = session.getRequestURI();
}
@Override
public String getId() {
return this.id;
}
@Override
public URI getUri() {
return this.uri;
}
@Override
public Flux<WebSocketMessage> receive() {
return Flux.from(this.webSocketMessagePublisher);
}
@Override
public Mono<Void> send(Publisher<WebSocketMessage> messages) {
if (this.sendCalled.compareAndSet(false, true)) {
this.webSocketMessageProcessor = new WebSocketMessageProcessor();
return Mono.from(subscriber -> {
messages.subscribe(this.webSocketMessageProcessor);
this.webSocketMessageProcessor.subscribe(subscriber);
});
}
else {
return Mono.error(new IllegalStateException("send() has already been called"));
}
}
@Override
protected Mono<Void> closeInternal(CloseStatus status) {
try {
getDelegate().close(new CloseReason(CloseCodes.getCloseCode(status.getCode()), status.getReason()));
}
catch (IOException e) {
return Mono.error(e);
}
return Mono.empty();
}
boolean canWebSocketMessagePublisherAccept() {
return this.webSocketMessagePublisher.canAccept();
}
/** Handle a message callback from the Servlet container */
void handleMessage(Type type, WebSocketMessage message) {
this.webSocketMessagePublisher.processWebSocketMessage(message);
}
/** Handle a error callback from the Servlet container */
void handleError(Throwable ex) {
this.webSocketMessagePublisher.onError(ex);
if (this.webSocketMessageProcessor != null) {
this.webSocketMessageProcessor.cancel();
this.webSocketMessageProcessor.onError(ex);
}
}
/** Handle a complete callback from the Servlet container */
void handleClose(CloseReason reason) {
this.webSocketMessagePublisher.onAllDataRead();
if (this.webSocketMessageProcessor != null) {
this.webSocketMessageProcessor.cancel();
this.webSocketMessageProcessor.onComplete();
}
}
private static final class WebSocketMessagePublisher extends AbstractRequestBodyPublisher<WebSocketMessage> {
private volatile WebSocketMessage webSocketMessage;
@Override
protected void checkOnDataAvailable() {
if (this.webSocketMessage != null) {
onDataAvailable();
}
}
@Override
protected WebSocketMessage read() throws IOException {
if (this.webSocketMessage != null) {
WebSocketMessage result = this.webSocketMessage;
this.webSocketMessage = null;
return result;
}
return null;
}
void processWebSocketMessage(WebSocketMessage webSocketMessage) {
this.webSocketMessage = webSocketMessage;
onDataAvailable();
}
boolean canAccept() {
return this.webSocketMessage == null;
}
}
private final class WebSocketMessageProcessor extends AbstractResponseBodyProcessor<WebSocketMessage> {
private volatile boolean isReady = true;
@Override
protected boolean write(WebSocketMessage message) throws IOException {
if (WebSocketMessage.Type.TEXT.equals(message.getType())) {
this.isReady = false;
getDelegate().getAsyncRemote().sendText(
new String(message.getPayload().asByteBuffer().array(), StandardCharsets.UTF_8),
new WebSocketMessageSendHandler());
}
else if (WebSocketMessage.Type.BINARY.equals(message.getType())) {
this.isReady = false;
getDelegate().getAsyncRemote().sendBinary(message.getPayload().asByteBuffer(),
new WebSocketMessageSendHandler());
}
else if (WebSocketMessage.Type.PING.equals(message.getType())) {
getDelegate().getAsyncRemote().sendPing(message.getPayload().asByteBuffer());
}
else if (WebSocketMessage.Type.PONG.equals(message.getType())) {
getDelegate().getAsyncRemote().sendPong(message.getPayload().asByteBuffer());
}
else {
throw new IllegalArgumentException("Unexpected message type: " + message.getType());
}
return true;
}
@Override
protected void releaseData() {
if (logger.isTraceEnabled()) {
logger.trace("releaseBuffer: " + this.currentData);
}
this.currentData = null;
}
@Override
protected boolean isDataEmpty(WebSocketMessage data) {
return data.getPayload().readableByteCount() == 0;
}
@Override
protected boolean isWritePossible() {
if (this.isReady && this.currentData != null) {
return true;
}
else {
return false;
}
}
private final class WebSocketMessageSendHandler implements SendHandler {
@Override
public void onResult(SendResult result) {
if (result.isOK()) {
isReady = true;
webSocketMessageProcessor.onWritePossible();
}
else {
webSocketMessageProcessor.cancel();
webSocketMessageProcessor.onError(result.getException());
}
}
}
}
}

120
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/ServerEndpointRegistration.java

@ -0,0 +1,120 @@ @@ -0,0 +1,120 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.reactive.socket.server.upgrade;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.websocket.Decoder;
import javax.websocket.Encoder;
import javax.websocket.Endpoint;
import javax.websocket.Extension;
import javax.websocket.server.ServerEndpointConfig;
import org.springframework.util.Assert;
/**
* An implementation of {@link javax.websocket.server.ServerEndpointConfig} for use in
* Spring applications.
*
* <p>Class constructor accept a singleton {@link javax.websocket.Endpoint} instance.
*
* <p>This class also extends
* {@link javax.websocket.server.ServerEndpointConfig.Configurator} to make it easier to
* override methods for customizing the handshake process.
*
* @author Violeta Georgieva
* @since 5.0
*/
public class ServerEndpointRegistration extends ServerEndpointConfig.Configurator
implements ServerEndpointConfig {
private final String path;
private final Endpoint endpoint;
/**
* Create a new {@link ServerEndpointRegistration} instance from an
* {@code javax.websocket.Endpoint} instance.
* @param path the endpoint path
* @param endpoint the endpoint instance
*/
public ServerEndpointRegistration(String path, Endpoint endpoint) {
Assert.hasText(path, "path must not be empty");
Assert.notNull(endpoint, "endpoint must not be null");
this.path = path;
this.endpoint = endpoint;
}
@Override
public List<Class<? extends Encoder>> getEncoders() {
return new ArrayList<>();
}
@Override
public List<Class<? extends Decoder>> getDecoders() {
return new ArrayList<>();
}
@Override
public Map<String, Object> getUserProperties() {
return new HashMap<>();
}
@Override
public Class<?> getEndpointClass() {
return this.endpoint.getClass();
}
public Endpoint getEndpoint() {
return this.endpoint;
}
@Override
public String getPath() {
return this.path;
}
@Override
public List<String> getSubprotocols() {
return new ArrayList<>();
}
@Override
public List<Extension> getExtensions() {
return new ArrayList<>();
}
@Override
public Configurator getConfigurator() {
return this;
}
@SuppressWarnings("unchecked")
@Override
public <T> T getEndpointInstance(Class<T> endpointClass)
throws InstantiationException {
return (T) getEndpoint();
}
@Override
public String toString() {
return "ServerEndpointRegistration for path '" + getPath() + "': " + getEndpointClass();
}
}

94
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/TomcatRequestUpgradeStrategy.java

@ -0,0 +1,94 @@ @@ -0,0 +1,94 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.reactive.socket.server.upgrade;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import javax.servlet.ServletContext;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.tomcat.websocket.server.WsServerContainer;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.http.server.reactive.ServletServerHttpRequest;
import org.springframework.http.server.reactive.ServletServerHttpResponse;
import org.springframework.util.Assert;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.adapter.TomcatWebSocketHandlerAdapter;
import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
/**
* A {@link RequestUpgradeStrategy} for use with Tomcat.
*
* @author Violeta Georgieva
* @since 5.0
*/
public class TomcatRequestUpgradeStrategy implements RequestUpgradeStrategy {
private static final String SERVER_CONTAINER_ATTR = "javax.websocket.server.ServerContainer";
@Override
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler webSocketHandler){
TomcatWebSocketHandlerAdapter endpoint =
new TomcatWebSocketHandlerAdapter(webSocketHandler);
HttpServletRequest servletRequest = getHttpServletRequest(exchange.getRequest());
HttpServletResponse servletResponse = getHttpServletResponse(exchange.getResponse());
Map<String, String> pathParams = Collections.<String, String> emptyMap();
ServerEndpointRegistration sec =
new ServerEndpointRegistration(servletRequest.getRequestURI(), endpoint);
try {
getContainer(servletRequest).doUpgrade(servletRequest, servletResponse,
sec, pathParams);
}
catch (ServletException | IOException e) {
return Mono.error(e);
}
return Mono.empty();
}
private WsServerContainer getContainer(HttpServletRequest request) {
ServletContext servletContext = request.getServletContext();
Object container = servletContext.getAttribute(SERVER_CONTAINER_ATTR);
Assert.notNull(container, "No '" + SERVER_CONTAINER_ATTR + "' ServletContext attribute. " +
"Are you running in a Servlet container that supports JSR-356?");
Assert.isTrue(container instanceof WsServerContainer);
return (WsServerContainer) container;
}
private final HttpServletRequest getHttpServletRequest(ServerHttpRequest request) {
Assert.isTrue(request instanceof ServletServerHttpRequest);
return ((ServletServerHttpRequest) request).getServletRequest();
}
private final HttpServletResponse getHttpServletResponse(ServerHttpResponse response) {
Assert.isTrue(response instanceof ServletServerHttpResponse);
return ((ServletServerHttpResponse) response).getServletResponse();
}
}

18
spring-web-reactive/src/test/java/org/springframework/web/reactive/socket/server/AbstractWebSocketHandlerIntegrationTests.java

@ -15,6 +15,9 @@ @@ -15,6 +15,9 @@
*/
package org.springframework.web.reactive.socket.server;
import java.io.File;
import org.apache.tomcat.websocket.server.WsContextListener;
import org.junit.After;
import org.junit.Before;
import org.junit.runner.RunWith;
@ -29,12 +32,14 @@ import org.springframework.http.server.reactive.HttpHandler; @@ -29,12 +32,14 @@ import org.springframework.http.server.reactive.HttpHandler;
import org.springframework.http.server.reactive.bootstrap.HttpServer;
import org.springframework.http.server.reactive.bootstrap.ReactorHttpServer;
import org.springframework.http.server.reactive.bootstrap.RxNettyHttpServer;
import org.springframework.http.server.reactive.bootstrap.TomcatHttpServer;
import org.springframework.util.SocketUtils;
import org.springframework.web.reactive.DispatcherHandler;
import org.springframework.web.reactive.socket.server.support.HandshakeWebSocketService;
import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter;
import org.springframework.web.reactive.socket.server.upgrade.ReactorNettyRequestUpgradeStrategy;
import org.springframework.web.reactive.socket.server.upgrade.RxNettyRequestUpgradeStrategy;
import org.springframework.web.reactive.socket.server.upgrade.TomcatRequestUpgradeStrategy;
/**
* Base class for WebSocket integration tests involving a server-side
@ -59,9 +64,11 @@ public abstract class AbstractWebSocketHandlerIntegrationTests { @@ -59,9 +64,11 @@ public abstract class AbstractWebSocketHandlerIntegrationTests {
@Parameters
public static Object[][] arguments() {
File base = new File(System.getProperty("java.io.tmpdir"));
return new Object[][] {
{new ReactorHttpServer(), ReactorNettyConfig.class},
{new RxNettyHttpServer(), RxNettyConfig.class}
{new RxNettyHttpServer(), RxNettyConfig.class},
{new TomcatHttpServer(base.getAbsolutePath(), WsContextListener.class), TomcatConfig.class}
};
}
@ -134,4 +141,13 @@ public abstract class AbstractWebSocketHandlerIntegrationTests { @@ -134,4 +141,13 @@ public abstract class AbstractWebSocketHandlerIntegrationTests {
}
}
@Configuration
static class TomcatConfig extends AbstractHandlerAdapterConfig {
@Override
protected RequestUpgradeStrategy getUpgradeStrategy() {
return new TomcatRequestUpgradeStrategy();
}
}
}

10
spring-web/src/test/java/org/springframework/http/server/reactive/bootstrap/TomcatHttpServer.java

@ -37,6 +37,8 @@ public class TomcatHttpServer extends HttpServerSupport implements HttpServer, I @@ -37,6 +37,8 @@ public class TomcatHttpServer extends HttpServerSupport implements HttpServer, I
private String baseDir;
private Class<?> wsListener;
public TomcatHttpServer() {
}
@ -45,6 +47,11 @@ public class TomcatHttpServer extends HttpServerSupport implements HttpServer, I @@ -45,6 +47,11 @@ public class TomcatHttpServer extends HttpServerSupport implements HttpServer, I
this.baseDir = baseDir;
}
public TomcatHttpServer(String baseDir, Class<?> wsListener) {
this.baseDir = baseDir;
this.wsListener = wsListener;
}
@Override
public void afterPropertiesSet() throws Exception {
@ -61,6 +68,9 @@ public class TomcatHttpServer extends HttpServerSupport implements HttpServer, I @@ -61,6 +68,9 @@ public class TomcatHttpServer extends HttpServerSupport implements HttpServer, I
Context rootContext = tomcatServer.addContext("", base.getAbsolutePath());
Tomcat.addServlet(rootContext, "httpHandlerServlet", servlet);
rootContext.addServletMappingDecoded("/", "httpHandlerServlet");
if (wsListener != null) {
rootContext.addApplicationListener(wsListener.getName());
}
}
private ServletHttpHandlerAdapter initServletHttpHandlerAdapter() {

Loading…
Cancel
Save