From 2ba5ded3068c7de411267857b999364c263bbf17 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Tue, 23 Oct 2018 21:34:42 -0400 Subject: [PATCH] Polish Jetty reactive HttpClient connector --- .../reactive/JettyClientHttpConnector.java | 40 +++++++++++-------- .../reactive/JettyClientHttpRequest.java | 20 +++++----- 2 files changed, 35 insertions(+), 25 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpConnector.java b/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpConnector.java index 14f008992b..9b0734c98a 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpConnector.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpConnector.java @@ -21,7 +21,7 @@ import java.util.function.Consumer; import java.util.function.Function; import org.eclipse.jetty.client.HttpClient; -import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.reactive.client.ContentChunk; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -33,12 +33,7 @@ import org.springframework.lang.Nullable; import org.springframework.util.Assert; /** - * Jetty ReactiveStreams HttpClient implementation of {@link ClientHttpConnector}. - * - * Implemented with buffer copy instead of optimized buffer wrapping because the latter - * hangs since {@link Callback#succeeded()} doesn't allow releasing the buffer and - * requesting more data at different times (required for {@code Mono} for example). - * See https://github.com/eclipse/jetty.project/issues/2429 for more details. + * {@link ClientHttpConnector} for the Jetty Reactive Streams HttpClient. * * @author Sebastien Deleuze * @since 5.1 @@ -63,7 +58,9 @@ public class JettyClientHttpConnector implements ClientHttpConnector { * @param resourceFactory the {@link JettyResourceFactory} to use * @param customizer the lambda used to customize the {@link HttpClient} */ - public JettyClientHttpConnector(JettyResourceFactory resourceFactory, @Nullable Consumer customizer) { + public JettyClientHttpConnector( + JettyResourceFactory resourceFactory, @Nullable Consumer customizer) { + HttpClient httpClient = new HttpClient(); httpClient.setExecutor(resourceFactory.getExecutor()); httpClient.setByteBufferPool(resourceFactory.getByteBufferPool()); @@ -107,16 +104,27 @@ public class JettyClientHttpConnector implements ClientHttpConnector { JettyClientHttpRequest clientHttpRequest = new JettyClientHttpRequest( this.httpClient.newRequest(uri).method(method.toString()), this.bufferFactory); + return requestCallback.apply(clientHttpRequest).then(Mono.from( - clientHttpRequest.getReactiveRequest().response((reactiveResponse, contentChunks) -> { - Flux content = Flux.from(contentChunks).map(chunk -> { - DataBuffer buffer = this.bufferFactory.allocateBuffer(chunk.buffer.capacity()); - buffer.write(chunk.buffer); - chunk.callback.succeeded(); - return buffer; - }); - return Mono.just(new JettyClientHttpResponse(reactiveResponse, content)); + clientHttpRequest.getReactiveRequest().response((response, chunks) -> { + Flux content = Flux.from(chunks).map(this::toDataBuffer); + return Mono.just(new JettyClientHttpResponse(response, content)); }))); } + private DataBuffer toDataBuffer(ContentChunk chunk) { + + // We must copy until this is resolved: + // https://github.com/eclipse/jetty.project/issues/2429 + + // Use copy instead of buffer wrapping because Callback#succeeded() is + // used not only to release the buffer but also to request more data + // which is a problem for codecs that buffer data. + + DataBuffer buffer = this.bufferFactory.allocateBuffer(chunk.buffer.capacity()); + buffer.write(chunk.buffer); + chunk.callback.succeeded(); + return buffer; + } + } diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpRequest.java b/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpRequest.java index b7a847221e..4fa842ef01 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpRequest.java @@ -22,7 +22,6 @@ import java.util.Collection; import java.util.function.Function; import org.eclipse.jetty.client.api.Request; -import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.reactive.client.ContentChunk; import org.eclipse.jetty.reactive.client.ReactiveRequest; import org.eclipse.jetty.util.Callback; @@ -86,24 +85,26 @@ class JettyClientHttpRequest extends AbstractClientHttpRequest { } @Override - public Mono writeWith(Publisher publisher) { - Flux chunks = Flux.from(publisher).map(this::toContentChunk); - MediaType contentType = getHeaders().getContentType(); - ReactiveRequest.Content requestContent = ReactiveRequest.Content.fromPublisher(chunks, - (contentType != null ? contentType.toString() : MediaType.APPLICATION_OCTET_STREAM_VALUE)); - this.reactiveRequest = ReactiveRequest.newBuilder(this.jettyRequest).content(requestContent).build(); + public Mono writeWith(Publisher body) { + Flux chunks = Flux.from(body).map(this::toContentChunk); + ReactiveRequest.Content content = ReactiveRequest.Content.fromPublisher(chunks, getContentType()); + this.reactiveRequest = ReactiveRequest.newBuilder(this.jettyRequest).content(content).build(); return doCommit(this::completes); } @Override public Mono writeAndFlushWith(Publisher> body) { - String contentType = this.jettyRequest.getHeaders().getField(HttpHeader.CONTENT_TYPE).getValue(); Flux chunks = Flux.from(body).flatMap(Function.identity()).map(this::toContentChunk); - ReactiveRequest.Content content = ReactiveRequest.Content.fromPublisher(chunks, contentType); + ReactiveRequest.Content content = ReactiveRequest.Content.fromPublisher(chunks, getContentType()); this.reactiveRequest = ReactiveRequest.newBuilder(this.jettyRequest).content(content).build(); return doCommit(this::completes); } + private String getContentType() { + MediaType contentType = getHeaders().getContentType(); + return contentType != null ? contentType.toString() : MediaType.APPLICATION_OCTET_STREAM_VALUE; + } + private Mono completes() { return Mono.empty(); } @@ -146,4 +147,5 @@ class JettyClientHttpRequest extends AbstractClientHttpRequest { } return this.reactiveRequest; } + }