Browse Source

Refactor Undertow WebSocket client configuration model

This commit removes the statically created XnioWorker which is an
"active" component and should not be created automatically and could
lead to resource leaks. Instead XnioWorker is now required at
construction aligning better with WebSocketClient#connectionBuilder
which also does not have a "default" worker option.

Since the XnioWorker is the main input for creating a ConnectionBuilder
we now create the ConnectionBuider in a protected method and then allow
a Consumer<ConnectionBuilder> to configure it further as opposed to the
Function<URI, ConnectionBuilder> used previously.

This commit also removes default SSL context initialization for RxNetty
to better align with other client implementations.

Issue: SPR-14527
pull/1277/head
Rossen Stoyanchev 8 years ago
parent
commit
e4d39bb86f
  1. 19
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/RxNettyWebSocketClient.java
  2. 149
      spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/UndertowWebSocketClient.java
  3. 14
      spring-web-reactive/src/test/java/org/springframework/web/reactive/socket/AbstractWebSocketIntegrationTests.java

19
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/RxNettyWebSocketClient.java

@ -16,14 +16,11 @@ @@ -16,14 +16,11 @@
package org.springframework.web.reactive.socket.client;
import java.net.URI;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
@ -89,20 +86,8 @@ public class RxNettyWebSocketClient extends WebSocketClientSupport implements We @@ -89,20 +86,8 @@ public class RxNettyWebSocketClient extends WebSocketClientSupport implements We
private static HttpClient<ByteBuf, ByteBuf> getDefaultHttpClientProvider(URI url) {
boolean secure = "wss".equals(url.getScheme());
int port = url.getPort() > 0 ? url.getPort() : secure ? 443 : 80;
HttpClient<ByteBuf, ByteBuf> client = HttpClient.newClient(url.getHost(), port);
if (secure) {
try {
SSLContext context = SSLContext.getDefault();
SSLEngine engine = context.createSSLEngine(url.getHost(), port);
engine.setUseClientMode(true);
client.secure(engine);
}
catch (NoSuchAlgorithmException ex) {
throw new IllegalStateException("Failed to create HttpClient for " + url, ex);
}
}
return client;
int port = (url.getPort() > 0 ? url.getPort() : secure ? 443 : 80);
return HttpClient.newClient(url.getHost(), port);
}

149
spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/client/UndertowWebSocketClient.java

