Browse Source

JDK HttpClient connector for WebClient

See gh-21014
pull/27722/head
Julien Eyraud 5 years ago committed by Rossen Stoyanchev
parent
commit
d930617442
  1. 64
      spring-web/src/main/java/org/springframework/http/client/reactive/JdkClientHttpConnector.java
  2. 157
      spring-web/src/main/java/org/springframework/http/client/reactive/JdkClientHttpRequest.java
  3. 107
      spring-web/src/main/java/org/springframework/http/client/reactive/JdkClientHttpResponse.java
  4. 4
      spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java
  5. 7
      spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java
  6. 50
      src/docs/asciidoc/web/webflux-webclient.adoc

64
spring-web/src/main/java/org/springframework/http/client/reactive/JdkClientHttpConnector.java

@ -0,0 +1,64 @@ @@ -0,0 +1,64 @@
/*
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.client.reactive;
import java.net.URI;
import java.net.http.HttpClient;
import java.util.function.Function;
import reactor.core.publisher.Mono;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.http.HttpMethod;
/**
* {@link ClientHttpConnector} for the Java 11 HTTP client.
*
* @author Julien Eyraud
* @since 5.2
* @see <a href="https://docs.oracle.com/en/java/javase/11/docs/api/java.net.http/java/net/http/HttpClient.html">Java HttpClient</a>
*/
public class JdkClientHttpConnector implements ClientHttpConnector {
private final HttpClient httpClient;
private final DataBufferFactory dataBufferFactory;
/**
* Default constructor that creates a new instance of {@link HttpClient} and a {@link DataBufferFactory}.
*/
public JdkClientHttpConnector() {
this(HttpClient.newHttpClient(), new DefaultDataBufferFactory());
}
/**
* Constructor with an initialized {@link HttpClient} and a initialized {@link DataBufferFactory}.
*/
public JdkClientHttpConnector(final HttpClient httpClient, final DataBufferFactory dataBufferFactory) {
this.httpClient = httpClient;
this.dataBufferFactory = dataBufferFactory;
}
@Override
public Mono<ClientHttpResponse> connect(final HttpMethod method, final URI uri, final Function<? super ClientHttpRequest, Mono<Void>> requestCallback) {
final JdkClientHttpRequest request = new JdkClientHttpRequest(this.httpClient, method, uri, this.dataBufferFactory);
return requestCallback.apply(request).then(Mono.defer(request::getResponse));
}
}

157
spring-web/src/main/java/org/springframework/http/client/reactive/JdkClientHttpRequest.java

