From b3b50f8f4bad43ffca0e17e570bf5ed2f5a8bd3a Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Fri, 19 Nov 2021 07:28:59 +0000 Subject: [PATCH] Refactoring in the JDK HttpClient support See gh-23432 --- .../reactive/JdkClientHttpConnector.java | 22 ++++- .../client/reactive/JdkClientHttpRequest.java | 89 ++++++------------- .../reactive/JdkClientHttpResponse.java | 53 ++++++----- 3 files changed, 81 insertions(+), 83 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/JdkClientHttpConnector.java b/spring-web/src/main/java/org/springframework/http/client/reactive/JdkClientHttpConnector.java index 3fcf281948..fd30f8b8b6 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/JdkClientHttpConnector.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/JdkClientHttpConnector.java @@ -18,6 +18,12 @@ package org.springframework.http.client.reactive; import java.net.URI; import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Flow; import java.util.function.Function; import reactor.core.publisher.Mono; @@ -27,9 +33,10 @@ import org.springframework.core.io.buffer.DefaultDataBufferFactory; import org.springframework.http.HttpMethod; /** - * {@link ClientHttpConnector} for Java's {@link HttpClient}. + * {@link ClientHttpConnector} for the Java {@link HttpClient}. * * @author Julien Eyraud + * @author Rossen Stoyanchev * @since 6.0 * @see HttpClient */ @@ -60,8 +67,17 @@ public class JdkClientHttpConnector implements ClientHttpConnector { public Mono connect( HttpMethod method, URI uri, Function> requestCallback) { - JdkClientHttpRequest request = new JdkClientHttpRequest(this.httpClient, method, uri, this.bufferFactory); - return requestCallback.apply(request).then(Mono.defer(request::getResponse)); + JdkClientHttpRequest jdkClientHttpRequest = new JdkClientHttpRequest(method, uri, this.bufferFactory); + + return requestCallback.apply(jdkClientHttpRequest).then(Mono.defer(() -> { + HttpRequest httpRequest = jdkClientHttpRequest.getNativeRequest(); + + CompletableFuture>>> future = + this.httpClient.sendAsync(httpRequest, HttpResponse.BodyHandlers.ofPublisher()); + + return Mono.fromCompletionStage(future) + .map(response -> new JdkClientHttpResponse(response, this.bufferFactory)); + })); } } diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/JdkClientHttpRequest.java b/spring-web/src/main/java/org/springframework/http/client/reactive/JdkClientHttpRequest.java index f514915230..3aed6a3d9d 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/JdkClientHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/JdkClientHttpRequest.java @@ -19,11 +19,9 @@ package org.springframework.http.client.reactive; import java.net.URI; import java.net.http.HttpClient; import java.net.http.HttpRequest; -import java.net.http.HttpResponse; import java.nio.ByteBuffer; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.Flow; import java.util.function.Function; import java.util.stream.Collectors; @@ -35,50 +33,38 @@ import reactor.core.publisher.Mono; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; +import org.springframework.http.HttpCookie; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; -import org.springframework.lang.Nullable; import org.springframework.util.Assert; /** - * {@link ClientHttpRequest} implementation for Java's {@link HttpClient}. + * {@link ClientHttpRequest} for the Java {@link HttpClient}. * * @author Julien Eyraud + * @author Rossen Stoyanchev * @since 6.0 */ class JdkClientHttpRequest extends AbstractClientHttpRequest { - private static final Set DISALLOWED_HEADERS = - Set.of("connection", "content-length", "date", "expect", "from", "host", "upgrade", "via", "warning"); - - - private final HttpClient httpClient; - private final HttpMethod method; private final URI uri; - private final HttpRequest.Builder builder; - private final DataBufferFactory bufferFactory; - @Nullable - private Mono response; - + private final HttpRequest.Builder builder; - public JdkClientHttpRequest( - HttpClient httpClient, HttpMethod httpMethod, URI uri, DataBufferFactory bufferFactory) { - Assert.notNull(httpClient, "HttpClient should not be null"); - Assert.notNull(httpMethod, "HttpMethod should not be null"); - Assert.notNull(uri, "URI should not be null"); - Assert.notNull(bufferFactory, "DataBufferFactory should not be null"); + public JdkClientHttpRequest(HttpMethod httpMethod, URI uri, DataBufferFactory bufferFactory) { + Assert.notNull(httpMethod, "HttpMethod is required"); + Assert.notNull(uri, "URI is required"); + Assert.notNull(bufferFactory, "DataBufferFactory is required"); - this.httpClient = httpClient; this.method = httpMethod; this.uri = uri; - this.builder = HttpRequest.newBuilder(uri); this.bufferFactory = bufferFactory; + this.builder = HttpRequest.newBuilder(uri); } @@ -103,20 +89,16 @@ class JdkClientHttpRequest extends AbstractClientHttpRequest { return (T) this.builder.build(); } - Mono getResponse() { - Assert.notNull(this.response, "Response is not set"); - return this.response; - } - @Override protected void applyHeaders() { - for (Map.Entry> header : getHeaders().entrySet()) { - if (DISALLOWED_HEADERS.contains(header.getKey().toLowerCase())) { + for (Map.Entry> entry : getHeaders().entrySet()) { + if (entry.getKey().equalsIgnoreCase(HttpHeaders.CONTENT_LENGTH)) { + // content-length is specified when writing continue; } - for (String value : header.getValue()) { - this.builder.header(header.getKey(), value); + for (String value : entry.getValue()) { + this.builder.header(entry.getKey(), value); } } if (!getHeaders().containsKey(HttpHeaders.ACCEPT)) { @@ -126,31 +108,28 @@ class JdkClientHttpRequest extends AbstractClientHttpRequest { @Override protected void applyCookies() { - this.builder.header(HttpHeaders.COOKIE, - getCookies().values().stream() - .flatMap(List::stream) - .map(cookie -> cookie.getName() + "=" + cookie.getValue()) - .collect(Collectors.joining("; "))); + this.builder.header(HttpHeaders.COOKIE, getCookies().values().stream() + .flatMap(List::stream).map(HttpCookie::toString).collect(Collectors.joining(";"))); } @Override public Mono writeWith(Publisher body) { return doCommit(() -> { - Flow.Publisher flow = - JdkFlowAdapter.publisherToFlowPublisher(Flux.from(body).map(DataBuffer::asByteBuffer)); + this.builder.method(this.method.name(), toBodyPublisher(body)); + return Mono.empty(); + }); + } - HttpRequest.BodyPublisher bodyPublisher = (getHeaders().getContentLength() >= 0 ? - HttpRequest.BodyPublishers.fromPublisher(flow, getHeaders().getContentLength()) : - HttpRequest.BodyPublishers.fromPublisher(flow)); + private HttpRequest.BodyPublisher toBodyPublisher(Publisher body) { + Publisher byteBufferBody = (body instanceof Mono ? + Mono.from(body).map(DataBuffer::asByteBuffer) : + Flux.from(body).map(DataBuffer::asByteBuffer)); - this.response = Mono.fromCompletionStage(() -> { - HttpRequest request = this.builder.method(this.method.name(), bodyPublisher).build(); - return this.httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofPublisher()); - }) - .map(response -> new JdkClientHttpResponse(response, this.bufferFactory)); + Flow.Publisher bodyFlow = JdkFlowAdapter.publisherToFlowPublisher(byteBufferBody); - return Mono.empty(); - }); + return (getHeaders().getContentLength() > 0 ? + HttpRequest.BodyPublishers.fromPublisher(bodyFlow, getHeaders().getContentLength()) : + HttpRequest.BodyPublishers.fromPublisher(bodyFlow)); } @Override @@ -160,18 +139,8 @@ class JdkClientHttpRequest extends AbstractClientHttpRequest { @Override public Mono setComplete() { - if (isCommitted()) { - return Mono.empty(); - } - return doCommit(() -> { - this.response = Mono.fromCompletionStage(() -> { - HttpRequest.BodyPublisher bodyPublisher = HttpRequest.BodyPublishers.noBody(); - HttpRequest request = this.builder.method(this.method.name(), bodyPublisher).build(); - return this.httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofPublisher()); - }) - .map(response -> new JdkClientHttpResponse(response, this.bufferFactory)); - + this.builder.method(this.method.name(), HttpRequest.BodyPublishers.noBody()); return Mono.empty(); }); } diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/JdkClientHttpResponse.java b/spring-web/src/main/java/org/springframework/http/client/reactive/JdkClientHttpResponse.java index c00308e4e4..8beacd5809 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/JdkClientHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/JdkClientHttpResponse.java @@ -21,6 +21,8 @@ import java.net.http.HttpClient; import java.net.http.HttpResponse; import java.nio.ByteBuffer; import java.util.List; +import java.util.Locale; +import java.util.Map; import java.util.concurrent.Flow; import java.util.function.Function; import java.util.regex.Matcher; @@ -36,13 +38,16 @@ import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseCookie; import org.springframework.lang.Nullable; +import org.springframework.util.CollectionUtils; +import org.springframework.util.LinkedCaseInsensitiveMap; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; /** - * {@link ClientHttpResponse} implementation for Java's {@link HttpClient}. + * {@link ClientHttpResponse} for the Java {@link HttpClient}. * * @author Julien Eyraud + * @author Rossen Stoyanchev * @since 6.0 */ class JdkClientHttpResponse implements ClientHttpResponse { @@ -54,12 +59,23 @@ class JdkClientHttpResponse implements ClientHttpResponse { private final DataBufferFactory bufferFactory; + private final HttpHeaders headers; + public JdkClientHttpResponse( HttpResponse>> response, DataBufferFactory bufferFactory) { this.response = response; this.bufferFactory = bufferFactory; + this.headers = adaptHeaders(response); + } + + private static HttpHeaders adaptHeaders(HttpResponse>> response) { + Map> rawHeaders = response.headers().map(); + Map> map = new LinkedCaseInsensitiveMap<>(rawHeaders.size(), Locale.ENGLISH); + MultiValueMap multiValueMap = CollectionUtils.toMultiValueMap(map); + multiValueMap.putAll(rawHeaders); + return HttpHeaders.readOnlyHttpHeaders(multiValueMap); } @@ -75,34 +91,31 @@ class JdkClientHttpResponse implements ClientHttpResponse { @Override public HttpHeaders getHeaders() { - return this.response.headers().map().entrySet().stream() - .collect(HttpHeaders::new, - (headers, entry) -> headers.addAll(entry.getKey(), entry.getValue()), - HttpHeaders::addAll); + return this.headers; } @Override public MultiValueMap getCookies() { return this.response.headers().allValues(HttpHeaders.SET_COOKIE).stream() - .flatMap(header -> - HttpCookie.parse(header).stream().map(cookie -> - ResponseCookie.from(cookie.getName(), cookie.getValue()) - .domain(cookie.getDomain()) - .httpOnly(cookie.isHttpOnly()) - .maxAge(cookie.getMaxAge()) - .path(cookie.getPath()) - .secure(cookie.getSecure()) - .sameSite(parseSameSite(header)) - .build())) + .flatMap(header -> { + Matcher matcher = SAME_SITE_PATTERN.matcher(header); + String sameSite = (matcher.matches() ? matcher.group(1) : null); + return HttpCookie.parse(header).stream().map(cookie -> toResponseCookie(cookie, sameSite)); + }) .collect(LinkedMultiValueMap::new, - (valueMap, cookie) -> valueMap.add(cookie.getName(), cookie), + (cookies, cookie) -> cookies.add(cookie.getName(), cookie), LinkedMultiValueMap::addAll); } - @Nullable - private static String parseSameSite(String headerValue) { - Matcher matcher = SAME_SITE_PATTERN.matcher(headerValue); - return (matcher.matches() ? matcher.group(1) : null); + private ResponseCookie toResponseCookie(HttpCookie cookie, @Nullable String sameSite) { + return ResponseCookie.from(cookie.getName(), cookie.getValue()) + .domain(cookie.getDomain()) + .httpOnly(cookie.isHttpOnly()) + .maxAge(cookie.getMaxAge()) + .path(cookie.getPath()) + .secure(cookie.getSecure()) + .sameSite(sameSite) + .build(); } @Override