From 46b39f4372859bc28e15cc0856c93e51e37631b1 Mon Sep 17 00:00:00 2001 From: Violeta Georgieva Date: Thu, 8 Dec 2016 18:40:40 +0200 Subject: [PATCH] Initial reactive, WebSocket Tomcat support Issue: SPR-14527 --- build.gradle | 5 + .../TomcatWebSocketHandlerAdapter.java | 169 +++++++++++++ .../adapter/TomcatWebSocketSession.java | 234 ++++++++++++++++++ .../upgrade/ServerEndpointRegistration.java | 120 +++++++++ .../upgrade/TomcatRequestUpgradeStrategy.java | 94 +++++++ ...tractWebSocketHandlerIntegrationTests.java | 18 +- .../reactive/bootstrap/TomcatHttpServer.java | 10 + 7 files changed, 649 insertions(+), 1 deletion(-) create mode 100644 spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketHandlerAdapter.java create mode 100644 spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java create mode 100644 spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/ServerEndpointRegistration.java create mode 100644 spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/TomcatRequestUpgradeStrategy.java diff --git a/build.gradle b/build.gradle index 6bf30460a1..ef2425a84b 100644 --- a/build.gradle +++ b/build.gradle @@ -824,6 +824,11 @@ project("spring-web-reactive") { } optional("io.reactivex:rxjava:${rxjavaVersion}") optional("io.reactivex:rxjava-reactive-streams:${rxjavaAdapterVersion}") + optional("javax.websocket:javax.websocket-api:${websocketVersion}") + optional("org.apache.tomcat:tomcat-websocket:${tomcatVersion}") { + exclude group: "org.apache.tomcat", module: "tomcat-websocket-api" + exclude group: "org.apache.tomcat", module: "tomcat-servlet-api" + } testCompile("io.projectreactor.addons:reactor-test:${reactorCoreVersion}") testCompile("javax.validation:validation-api:${beanvalVersion}") testCompile("org.hibernate:hibernate-validator:${hibval5Version}") diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketHandlerAdapter.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketHandlerAdapter.java new file mode 100644 index 0000000000..e3066a5323 --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketHandlerAdapter.java @@ -0,0 +1,169 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.reactive.socket.adapter; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; + +import javax.websocket.CloseReason; +import javax.websocket.Endpoint; +import javax.websocket.EndpointConfig; +import javax.websocket.MessageHandler; +import javax.websocket.PongMessage; +import javax.websocket.Session; + +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import org.springframework.core.io.buffer.DataBufferFactory; +import org.springframework.core.io.buffer.DefaultDataBufferFactory; +import org.springframework.util.Assert; +import org.springframework.web.reactive.socket.CloseStatus; +import org.springframework.web.reactive.socket.WebSocketHandler; +import org.springframework.web.reactive.socket.WebSocketMessage; +import org.springframework.web.reactive.socket.WebSocketMessage.Type; + +/** + * Tomcat {@code WebSocketHandler} implementation adapting and + * delegating to a Spring {@link WebSocketHandler}. + * + * @author Violeta Georgieva + * @since 5.0 + */ +public class TomcatWebSocketHandlerAdapter extends Endpoint { + + private final DataBufferFactory bufferFactory = new DefaultDataBufferFactory(false); + + private final WebSocketHandler handler; + + private TomcatWebSocketSession wsSession; + + public TomcatWebSocketHandlerAdapter(WebSocketHandler handler) { + Assert.notNull("'handler' is required"); + this.handler = handler; + } + + @Override + public void onOpen(Session session, EndpointConfig config) { + this.wsSession = new TomcatWebSocketSession(session); + + session.addMessageHandler(new MessageHandler.Whole() { + + @Override + public void onMessage(String message) { + while (true) { + if (wsSession.canWebSocketMessagePublisherAccept()) { + WebSocketMessage wsMessage = toMessage(message); + wsSession.handleMessage(wsMessage.getType(), wsMessage); + break; + } + } + } + + }); + session.addMessageHandler(new MessageHandler.Whole() { + + @Override + public void onMessage(ByteBuffer message) { + while (true) { + if (wsSession.canWebSocketMessagePublisherAccept()) { + WebSocketMessage wsMessage = toMessage(message); + wsSession.handleMessage(wsMessage.getType(), wsMessage); + break; + } + } + } + + }); + session.addMessageHandler(new MessageHandler.Whole() { + + @Override + public void onMessage(PongMessage message) { + while (true) { + if (wsSession.canWebSocketMessagePublisherAccept()) { + WebSocketMessage wsMessage = toMessage(message); + wsSession.handleMessage(wsMessage.getType(), wsMessage); + break; + } + } + } + + }); + + HandlerResultSubscriber resultSubscriber = new HandlerResultSubscriber(); + this.handler.handle(this.wsSession).subscribe(resultSubscriber); + } + + @Override + public void onClose(Session session, CloseReason reason) { + if (this.wsSession != null) { + this.wsSession.handleClose(reason); + } + } + + @Override + public void onError(Session session, Throwable exception) { + if (this.wsSession != null) { + this.wsSession.handleError(exception); + } + } + + private WebSocketMessage toMessage(T message) { + if (message instanceof String) { + return WebSocketMessage.create(Type.TEXT, + bufferFactory.wrap(((String) message).getBytes(StandardCharsets.UTF_8))); + } + else if (message instanceof ByteBuffer) { + return WebSocketMessage.create(Type.BINARY, + bufferFactory.wrap((ByteBuffer) message)); + } + else if (message instanceof PongMessage) { + return WebSocketMessage.create(Type.PONG, + bufferFactory.wrap(((PongMessage) message).getApplicationData())); + } + else { + throw new IllegalArgumentException("Unexpected message type: " + message); + } + } + + private final class HandlerResultSubscriber implements Subscriber { + + @Override + public void onSubscribe(Subscription subscription) { + subscription.request(Long.MAX_VALUE); + } + + @Override + public void onNext(Void aVoid) { + // no op + } + + @Override + public void onError(Throwable ex) { + if (wsSession != null) { + wsSession.close(new CloseStatus(CloseStatus.SERVER_ERROR.getCode(), ex.getMessage())); + } + } + + @Override + public void onComplete() { + if (wsSession != null) { + wsSession.close(); + } + } + } + +} diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java new file mode 100644 index 0000000000..bc30d7c26b --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java @@ -0,0 +1,234 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.reactive.socket.adapter; + +import java.io.IOException; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.websocket.CloseReason; +import javax.websocket.SendHandler; +import javax.websocket.SendResult; +import javax.websocket.Session; +import javax.websocket.CloseReason.CloseCodes; + +import org.reactivestreams.Publisher; +import org.springframework.http.server.reactive.AbstractRequestBodyPublisher; +import org.springframework.http.server.reactive.AbstractResponseBodyProcessor; +import org.springframework.web.reactive.socket.CloseStatus; +import org.springframework.web.reactive.socket.WebSocketMessage; +import org.springframework.web.reactive.socket.WebSocketSession; +import org.springframework.web.reactive.socket.WebSocketMessage.Type; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +/** + * Spring {@link WebSocketSession} adapter for Tomcat's + * {@link javax.websocket.Session}. + * + * @author Violeta Georgieva + * @since 5.0 + */ +public class TomcatWebSocketSession extends WebSocketSessionSupport { + + private final AtomicBoolean sendCalled = new AtomicBoolean(); + + private final WebSocketMessagePublisher webSocketMessagePublisher = + new WebSocketMessagePublisher(); + + private final String id; + + private final URI uri; + + private volatile WebSocketMessageProcessor webSocketMessageProcessor; + + public TomcatWebSocketSession(Session session) { + super(session); + this.id = session.getId(); + this.uri = session.getRequestURI(); + } + + @Override + public String getId() { + return this.id; + } + + @Override + public URI getUri() { + return this.uri; + } + + @Override + public Flux receive() { + return Flux.from(this.webSocketMessagePublisher); + } + + @Override + public Mono send(Publisher messages) { + if (this.sendCalled.compareAndSet(false, true)) { + this.webSocketMessageProcessor = new WebSocketMessageProcessor(); + return Mono.from(subscriber -> { + messages.subscribe(this.webSocketMessageProcessor); + this.webSocketMessageProcessor.subscribe(subscriber); + }); + } + else { + return Mono.error(new IllegalStateException("send() has already been called")); + } + } + + @Override + protected Mono closeInternal(CloseStatus status) { + try { + getDelegate().close(new CloseReason(CloseCodes.getCloseCode(status.getCode()), status.getReason())); + } + catch (IOException e) { + return Mono.error(e); + } + return Mono.empty(); + } + + boolean canWebSocketMessagePublisherAccept() { + return this.webSocketMessagePublisher.canAccept(); + } + + /** Handle a message callback from the Servlet container */ + void handleMessage(Type type, WebSocketMessage message) { + this.webSocketMessagePublisher.processWebSocketMessage(message); + } + + /** Handle a error callback from the Servlet container */ + void handleError(Throwable ex) { + this.webSocketMessagePublisher.onError(ex); + if (this.webSocketMessageProcessor != null) { + this.webSocketMessageProcessor.cancel(); + this.webSocketMessageProcessor.onError(ex); + } + } + + /** Handle a complete callback from the Servlet container */ + void handleClose(CloseReason reason) { + this.webSocketMessagePublisher.onAllDataRead(); + if (this.webSocketMessageProcessor != null) { + this.webSocketMessageProcessor.cancel(); + this.webSocketMessageProcessor.onComplete(); + } + } + + private static final class WebSocketMessagePublisher extends AbstractRequestBodyPublisher { + private volatile WebSocketMessage webSocketMessage; + + @Override + protected void checkOnDataAvailable() { + if (this.webSocketMessage != null) { + onDataAvailable(); + } + } + + @Override + protected WebSocketMessage read() throws IOException { + if (this.webSocketMessage != null) { + WebSocketMessage result = this.webSocketMessage; + this.webSocketMessage = null; + return result; + } + + return null; + } + + void processWebSocketMessage(WebSocketMessage webSocketMessage) { + this.webSocketMessage = webSocketMessage; + onDataAvailable(); + } + + boolean canAccept() { + return this.webSocketMessage == null; + } + } + + private final class WebSocketMessageProcessor extends AbstractResponseBodyProcessor { + private volatile boolean isReady = true; + + @Override + protected boolean write(WebSocketMessage message) throws IOException { + if (WebSocketMessage.Type.TEXT.equals(message.getType())) { + this.isReady = false; + getDelegate().getAsyncRemote().sendText( + new String(message.getPayload().asByteBuffer().array(), StandardCharsets.UTF_8), + new WebSocketMessageSendHandler()); + } + else if (WebSocketMessage.Type.BINARY.equals(message.getType())) { + this.isReady = false; + getDelegate().getAsyncRemote().sendBinary(message.getPayload().asByteBuffer(), + new WebSocketMessageSendHandler()); + } + else if (WebSocketMessage.Type.PING.equals(message.getType())) { + getDelegate().getAsyncRemote().sendPing(message.getPayload().asByteBuffer()); + } + else if (WebSocketMessage.Type.PONG.equals(message.getType())) { + getDelegate().getAsyncRemote().sendPong(message.getPayload().asByteBuffer()); + } + else { + throw new IllegalArgumentException("Unexpected message type: " + message.getType()); + } + return true; + } + + @Override + protected void releaseData() { + if (logger.isTraceEnabled()) { + logger.trace("releaseBuffer: " + this.currentData); + } + this.currentData = null; + } + + @Override + protected boolean isDataEmpty(WebSocketMessage data) { + return data.getPayload().readableByteCount() == 0; + } + + @Override + protected boolean isWritePossible() { + if (this.isReady && this.currentData != null) { + return true; + } + else { + return false; + } + } + + private final class WebSocketMessageSendHandler implements SendHandler { + + @Override + public void onResult(SendResult result) { + if (result.isOK()) { + isReady = true; + webSocketMessageProcessor.onWritePossible(); + } + else { + webSocketMessageProcessor.cancel(); + webSocketMessageProcessor.onError(result.getException()); + } + } + + } + + } + +} diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/ServerEndpointRegistration.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/ServerEndpointRegistration.java new file mode 100644 index 0000000000..63c5868540 --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/ServerEndpointRegistration.java @@ -0,0 +1,120 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.reactive.socket.server.upgrade; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import javax.websocket.Decoder; +import javax.websocket.Encoder; +import javax.websocket.Endpoint; +import javax.websocket.Extension; +import javax.websocket.server.ServerEndpointConfig; + +import org.springframework.util.Assert; + +/** + * An implementation of {@link javax.websocket.server.ServerEndpointConfig} for use in + * Spring applications. + * + *

Class constructor accept a singleton {@link javax.websocket.Endpoint} instance. + * + *

This class also extends + * {@link javax.websocket.server.ServerEndpointConfig.Configurator} to make it easier to + * override methods for customizing the handshake process. + * + * @author Violeta Georgieva + * @since 5.0 + */ +public class ServerEndpointRegistration extends ServerEndpointConfig.Configurator + implements ServerEndpointConfig { + + private final String path; + + private final Endpoint endpoint; + + /** + * Create a new {@link ServerEndpointRegistration} instance from an + * {@code javax.websocket.Endpoint} instance. + * @param path the endpoint path + * @param endpoint the endpoint instance + */ + public ServerEndpointRegistration(String path, Endpoint endpoint) { + Assert.hasText(path, "path must not be empty"); + Assert.notNull(endpoint, "endpoint must not be null"); + this.path = path; + this.endpoint = endpoint; + } + + @Override + public List> getEncoders() { + return new ArrayList<>(); + } + + @Override + public List> getDecoders() { + return new ArrayList<>(); + } + + @Override + public Map getUserProperties() { + return new HashMap<>(); + } + + @Override + public Class getEndpointClass() { + return this.endpoint.getClass(); + } + + public Endpoint getEndpoint() { + return this.endpoint; + } + + @Override + public String getPath() { + return this.path; + } + + @Override + public List getSubprotocols() { + return new ArrayList<>(); + } + + @Override + public List getExtensions() { + return new ArrayList<>(); + } + + @Override + public Configurator getConfigurator() { + return this; + } + + @SuppressWarnings("unchecked") + @Override + public T getEndpointInstance(Class endpointClass) + throws InstantiationException { + return (T) getEndpoint(); + } + + @Override + public String toString() { + return "ServerEndpointRegistration for path '" + getPath() + "': " + getEndpointClass(); + } +} \ No newline at end of file diff --git a/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/TomcatRequestUpgradeStrategy.java b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/TomcatRequestUpgradeStrategy.java new file mode 100644 index 0000000000..899973892c --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/server/upgrade/TomcatRequestUpgradeStrategy.java @@ -0,0 +1,94 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.reactive.socket.server.upgrade; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; + +import javax.servlet.ServletContext; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.apache.tomcat.websocket.server.WsServerContainer; +import org.springframework.http.server.reactive.ServerHttpRequest; +import org.springframework.http.server.reactive.ServerHttpResponse; +import org.springframework.http.server.reactive.ServletServerHttpRequest; +import org.springframework.http.server.reactive.ServletServerHttpResponse; +import org.springframework.util.Assert; +import org.springframework.web.reactive.socket.WebSocketHandler; +import org.springframework.web.reactive.socket.adapter.TomcatWebSocketHandlerAdapter; +import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy; +import org.springframework.web.server.ServerWebExchange; + +import reactor.core.publisher.Mono; + +/** + * A {@link RequestUpgradeStrategy} for use with Tomcat. + * + * @author Violeta Georgieva + * @since 5.0 + */ +public class TomcatRequestUpgradeStrategy implements RequestUpgradeStrategy { + + private static final String SERVER_CONTAINER_ATTR = "javax.websocket.server.ServerContainer"; + + + @Override + public Mono upgrade(ServerWebExchange exchange, WebSocketHandler webSocketHandler){ + + TomcatWebSocketHandlerAdapter endpoint = + new TomcatWebSocketHandlerAdapter(webSocketHandler); + + HttpServletRequest servletRequest = getHttpServletRequest(exchange.getRequest()); + HttpServletResponse servletResponse = getHttpServletResponse(exchange.getResponse()); + + Map pathParams = Collections. emptyMap(); + + ServerEndpointRegistration sec = + new ServerEndpointRegistration(servletRequest.getRequestURI(), endpoint); + try { + getContainer(servletRequest).doUpgrade(servletRequest, servletResponse, + sec, pathParams); + } + catch (ServletException | IOException e) { + return Mono.error(e); + } + + return Mono.empty(); + } + + private WsServerContainer getContainer(HttpServletRequest request) { + ServletContext servletContext = request.getServletContext(); + Object container = servletContext.getAttribute(SERVER_CONTAINER_ATTR); + Assert.notNull(container, "No '" + SERVER_CONTAINER_ATTR + "' ServletContext attribute. " + + "Are you running in a Servlet container that supports JSR-356?"); + Assert.isTrue(container instanceof WsServerContainer); + return (WsServerContainer) container; + } + + private final HttpServletRequest getHttpServletRequest(ServerHttpRequest request) { + Assert.isTrue(request instanceof ServletServerHttpRequest); + return ((ServletServerHttpRequest) request).getServletRequest(); + } + + private final HttpServletResponse getHttpServletResponse(ServerHttpResponse response) { + Assert.isTrue(response instanceof ServletServerHttpResponse); + return ((ServletServerHttpResponse) response).getServletResponse(); + } +} diff --git a/spring-web-reactive/src/test/java/org/springframework/web/reactive/socket/server/AbstractWebSocketHandlerIntegrationTests.java b/spring-web-reactive/src/test/java/org/springframework/web/reactive/socket/server/AbstractWebSocketHandlerIntegrationTests.java index 2489211ba0..de1d4d9b2a 100644 --- a/spring-web-reactive/src/test/java/org/springframework/web/reactive/socket/server/AbstractWebSocketHandlerIntegrationTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/web/reactive/socket/server/AbstractWebSocketHandlerIntegrationTests.java @@ -15,6 +15,9 @@ */ package org.springframework.web.reactive.socket.server; +import java.io.File; + +import org.apache.tomcat.websocket.server.WsContextListener; import org.junit.After; import org.junit.Before; import org.junit.runner.RunWith; @@ -29,12 +32,14 @@ import org.springframework.http.server.reactive.HttpHandler; import org.springframework.http.server.reactive.bootstrap.HttpServer; import org.springframework.http.server.reactive.bootstrap.ReactorHttpServer; import org.springframework.http.server.reactive.bootstrap.RxNettyHttpServer; +import org.springframework.http.server.reactive.bootstrap.TomcatHttpServer; import org.springframework.util.SocketUtils; import org.springframework.web.reactive.DispatcherHandler; import org.springframework.web.reactive.socket.server.support.HandshakeWebSocketService; import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter; import org.springframework.web.reactive.socket.server.upgrade.ReactorNettyRequestUpgradeStrategy; import org.springframework.web.reactive.socket.server.upgrade.RxNettyRequestUpgradeStrategy; +import org.springframework.web.reactive.socket.server.upgrade.TomcatRequestUpgradeStrategy; /** * Base class for WebSocket integration tests involving a server-side @@ -59,9 +64,11 @@ public abstract class AbstractWebSocketHandlerIntegrationTests { @Parameters public static Object[][] arguments() { + File base = new File(System.getProperty("java.io.tmpdir")); return new Object[][] { {new ReactorHttpServer(), ReactorNettyConfig.class}, - {new RxNettyHttpServer(), RxNettyConfig.class} + {new RxNettyHttpServer(), RxNettyConfig.class}, + {new TomcatHttpServer(base.getAbsolutePath(), WsContextListener.class), TomcatConfig.class} }; } @@ -134,4 +141,13 @@ public abstract class AbstractWebSocketHandlerIntegrationTests { } } + @Configuration + static class TomcatConfig extends AbstractHandlerAdapterConfig { + + @Override + protected RequestUpgradeStrategy getUpgradeStrategy() { + return new TomcatRequestUpgradeStrategy(); + } + } + } diff --git a/spring-web/src/test/java/org/springframework/http/server/reactive/bootstrap/TomcatHttpServer.java b/spring-web/src/test/java/org/springframework/http/server/reactive/bootstrap/TomcatHttpServer.java index 0c854e63bd..62337269f5 100644 --- a/spring-web/src/test/java/org/springframework/http/server/reactive/bootstrap/TomcatHttpServer.java +++ b/spring-web/src/test/java/org/springframework/http/server/reactive/bootstrap/TomcatHttpServer.java @@ -37,6 +37,8 @@ public class TomcatHttpServer extends HttpServerSupport implements HttpServer, I private String baseDir; + private Class wsListener; + public TomcatHttpServer() { } @@ -45,6 +47,11 @@ public class TomcatHttpServer extends HttpServerSupport implements HttpServer, I this.baseDir = baseDir; } + public TomcatHttpServer(String baseDir, Class wsListener) { + this.baseDir = baseDir; + this.wsListener = wsListener; + } + @Override public void afterPropertiesSet() throws Exception { @@ -61,6 +68,9 @@ public class TomcatHttpServer extends HttpServerSupport implements HttpServer, I Context rootContext = tomcatServer.addContext("", base.getAbsolutePath()); Tomcat.addServlet(rootContext, "httpHandlerServlet", servlet); rootContext.addServletMappingDecoded("/", "httpHandlerServlet"); + if (wsListener != null) { + rootContext.addApplicationListener(wsListener.getName()); + } } private ServletHttpHandlerAdapter initServletHttpHandlerAdapter() {