@ -0,0 +1,157 @@ @@ -0,0 +1,157 @@
/*
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.client.reactive;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Flow;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import reactor.adapter.JdkFlowAdapter;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.util.Assert;
/**
* {@link ClientHttpRequest} implementation for the Java 11 HTTP client.
*
* @author Julien Eyraud
* @since 5.2
* @see <a href="https://docs.oracle.com/en/java/javase/11/docs/api/java.net.http/java/net/http/HttpClient.html">Java HttpClient</a>
*/
class JdkClientHttpRequest extends AbstractClientHttpRequest {
private static final Set<String> DISALLOWED_HEADERS = Set.of("connection", "content-length", "date", "expect", "from", "host", "upgrade", "via", "warning");
private final HttpClient httpClient;
private final HttpMethod method;
private final URI uri;
private final HttpRequest.Builder builder;
private final DataBufferFactory bufferFactory;
private Mono<ClientHttpResponse> response;
public JdkClientHttpRequest(final HttpClient httpClient, final HttpMethod httpMethod, final URI uri, final DataBufferFactory bufferFactory) {
Assert.notNull(httpClient, "HttpClient should not be null");
Assert.notNull(httpMethod, "HttpMethod should not be null");
Assert.notNull(uri, "URI should not be null");
Assert.notNull(bufferFactory, "DataBufferFactory should not be null");
this.httpClient = httpClient;
this.method = httpMethod;
this.uri = uri;
this.builder = HttpRequest.newBuilder(uri);
this.bufferFactory = bufferFactory;
}
@Override
protected void applyHeaders() {
HttpHeaders headers = getHeaders();
for (Map.Entry<String, List<String>> header : getHeaders().entrySet()) {
if (!DISALLOWED_HEADERS.contains(header.getKey().toLowerCase())) {
for (String value : header.getValue()) {
this.builder.header(header.getKey(), value);
}
}
}
if (!headers.containsKey(HttpHeaders.ACCEPT)) {
this.builder.header(HttpHeaders.ACCEPT, "*/*");
}
}
@Override
protected void applyCookies() {
final String cookies = getCookies().values().stream().flatMap(List::stream).map(c -> c.getName() + "=" + c.getValue()).collect(Collectors.joining("; "));
this.builder.header(HttpHeaders.COOKIE, cookies);
}
@Override
public HttpMethod getMethod() {
return this.method;
}
@Override
public URI getURI() {
return this.uri;
}
@Override
public DataBufferFactory bufferFactory() {
return this.bufferFactory;
}
@Override
@SuppressWarnings("unchecked")
public <T> T getNativeRequest() {
return (T) this.builder.build();
}
@Override
public Mono<Void> writeWith(final Publisher<? extends DataBuffer> body) {
return doCommit(() -> {
final Flow.Publisher<ByteBuffer> flowAdapter = JdkFlowAdapter.publisherToFlowPublisher(Flux.from(body).map(DataBuffer::asByteBuffer));
final long contentLength = getHeaders().getContentLength();
final HttpRequest.BodyPublisher bodyPublisher = contentLength >= 0 ? HttpRequest.BodyPublishers.fromPublisher(flowAdapter, contentLength)
: HttpRequest.BodyPublishers.fromPublisher(flowAdapter);
this.response = Mono
.fromCompletionStage(() -> this.httpClient.sendAsync(this.builder.method(this.method.name(), bodyPublisher).build(), HttpResponse.BodyHandlers.ofPublisher()))
.map(r -> new JdkClientHttpResponse(r, this.bufferFactory));
return Mono.empty();
});
}
@Override
public Mono<Void> writeAndFlushWith(final Publisher<? extends Publisher<? extends DataBuffer>> body) {
return writeWith(Flux.from(body).flatMap(Function.identity()));
}
@Override
public Mono<Void> setComplete() {
if (isCommitted()) {
return Mono.empty();
}
else {
return doCommit(() -> {
this.response = Mono
.fromCompletionStage(() -> this.httpClient.sendAsync(this.builder.method(this.method.name(), HttpRequest.BodyPublishers.noBody()).build(), HttpResponse.BodyHandlers.ofPublisher()))
.map(r -> new JdkClientHttpResponse(r, this.bufferFactory));
return Mono.empty();
});
}
}
public Mono<ClientHttpResponse> getResponse() {
return this.response;
}
}

107
spring-web/src/main/java/org/springframework/http/client/reactive/JdkClientHttpResponse.java