@ -18,32 +18,25 @@ package org.springframework.web.reactive.socket.client; @@ -18,32 +18,25 @@ package org.springframework.web.reactive.socket.client;
import java.io.IOException;
import java.net.URI;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import javax.net.ssl.SSLContext;
import java.util.function.Consumer;
import io.undertow.connector.ByteBufferPool;
import io.undertow.protocols.ssl.UndertowXnioSsl;
import io.undertow.server.DefaultByteBufferPool;
import io.undertow.websockets.client.WebSocketClient.ConnectionBuilder;
import io.undertow.websockets.client.WebSocketClientNegotiation;
import io.undertow.websockets.core.WebSocketChannel;
import org.xnio.IoFuture;
import org.xnio.OptionMap;
import org.xnio.Options;
import org.xnio.Xnio;
import org.xnio.XnioWorker;
import org.xnio.ssl.XnioSsl;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.util.Assert;
import org.springframework.web.reactive.socket.HandshakeInfo;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.adapter.UndertowWebSocketHandlerAdapter;
@ -58,68 +51,69 @@ import org.springframework.web.reactive.socket.adapter.UndertowWebSocketSession; @@ -58,68 +51,69 @@ import org.springframework.web.reactive.socket.adapter.UndertowWebSocketSession;
*/
public class UndertowWebSocketClient extends WebSocketClientSupport implements WebSocketClient {
private static final int DEFAULT_BUFFER_SIZE = 8192;
private static XnioWorker worker;
static {
try {
worker = Xnio.getInstance().createWorker(OptionMap.builder()
.set(Options.WORKER_IO_THREADS, 2)
.set(Options.CONNECTION_HIGH_WATER, 1000000)
.set(Options.CONNECTION_LOW_WATER, 1000000)
.set(Options.WORKER_TASK_CORE_THREADS, 30)
.set(Options.WORKER_TASK_MAX_THREADS, 30)
.set(Options.TCP_NODELAY, true)
.set(Options.CORK, true)
.getMap());
}
catch (IOException ex) {
throw new RuntimeException(ex);
}
}
private static final int DEFAULT_POOL_BUFFER_SIZE = 8192;
private final XnioWorker worker;
private final Consumer<ConnectionBuilder> builderConsumer;
private final Function<URI, ConnectionBuilder> builder;
private int poolBufferSize = DEFAULT_POOL_BUFFER_SIZE;
private final DataBufferFactory bufferFactory = new DefaultDataBufferFactory();
/**
* Default constructor that uses
* {@link io.undertow.websockets.client.WebSocketClient#connectionBuilder(XnioWorker, ByteBufferPool, URI)}
* to create WebSocket connections.
* Constructor with the {@link XnioWorker} to pass to
* {@link io.undertow.websockets.client.WebSocketClient#connectionBuilder}
* @param worker the Xnio worker
*/
public UndertowWebSocketClient() {
this(UndertowWebSocketClient::createDefaultConnectionBuilder);
public UndertowWebSocketClient(XnioWorker worker) {
this(worker, builder -> {});
}
/**
* Constructor that accepts a {@link Function} to prepare a
* {@link ConnectionBuilder} for WebSocket connections.
* @param builder a connection builder that can be used to create a web socket connection.
* Alternate constructor providing additional control over the
* {@link ConnectionBuilder} for each WebSocket connection.
* @param worker the Xnio worker to use to create {@code ConnectionBuilder}'s
* @param builderConsumer a consumer to configure {@code ConnectionBuilder}'s
*/
public UndertowWebSocketClient(Function<URI, ConnectionBuilder> builder) {
this.builder = builder;
public UndertowWebSocketClient(XnioWorker worker, Consumer<ConnectionBuilder> builderConsumer) {
Assert.notNull(worker, "XnioWorker is required");
this.worker = worker;
this.builderConsumer = builderConsumer;
}
private static ConnectionBuilder createDefaultConnectionBuilder(URI url) {
ConnectionBuilder builder = io.undertow.websockets.client.WebSocketClient.connectionBuilder(
worker, new DefaultByteBufferPool(false, DEFAULT_BUFFER_SIZE), url);
/**
* Return the configured {@link XnioWorker}.
*/
public XnioWorker getXnioWorker() {
return this.worker;
}
boolean secure = "wss".equals(url.getScheme());
if (secure) {
try {
XnioSsl ssl = new UndertowXnioSsl(Xnio.getInstance(), OptionMap.EMPTY, SSLContext.getDefault());
builder.setSsl(ssl);
}
catch (NoSuchAlgorithmException ex) {
throw new RuntimeException("Failed to create Undertow ConnectionBuilder for " + url, ex);
}
}
/**
* Return the configured {@code Consumer<ConnectionBuilder}.
*/
public Consumer<ConnectionBuilder> getConnectionBuilderConsumer() {
return this.builderConsumer;
}
return builder;
/**
* Configure the size of the {@link io.undertow.connector.ByteBufferPool
* ByteBufferPool} to pass to
* {@link io.undertow.websockets.client.WebSocketClient#connectionBuilder}.
* <p>By default the buffer size is set to 8192.
*/
public void setPoolBufferSize(int poolBufferSize) {
this.poolBufferSize = poolBufferSize;
}
/**
* Return the size for Undertow's WebSocketClient {@code ByteBufferPool}.
*/
public int getPoolBufferSize() {
return this.poolBufferSize;
}
@ -137,13 +131,13 @@ public class UndertowWebSocketClient extends WebSocketClientSupport implements W @@ -137,13 +131,13 @@ public class UndertowWebSocketClient extends WebSocketClientSupport implements W
MonoProcessor<Void> completion = MonoProcessor.create();
return Mono.fromCallable(
() -> {
ConnectionBuilder builder = createConnectionBuilder(url);
String[] protocols = beforeHandshake(url, headers, handler);
DefaultNegotiation negotiation = new DefaultNegotiation(protocols, headers);
DefaultNegotiation negotiation = new DefaultNegotiation(protocols, headers, builder);
builder.setClientNegotiation(negotiation);
return this.builder.apply(url)
.setClientNegotiation(negotiation)
.connect()
.addNotifier(new IoFuture.HandlingNotifier<WebSocketChannel, Object>() {
return builder.connect().addNotifier(
new IoFuture.HandlingNotifier<WebSocketChannel, Object>() {
@Override
public void handleDone(WebSocketChannel channel, Object attachment) {
@ -159,6 +153,23 @@ public class UndertowWebSocketClient extends WebSocketClientSupport implements W @@ -159,6 +153,23 @@ public class UndertowWebSocketClient extends WebSocketClientSupport implements W
.then(completion);
}
/**
* Create a {@link ConnectionBuilder} for the given URI.
* <p>The default implementation creates a builder with the configured
* {@link #getXnioWorker() XnioWorker} and {@link #getPoolBufferSize()} and
* then passes it to the {@link #getConnectionBuilderConsumer() consumer}
* provided at construction time.
*/
protected ConnectionBuilder createConnectionBuilder(URI url) {
ConnectionBuilder builder = io.undertow.websockets.client.WebSocketClient
.connectionBuilder(getXnioWorker(),
new DefaultByteBufferPool(false, getPoolBufferSize()), url);
this.builderConsumer.accept(builder);
return builder;
}
private void handleChannel(URI url, WebSocketHandler handler, MonoProcessor<Void> completion,
DefaultNegotiation negotiation, WebSocketChannel channel) {
@ -177,15 +188,19 @@ public class UndertowWebSocketClient extends WebSocketClientSupport implements W @@ -177,15 +188,19 @@ public class UndertowWebSocketClient extends WebSocketClientSupport implements W
private final HttpHeaders requestHeaders;
private HttpHeaders responseHeaders = new HttpHeaders();
private final HttpHeaders responseHeaders = new HttpHeaders();
private final WebSocketClientNegotiation delegate;
public DefaultNegotiation(String[] subProtocols, HttpHeaders requestHeaders) {
super(Arrays.asList(subProtocols), Collections.emptyList());
public DefaultNegotiation(String[] protocols, HttpHeaders requestHeaders,
ConnectionBuilder connectionBuilder) {
super(Arrays.asList(protocols), Collections.emptyList());
this.requestHeaders = requestHeaders;
this.delegate = connectionBuilder.getClientNegotiation();
}
public HttpHeaders getResponseHeaders() {
return this.responseHeaders;
}
@ -193,11 +208,17 @@ public class UndertowWebSocketClient extends WebSocketClientSupport implements W @@ -193,11 +208,17 @@ public class UndertowWebSocketClient extends WebSocketClientSupport implements W
@Override
public void beforeRequest(Map<String, List<String>> headers) {
this.requestHeaders.forEach(headers::put);
if (this.delegate != null) {
this.delegate.beforeRequest(headers);
}
}
@Override
public void afterRequest(Map<String, List<String>> headers) {
headers.forEach((k, v) -> this.responseHeaders.put(k, v));
headers.forEach(this.responseHeaders::put);
if (this.delegate != null) {
this.delegate.afterRequest(headers);
}
}
}

14
spring-web-reactive/src/test/java/org/springframework/web/reactive/socket/AbstractWebSocketIntegrationTests.java

@ -16,6 +16,7 @@ @@ -16,6 +16,7 @@
package org.springframework.web.reactive.socket;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
@ -27,6 +28,8 @@ import org.junit.runner.RunWith; @@ -27,6 +28,8 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
import org.xnio.OptionMap;
import org.xnio.Xnio;
import reactor.core.publisher.Flux;
import reactor.util.function.Tuple3;
@ -72,6 +75,9 @@ import static org.junit.Assume.assumeFalse; @@ -72,6 +75,9 @@ import static org.junit.Assume.assumeFalse;
@SuppressWarnings({"unused", "WeakerAccess"})
public abstract class AbstractWebSocketIntegrationTests {
private static final File TMP_DIR = new File(System.getProperty("java.io.tmpdir"));
protected int port;
@Parameter(0)
@ -85,19 +91,17 @@ public abstract class AbstractWebSocketIntegrationTests { @@ -85,19 +91,17 @@ public abstract class AbstractWebSocketIntegrationTests {
@Parameters(name = "client[{0}] - server [{1}]")
public static Object[][] arguments() {
File base = new File(System.getProperty("java.io.tmpdir"));
public static Object[][] arguments() throws IOException {
Flux<? extends WebSocketClient> clients = Flux.concat(
Flux.just(new StandardWebSocketClient()).repeat(5),
Flux.just(new JettyWebSocketClient()).repeat(5),
Flux.just(new ReactorNettyWebSocketClient()).repeat(5),
Flux.just(new RxNettyWebSocketClient()).repeat(5),
Flux.just(new UndertowWebSocketClient()).repeat(5));
Flux.just(new UndertowWebSocketClient(Xnio.getInstance().createWorker(OptionMap.EMPTY))).repeat(5));
Flux<? extends HttpServer> servers = Flux.just(
new TomcatHttpServer(base.getAbsolutePath(), WsContextListener.class),
new TomcatHttpServer(TMP_DIR.getAbsolutePath(), WsContextListener.class),
new JettyHttpServer(),
new ReactorHttpServer(),
new RxNettyHttpServer(),

Loading…
Cancel
Save