diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpConnector.java b/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpConnector.java index 115c6abc73..14f008992b 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpConnector.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpConnector.java @@ -17,6 +17,7 @@ package org.springframework.http.client.reactive; import java.net.URI; +import java.util.function.Consumer; import java.util.function.Function; import org.eclipse.jetty.client.HttpClient; @@ -24,11 +25,11 @@ import org.eclipse.jetty.util.Callback; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import org.springframework.context.SmartLifecycle; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.core.io.buffer.DefaultDataBufferFactory; import org.springframework.http.HttpMethod; +import org.springframework.lang.Nullable; import org.springframework.util.Assert; /** @@ -43,7 +44,7 @@ import org.springframework.util.Assert; * @since 5.1 * @see Jetty ReactiveStreams HttpClient */ -public class JettyClientHttpConnector implements ClientHttpConnector, SmartLifecycle { +public class JettyClientHttpConnector implements ClientHttpConnector { private final HttpClient httpClient; @@ -57,6 +58,22 @@ public class JettyClientHttpConnector implements ClientHttpConnector, SmartLifec this(new HttpClient()); } + /** + * Constructor with an {@link JettyResourceFactory} that will manage shared resources. + * @param resourceFactory the {@link JettyResourceFactory} to use + * @param customizer the lambda used to customize the {@link HttpClient} + */ + public JettyClientHttpConnector(JettyResourceFactory resourceFactory, @Nullable Consumer customizer) { + HttpClient httpClient = new HttpClient(); + httpClient.setExecutor(resourceFactory.getExecutor()); + httpClient.setByteBufferPool(resourceFactory.getByteBufferPool()); + httpClient.setScheduler(resourceFactory.getScheduler()); + if (customizer != null) { + customizer.accept(httpClient); + } + this.httpClient = httpClient; + } + /** * Constructor with an initialized {@link HttpClient}. */ @@ -71,33 +88,6 @@ public class JettyClientHttpConnector implements ClientHttpConnector, SmartLifec } - @Override - public void start() { - try { - // HttpClient is internally synchronized and protected with state checks - this.httpClient.start(); - } - catch (Exception ex) { - throw new RuntimeException(ex); - } - } - - @Override - public void stop() { - try { - this.httpClient.stop(); - } - catch (Exception ex) { - throw new RuntimeException(ex); - } - } - - @Override - public boolean isRunning() { - return this.httpClient.isRunning(); - } - - @Override public Mono connect(HttpMethod method, URI uri, Function> requestCallback) { diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/JettyResourceFactory.java b/spring-web/src/main/java/org/springframework/http/client/reactive/JettyResourceFactory.java new file mode 100644 index 0000000000..1afbb1ba3a --- /dev/null +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/JettyResourceFactory.java @@ -0,0 +1,156 @@ +/* + * Copyright 2002-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.http.client.reactive; + + +import java.nio.ByteBuffer; +import java.util.concurrent.Executor; + +import org.eclipse.jetty.io.ByteBufferPool; +import org.eclipse.jetty.io.MappedByteBufferPool; +import org.eclipse.jetty.util.ProcessorUtils; +import org.eclipse.jetty.util.component.ContainerLifeCycle; +import org.eclipse.jetty.util.thread.QueuedThreadPool; +import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler; +import org.eclipse.jetty.util.thread.Scheduler; +import org.eclipse.jetty.util.thread.ThreadPool; + +import org.springframework.beans.factory.DisposableBean; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.lang.Nullable; +import org.springframework.util.Assert; + +/** + * Factory to manage Jetty resources, i.e. {@link Executor}, {@link ByteBufferPool} and + * {@link Scheduler}, within the lifecycle of a Spring {@code ApplicationContext}. + * + *

This factory implements {@link InitializingBean} and {@link DisposableBean} + * and is expected typically to be declared as a Spring-managed bean. + * + * @author Sebastien Deleuze + * @since 5.1 + */ +public class JettyResourceFactory implements InitializingBean, DisposableBean { + + @Nullable + private Executor executor; + + @Nullable + private ByteBufferPool byteBufferPool; + + @Nullable + private Scheduler scheduler; + + private String threadPrefix = "jetty-http"; + + + /** + * Configure the {@link Executor} to use. + *

By default, initialized with a {@link QueuedThreadPool}. + * @param executor the executor to use + */ + public void setExecutor(@Nullable Executor executor) { + this.executor = executor; + } + + /** + * Configure the {@link ByteBufferPool} to use. + *

By default, initialized with a {@link MappedByteBufferPool}. + * @param byteBufferPool the {@link ByteBuffer} pool to use + */ + public void setByteBufferPool(@Nullable ByteBufferPool byteBufferPool) { + this.byteBufferPool = byteBufferPool; + } + + /** + * Configure the {@link Scheduler} to use. + *

By default, initialized with a {@link ScheduledExecutorScheduler}. + * @param scheduler the {@link Scheduler} to use + */ + public void setScheduler(@Nullable Scheduler scheduler) { + this.scheduler = scheduler; + } + + /** + * Configure the thread prefix to initialize {@link QueuedThreadPool} executor with. This + * is used only when a {@link Executor} instance isn't + * {@link #setExecutor(Executor) provided}. + *

By default set to "jetty-http". + * @param threadPrefix the thread prefix to use + */ + public void setThreadPrefix(String threadPrefix) { + Assert.notNull(threadPrefix, "Thread prefix is required"); + this.threadPrefix = threadPrefix; + } + + /** + * Return the configured {@link Executor}. + */ + @Nullable + public Executor getExecutor() { + return this.executor; + } + + /** + * Return the configured {@link ByteBufferPool}. + */ + @Nullable + public ByteBufferPool getByteBufferPool() { + return this.byteBufferPool; + } + + /** + * Return the configured {@link Scheduler}. + */ + @Nullable + public Scheduler getScheduler() { + return this.scheduler; + } + + @Override + public void afterPropertiesSet() throws Exception { + String name = this.threadPrefix + "@" + Integer.toHexString(hashCode()); + if (this.executor == null) { + QueuedThreadPool threadPool = new QueuedThreadPool(); + threadPool.setName(name); + threadPool.start(); + this.executor = threadPool; + } + if (this.byteBufferPool == null) { + this.byteBufferPool = new MappedByteBufferPool(2048, + this.executor instanceof ThreadPool.SizedThreadPool + ? ((ThreadPool.SizedThreadPool) executor).getMaxThreads() / 2 + : ProcessorUtils.availableProcessors() * 2); + } + if (this.scheduler == null) { + Scheduler scheduler = new ScheduledExecutorScheduler(name + "-scheduler", false); + scheduler.start(); + this.scheduler = scheduler; + } + } + + @Override + public void destroy() throws Exception { + if (this.executor instanceof ContainerLifeCycle) { + ((ContainerLifeCycle)this.executor).stop(); + } + if (this.scheduler != null) { + this.scheduler.stop(); + } + } + +} diff --git a/src/docs/asciidoc/web/webflux-webclient.adoc b/src/docs/asciidoc/web/webflux-webclient.adoc index cd532b4338..46ed9dc9a0 100644 --- a/src/docs/asciidoc/web/webflux-webclient.adoc +++ b/src/docs/asciidoc/web/webflux-webclient.adoc @@ -138,6 +138,54 @@ instances use shared resources: <3> Plug the connector into the `WebClient.Builder`. +[[webflux-client-builder-jetty]] +=== Jetty + +To customize Jetty `HttpClient` settings: + +[source,java,intent=0] +[subs="verbatim,quotes"] +---- + HttpClient httpClient = new HttpClient(); + httpClient.setCookieStore(...); + ClientHttpConnector connector = new JettyClientHttpConnector(httpClient); + + WebClient webClient = WebClient.builder().clientConnector(connector).build(); +---- + +By default `HttpClient` creates its own resources (`Executor`, `ByteBufferPool`, `Scheduler`) +which remain active until the process exits or `stop()` is called. + +You can share resources between multiple intances of Jetty client (and server) and ensure the +resources are shut down when the Spring `ApplicationContext` is closed by declaring a +Spring-managed bean of type `JettyResourceFactory`: + +[source,java,intent=0] +[subs="verbatim,quotes"] +---- + @Bean + public JettyResourceFactory resourceFactory() { + return new JettyResourceFactory(); + } + + @Bean + public WebClient webClient() { + + Consumer customizer = client -> { + // Further customizations... + }; + + ClientHttpConnector connector = + new JettyClientHttpConnector(resourceFactory(), customizer); // <2> + + return WebClient.builder().clientConnector(connector).build(); // <3> + } +---- + +<1> Create shared resources. +<2> Use `JettyClientHttpConnector` constructor with resource factory. +<3> Plug the connector into the `WebClient.Builder`. + [[webflux-client-retrieve]]