@ -0,0 +1,107 @@ @@ -0,0 +1,107 @@
/*
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.http.client.reactive;
import java.net.HttpCookie;
import java.net.http.HttpResponse;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import reactor.adapter.JdkFlowAdapter;
import reactor.core.publisher.Flux;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseCookie;
import org.springframework.lang.Nullable;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
/**
* {@link ClientHttpResponse} implementation for the Java 11 HTTP client.
*
* @author Julien Eyraud
* @since 5.2
* @see <a href="https://docs.oracle.com/en/java/javase/11/docs/api/java.net.http/java/net/http/HttpClient.html">Java HttpClient</a>
*/
class JdkClientHttpResponse implements ClientHttpResponse {
private static final Pattern SAMESITE_PATTERN = Pattern.compile("(?i).*SameSite=(Strict|Lax|None).*");
private final HttpResponse<Flow.Publisher<List<ByteBuffer>>> response;
private final DataBufferFactory bufferFactory;
public JdkClientHttpResponse(final HttpResponse<Flow.Publisher<List<ByteBuffer>>> response, final DataBufferFactory bufferFactory) {
this.response = response;
this.bufferFactory = bufferFactory;
}
@Nullable
private static String parseSameSite(String headerValue) {
Matcher matcher = SAMESITE_PATTERN.matcher(headerValue);
return (matcher.matches() ? matcher.group(1) : null);
}
@Override
public HttpStatus getStatusCode() {
return HttpStatus.resolve(this.response.statusCode());
}
@Override
public int getRawStatusCode() {
return this.response.statusCode();
}
@Override
public MultiValueMap<String, ResponseCookie> getCookies() {
return this.response.headers().allValues(HttpHeaders.SET_COOKIE).stream()
.flatMap(header ->
HttpCookie.parse(header).stream().map(httpCookie ->
ResponseCookie
.from(httpCookie.getName(), httpCookie.getValue())
.domain(httpCookie.getDomain())
.httpOnly(httpCookie.isHttpOnly())
.maxAge(httpCookie.getMaxAge())
.path(httpCookie.getPath())
.secure(httpCookie.getSecure())
.sameSite(parseSameSite(header))
.build()
)
).collect(LinkedMultiValueMap::new, (m, v) -> m.add(v.getName(), v), LinkedMultiValueMap::addAll);
}
@Override
public Flux<DataBuffer> getBody() {
return JdkFlowAdapter
.flowPublisherToFlux(this.response.body())
.flatMap(Flux::fromIterable)
.map(this.bufferFactory::wrap)
.doOnDiscard(DataBuffer.class, DataBufferUtils::release);
}
@Override
public HttpHeaders getHeaders() {
return this.response.headers().map().entrySet().stream().collect(HttpHeaders::new, (headers, entry) -> headers.addAll(entry.getKey(), entry.getValue()), HttpHeaders::addAll);
}
}

4
spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java

@ -67,6 +67,7 @@ import org.springframework.http.ResponseCookie; @@ -67,6 +67,7 @@ import org.springframework.http.ResponseCookie;
import org.springframework.http.ResponseEntity;
import org.springframework.http.client.reactive.ClientHttpConnector;
import org.springframework.http.client.reactive.HttpComponentsClientHttpConnector;
import org.springframework.http.client.reactive.JdkClientHttpConnector;
import org.springframework.http.client.reactive.JettyClientHttpConnector;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.util.SocketUtils;
@ -99,7 +100,8 @@ class WebClientIntegrationTests { @@ -99,7 +100,8 @@ class WebClientIntegrationTests {
return Stream.of(
new ReactorClientHttpConnector(),
new JettyClientHttpConnector(),
new HttpComponentsClientHttpConnector()
new HttpComponentsClientHttpConnector(),
new JdkClientHttpConnector()
);
}

7
spring-webflux/src/test/java/org/springframework/web/reactive/result/method/annotation/SseIntegrationTests.java

@ -36,6 +36,7 @@ import org.springframework.context.annotation.Configuration; @@ -36,6 +36,7 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.client.reactive.ClientHttpConnector;
import org.springframework.http.client.reactive.HttpComponentsClientHttpConnector;
import org.springframework.http.client.reactive.JdkClientHttpConnector;
import org.springframework.http.client.reactive.JettyClientHttpConnector;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.http.codec.ServerSentEvent;
@ -76,15 +77,19 @@ class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests { @@ -76,15 +77,19 @@ class SseIntegrationTests extends AbstractHttpHandlerIntegrationTests {
{new JettyHttpServer(), new ReactorClientHttpConnector()},
{new JettyHttpServer(), new JettyClientHttpConnector()},
{new JettyHttpServer(), new HttpComponentsClientHttpConnector()},
{new JettyHttpServer(), new JdkClientHttpConnector()},
{new ReactorHttpServer(), new ReactorClientHttpConnector()},
{new ReactorHttpServer(), new JettyClientHttpConnector()},
{new ReactorHttpServer(), new HttpComponentsClientHttpConnector()},
{new ReactorHttpServer(), new JdkClientHttpConnector()},
{new TomcatHttpServer(), new ReactorClientHttpConnector()},
{new TomcatHttpServer(), new JettyClientHttpConnector()},
{new TomcatHttpServer(), new HttpComponentsClientHttpConnector()},
{new TomcatHttpServer(), new JdkClientHttpConnector()},
{new UndertowHttpServer(), new ReactorClientHttpConnector()},
{new UndertowHttpServer(), new JettyClientHttpConnector()},
{new UndertowHttpServer(), new HttpComponentsClientHttpConnector()}
{new UndertowHttpServer(), new HttpComponentsClientHttpConnector()},
{new UndertowHttpServer(), new JdkClientHttpConnector()},
};
}

50
src/docs/asciidoc/web/webflux-webclient.adoc

@ -14,6 +14,7 @@ support for the following: @@ -14,6 +14,7 @@ support for the following:
* https://github.com/reactor/reactor-netty[Reactor Netty]
* https://github.com/jetty-project/jetty-reactive-httpclient[Jetty Reactive HttpClient]
* https://hc.apache.org/index.html[Apache HttpComponents]
* https://docs.oracle.com/en/java/javase/11/docs/api/java.net.http/java/net/http/HttpClient.html[JDK HttpClient]
* Others can be plugged via `ClientHttpConnector`.
@ -382,7 +383,7 @@ shows: @@ -382,7 +383,7 @@ shows:
HttpClient httpClient = new HttpClient();
// Further customizations...
ClientHttpConnector connector =
new JettyClientHttpConnector(httpClient, resourceFactory()); <1>
@ -403,7 +404,7 @@ shows: @@ -403,7 +404,7 @@ shows:
val httpClient = HttpClient()
// Further customizations...
val connector = JettyClientHttpConnector(httpClient, resourceFactory()) // <1>
return WebClient.builder().clientConnector(connector).build() // <2>
@ -439,6 +440,43 @@ The following example shows how to customize Apache HttpComponents `HttpClient` @@ -439,6 +440,43 @@ The following example shows how to customize Apache HttpComponents `HttpClient`
val webClient = WebClient.builder().clientConnector(connector).build()
----
[[webflux-client-builder-jdk-httpclient]]
=== JDK
The following example shows how to customize JDK `HttpClient` settings:
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
.Java
----
@Bean
public WebClient webClient() {
HttpClient httpClient = HttpClient.newBuilder()
.executor(...)
.connectTimeout(...)
.build();
ClientHttpConnector connector =
new JdkClientHttpConnector(httpClient, new DefaultDataBufferFactory()); // <1>
return WebClient webClient = WebClient.builder().clientConnector(connector).build(); // <2>
}
----
<1> Use the `JdkClientHttpConnector` constructor with customized `HttpClient` instance.
<2> Plug the connector into the `WebClient.Builder`.
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
.Kotlin
----
@Bean
fun webClient() : WebClient {
val httpClient = HttpClient.newBuilder()
.executor(...)
.connectTimeout(...)
.build()
val connector = JdkClientHttpConnector(httpClient, DefaultDataBufferFactory()) // <1>
return WebClient webClient = WebClient.builder()
.clientConnector(connector).build() // <2>
}
----
[[webflux-client-retrieve]]
== `retrieve()`
@ -761,9 +799,9 @@ multipart request. The following example shows how to create a `MultiValueMap<St @@ -761,9 +799,9 @@ multipart request. The following example shows how to create a `MultiValueMap<St
part("fieldPart", "fieldValue")
part("filePart1", new FileSystemResource("...logo.png"))
part("jsonPart", new Person("Jason"))
part("myPart", part) // Part from a server request
part("myPart", part) // Part from a server request
}
val parts = builder.build()
----
@ -1063,7 +1101,7 @@ apply to all operations. For example: @@ -1063,7 +1101,7 @@ apply to all operations. For example:
client.get().uri("/person/{id}", i).retrieve()
.awaitBody<Person>()
}
val persons = runBlocking {
client.get().uri("/persons").retrieve()
.bodyToFlow<Person>()
@ -1118,7 +1156,7 @@ inter-dependent, without ever blocking until the end. @@ -1118,7 +1156,7 @@ inter-dependent, without ever blocking until the end.
With `Flux` or `Mono`, you should never have to block in a Spring MVC or Spring WebFlux controller.
Simply return the resulting reactive type from the controller method. The same principle apply to
Kotlin Coroutines and Spring WebFlux, just use suspending function or return `Flow` in your
controller method .
controller method .
====

Loading…
Cancel
Save