From 126ac849e5592f65d742d1f23bf891e1a115af01 Mon Sep 17 00:00:00 2001 From: Brian Clozel Date: Wed, 27 Sep 2017 23:08:30 +0200 Subject: [PATCH] Fix behavior of ClientResponse#bodyTo** with Void Prior to this commit, asking for a `Void` type using any of the `ClientResponse#bodyTo*` methods would immediately return an empty `Publisher` without consuming the response body. Not doing so can lead to HTTP connection pool inconsistencies and/or memory leaks, since: * a connection that still has a response body being written to it cannot be properly recycled in the connection pool * incoming `DataBuffer` might not be released This commit detects when `Void` types are asked as body types and in those cases does the following: 1. Subscribe to the response body `Publisher` to allow the connection to be returned to the connection pool 2. `cancel()` the body `Publisher` if the response body is not empty; in that case, we choose to close the connection vs. consume the whole response body Those changes imply that `ClientHttpResponse` and other related contracts don't need a `close()` method anymore. Issue: SPR-16018 --- .../reactive/MockClientHttpResponse.java | 14 ----- .../client/reactive/ClientHttpResponse.java | 16 +---- .../reactive/ClientHttpResponseDecorator.java | 5 -- .../reactive/ReactorClientHttpResponse.java | 5 -- .../reactive/test/MockClientHttpResponse.java | 13 ---- .../function/client/ClientResponse.java | 15 +---- .../client/DefaultClientResponse.java | 62 +++++++++++++++--- .../reactive/function/client/WebClient.java | 18 ++---- .../client/DefaultClientResponseTests.java | 63 +++++++++++++++++-- .../client/WebClientIntegrationTests.java | 10 +-- .../function/client/WebClientMockTests.java | 53 ---------------- 11 files changed, 122 insertions(+), 152 deletions(-) delete mode 100644 spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientMockTests.java diff --git a/spring-test/src/main/java/org/springframework/mock/http/client/reactive/MockClientHttpResponse.java b/spring-test/src/main/java/org/springframework/mock/http/client/reactive/MockClientHttpResponse.java index 0627906e2c..760da17b74 100644 --- a/spring-test/src/main/java/org/springframework/mock/http/client/reactive/MockClientHttpResponse.java +++ b/spring-test/src/main/java/org/springframework/mock/http/client/reactive/MockClientHttpResponse.java @@ -55,8 +55,6 @@ public class MockClientHttpResponse implements ClientHttpResponse { private final DataBufferFactory bufferFactory = new DefaultDataBufferFactory(); - private boolean closed = false; - public MockClientHttpResponse(HttpStatus status) { Assert.notNull(status, "HttpStatus is required"); @@ -98,21 +96,9 @@ public class MockClientHttpResponse implements ClientHttpResponse { @Override public Flux getBody() { - if (this.closed) { - return Flux.error(new IllegalStateException("Connection has been closed.")); - } return this.body; } - @Override - public void close() { - this.closed = true; - } - - public boolean isClosed() { - return this.closed; - } - /** * Return the response body aggregated and converted to a String using the * charset of the Content-Type response or otherwise as "UTF-8". diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ClientHttpResponse.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ClientHttpResponse.java index 39bf1b3e77..df478d432a 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/ClientHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ClientHttpResponse.java @@ -16,8 +16,6 @@ package org.springframework.http.client.reactive; -import java.io.Closeable; - import org.springframework.http.HttpStatus; import org.springframework.http.ReactiveHttpInputMessage; import org.springframework.http.ResponseCookie; @@ -30,7 +28,7 @@ import org.springframework.util.MultiValueMap; * @author Brian Clozel * @since 5.0 */ -public interface ClientHttpResponse extends ReactiveHttpInputMessage, Closeable { +public interface ClientHttpResponse extends ReactiveHttpInputMessage { /** * Return the HTTP status as an {@link HttpStatus} enum value. @@ -42,16 +40,4 @@ public interface ClientHttpResponse extends ReactiveHttpInputMessage, Closeable */ MultiValueMap getCookies(); - /** - * Close this response and the underlying HTTP connection. - *

This non-blocking method has to be called if its body isn't going - * to be consumed. Not doing so might result in HTTP connection pool - * inconsistencies or memory leaks. - *

This shouldn't be called if the response body is read, - * because it would prevent connections to be reused and cancel - * the benefits of using a connection pooling. - */ - @Override - void close(); - } diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ClientHttpResponseDecorator.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ClientHttpResponseDecorator.java index c8cbf11462..199a9495cf 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/ClientHttpResponseDecorator.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ClientHttpResponseDecorator.java @@ -70,11 +70,6 @@ public class ClientHttpResponseDecorator implements ClientHttpResponse { return this.delegate.getBody(); } - @Override - public void close() { - this.delegate.close(); - } - @Override public String toString() { return getClass().getSimpleName() + " [delegate=" + getDelegate() + "]"; diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java index 778a9a512f..312432f346 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java @@ -89,11 +89,6 @@ public class ReactorClientHttpResponse implements ClientHttpResponse { return CollectionUtils.unmodifiableMultiValueMap(result); } - @Override - public void close() { - this.response.dispose(); - } - @Override public String toString() { return "ReactorClientHttpResponse{" + diff --git a/spring-web/src/test/java/org/springframework/mock/http/client/reactive/test/MockClientHttpResponse.java b/spring-web/src/test/java/org/springframework/mock/http/client/reactive/test/MockClientHttpResponse.java index 7f89d05792..f67d884afd 100644 --- a/spring-web/src/test/java/org/springframework/mock/http/client/reactive/test/MockClientHttpResponse.java +++ b/spring-web/src/test/java/org/springframework/mock/http/client/reactive/test/MockClientHttpResponse.java @@ -55,7 +55,6 @@ public class MockClientHttpResponse implements ClientHttpResponse { private final DataBufferFactory bufferFactory = new DefaultDataBufferFactory(); - private boolean closed = false; public MockClientHttpResponse(HttpStatus status) { Assert.notNull(status, "HttpStatus is required"); @@ -99,21 +98,9 @@ public class MockClientHttpResponse implements ClientHttpResponse { @Override public Flux getBody() { - if (this.closed) { - return Flux.error(new IllegalStateException("Connection has been closed.")); - } return this.body; } - @Override - public void close() { - this.closed = true; - } - - public boolean isClosed() { - return this.closed; - } - /** * Return the response body aggregated and converted to a String using the * charset of the Content-Type response or otherwise as "UTF-8". diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/ClientResponse.java b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/ClientResponse.java index 2247a6c5bc..112167d06a 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/ClientResponse.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/ClientResponse.java @@ -16,7 +16,6 @@ package org.springframework.web.reactive.function.client; -import java.io.Closeable; import java.util.List; import java.util.Optional; import java.util.OptionalLong; @@ -44,7 +43,7 @@ import org.springframework.web.reactive.function.BodyExtractor; * @author Arjen Poutsma * @since 5.0 */ -public interface ClientResponse extends Closeable { +public interface ClientResponse { /** * Return the status code of this response. @@ -133,18 +132,6 @@ public interface ClientResponse extends Closeable { */ Mono>> toEntityList(ParameterizedTypeReference typeReference); - /** - * Close this response and the underlying HTTP connection. - *

This non-blocking method has to be called if its body isn't going - * to be consumed. Not doing so might result in HTTP connection pool - * inconsistencies or memory leaks. - *

This shouldn't be called if the response body is read, - * because it would prevent connections to be reused and cancel - * the benefits of using a connection pooling. - */ - @Override - void close(); - /** * Represents the headers of the HTTP response. diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultClientResponse.java b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultClientResponse.java index c58e3ba455..ca6b2cc5a2 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultClientResponse.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultClientResponse.java @@ -26,6 +26,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import org.springframework.core.ParameterizedTypeReference; +import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; @@ -98,32 +99,73 @@ class DefaultClientResponse implements ClientResponse { @Override public Mono bodyToMono(Class elementClass) { - return body(BodyExtractors.toMono(elementClass)); + if (Void.class.isAssignableFrom(elementClass)) { + return consumeAndCancel(); + } + else { + return body(BodyExtractors.toMono(elementClass)); + } + } + + @SuppressWarnings("unchecked") + private Mono consumeAndCancel() { + return (Mono) this.response.getBody() + .map(buffer -> { + DataBufferUtils.release(buffer); + throw new ReadCancellationException(); + }) + .onErrorResume(ReadCancellationException.class, ex -> Mono.empty()) + .then(); } @Override public Mono bodyToMono(ParameterizedTypeReference typeReference) { - return body(BodyExtractors.toMono(typeReference)); + if (Void.class.isAssignableFrom(typeReference.getType().getClass())) { + return consumeAndCancel(); + } + else { + return body(BodyExtractors.toMono(typeReference)); + } } @Override public Flux bodyToFlux(Class elementClass) { - return body(BodyExtractors.toFlux(elementClass)); + if (Void.class.isAssignableFrom(elementClass)) { + return Flux.from(consumeAndCancel()); + } + else { + return body(BodyExtractors.toFlux(elementClass)); + } } @Override public Flux bodyToFlux(ParameterizedTypeReference typeReference) { - return body(BodyExtractors.toFlux(typeReference)); + if (Void.class.isAssignableFrom(typeReference.getType().getClass())) { + return Flux.from(consumeAndCancel()); + } + else { + return body(BodyExtractors.toFlux(typeReference)); + } } @Override public Mono> toEntity(Class bodyType) { - return toEntityInternal(bodyToMono(bodyType)); + if (Void.class.isAssignableFrom(bodyType)) { + return toEntityInternal(consumeAndCancel()); + } + else { + return toEntityInternal(bodyToMono(bodyType)); + } } @Override public Mono> toEntity(ParameterizedTypeReference typeReference) { - return toEntityInternal(bodyToMono(typeReference)); + if (Void.class.isAssignableFrom(typeReference.getType().getClass())) { + return toEntityInternal(consumeAndCancel()); + } + else { + return toEntityInternal(bodyToMono(typeReference)); + } } private Mono> toEntityInternal(Mono bodyMono) { @@ -154,10 +196,6 @@ class DefaultClientResponse implements ClientResponse { .map(body -> new ResponseEntity<>(body, headers, statusCode)); } - @Override - public void close() { - this.response.close(); - } private class DefaultHeaders implements Headers { @@ -191,4 +229,8 @@ class DefaultClientResponse implements ClientResponse { } } + + @SuppressWarnings("serial") + private class ReadCancellationException extends RuntimeException { + } } diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/WebClient.java b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/WebClient.java index 247e0804a6..ae057de0f3 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/WebClient.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/WebClient.java @@ -461,17 +461,11 @@ public interface WebClient { * .exchange() * .flatMapMany(response -> response.bodyToFlux(Pojo.class)); * - *

If the response body is not consumed with {@code bodyTo*} - * or {@code toEntity*} methods, it is your responsibility - * to release the HTTP resources with {@link ClientResponse#close()}. - *

-		 * Mono<HttpStatus> mono = client.get().uri("/")
-		 *     .exchange()
-		 *     .map(response -> {
-		 *         response.close();
-		 *         return response.statusCode();
-		 *     });
-		 * 
+ *

The response body should always be consumed with {@code bodyTo*} + * or {@code toEntity*} methods; if you do not care about the body, + * you can use {@code bodyToMono(Void.class)}. + *

Not consuming the response body might lead to HTTP connection pool + * inconsistencies or memory leaks. * @return a {@code Mono} with the response * @see #retrieve() */ @@ -491,8 +485,6 @@ public interface WebClient { * .retrieve() * .bodyToMono(Pojo.class); * - *

Since this method reads the response body, - * {@link ClientResponse#close()} should not be called. * @return spec with options for extracting the response body */ ResponseSpec retrieve(); diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/DefaultClientResponseTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/DefaultClientResponseTests.java index 56ad811cc7..06b89ca9f7 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/DefaultClientResponseTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/DefaultClientResponseTests.java @@ -28,6 +28,8 @@ import org.junit.Before; import org.junit.Test; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; +import reactor.test.publisher.TestPublisher; import org.springframework.core.ParameterizedTypeReference; import org.springframework.core.codec.StringDecoder; @@ -46,8 +48,10 @@ import org.springframework.http.codec.HttpMessageReader; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import static org.springframework.web.reactive.function.BodyExtractors.toMono; /** @@ -214,7 +218,8 @@ public class DefaultClientResponseTests { when(mockExchangeStrategies.messageReaders()).thenReturn(messageReaders); Flux resultFlux = - defaultClientResponse.bodyToFlux(new ParameterizedTypeReference() {}); + defaultClientResponse.bodyToFlux(new ParameterizedTypeReference() { + }); Mono> result = resultFlux.collectList(); assertEquals(Collections.singletonList("foo"), result.block()); } @@ -260,7 +265,8 @@ public class DefaultClientResponseTests { when(mockExchangeStrategies.messageReaders()).thenReturn(messageReaders); ResponseEntity result = defaultClientResponse.toEntity( - new ParameterizedTypeReference() {}).block(); + new ParameterizedTypeReference() { + }).block(); assertEquals("foo", result.getBody()); assertEquals(HttpStatus.OK, result.getStatusCode()); assertEquals(MediaType.TEXT_PLAIN, result.getHeaders().getContentType()); @@ -307,13 +313,60 @@ public class DefaultClientResponseTests { when(mockExchangeStrategies.messageReaders()).thenReturn(messageReaders); ResponseEntity> result = defaultClientResponse.toEntityList( - new ParameterizedTypeReference() {}).block(); + new ParameterizedTypeReference() { + }).block(); assertEquals(Collections.singletonList("foo"), result.getBody()); assertEquals(HttpStatus.OK, result.getStatusCode()); assertEquals(MediaType.TEXT_PLAIN, result.getHeaders().getContentType()); } + @Test + public void toMonoVoid() throws Exception { + TestPublisher body = TestPublisher.create(); + + HttpHeaders httpHeaders = new HttpHeaders(); + httpHeaders.setContentType(MediaType.TEXT_PLAIN); + when(mockResponse.getHeaders()).thenReturn(httpHeaders); + when(mockResponse.getStatusCode()).thenReturn(HttpStatus.OK); + when(mockResponse.getBody()).thenReturn(body.flux()); + + List> messageReaders = Collections + .singletonList(new DecoderHttpMessageReader<>(StringDecoder.allMimeTypes(true))); + when(mockExchangeStrategies.messageReaders()).thenReturn(messageReaders); + + StepVerifier.create(defaultClientResponse.bodyToMono(Void.class)) + .then(() -> { + body.assertWasSubscribed(); + body.complete(); + }) + .verifyComplete(); + } + + @Test + public void toMonoVoidNonEmptyBody() throws Exception { + DefaultDataBufferFactory factory = new DefaultDataBufferFactory(); + DefaultDataBuffer dataBuffer = + factory.wrap(ByteBuffer.wrap("foo".getBytes(StandardCharsets.UTF_8))); + TestPublisher body = TestPublisher.create(); + + HttpHeaders httpHeaders = new HttpHeaders(); + httpHeaders.setContentType(MediaType.TEXT_PLAIN); + when(mockResponse.getHeaders()).thenReturn(httpHeaders); + when(mockResponse.getStatusCode()).thenReturn(HttpStatus.OK); + when(mockResponse.getBody()).thenReturn(body.flux()); + List> messageReaders = Collections + .singletonList(new DecoderHttpMessageReader<>(StringDecoder.allMimeTypes(true))); + when(mockExchangeStrategies.messageReaders()).thenReturn(messageReaders); + + StepVerifier.create(defaultClientResponse.bodyToMono(Void.class)) + .then(() -> { + body.assertWasSubscribed(); + body.emit(dataBuffer); + }) + .verifyComplete(); + body.assertCancelled(); + } } diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java index f567adad98..a8ebc56f52 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java @@ -541,15 +541,15 @@ public class WebClientIntegrationTests { @Test public void shouldReceiveEmptyResponse() throws Exception { - prepareResponse(response -> response.setHeader("Content-Length", "0")); + prepareResponse(response -> response.setHeader("Content-Length", "0").setBody("")); - Mono result = this.webClient.get() + Mono> result = this.webClient.get() .uri("/noContent") - .exchange(); + .exchange() + .flatMap(response -> response.toEntity(Void.class)); StepVerifier.create(result).assertNext(r -> { - assertTrue(r.statusCode().is2xxSuccessful()); - StepVerifier.create(r.bodyToMono(Void.class)).verifyComplete(); + assertTrue(r.getStatusCode().is2xxSuccessful()); }).verifyComplete(); } diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientMockTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientMockTests.java deleted file mode 100644 index 0fe7994fff..0000000000 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientMockTests.java +++ /dev/null @@ -1,53 +0,0 @@ -package org.springframework.web.reactive.function.client; - -import org.junit.Before; -import org.junit.Test; -import reactor.core.publisher.Mono; -import reactor.test.StepVerifier; - -import org.springframework.http.HttpHeaders; -import org.springframework.http.HttpStatus; -import org.springframework.http.client.reactive.ClientHttpConnector; -import org.springframework.mock.http.client.reactive.test.MockClientHttpResponse; - -import static org.junit.Assert.assertFalse; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.BDDMockito.given; -import static org.mockito.Mockito.mock; - -/** - * Mock tests using a {@link ExchangeFunction} through {@link WebClient}. - * - * @author Brian Clozel - */ -public class WebClientMockTests { - - private MockClientHttpResponse response; - - private ClientHttpConnector mockConnector; - - private WebClient webClient; - - @Before - public void setUp() throws Exception { - this.mockConnector = mock(ClientHttpConnector.class); - this.webClient = WebClient.builder().clientConnector(this.mockConnector).build(); - this.response = new MockClientHttpResponse(HttpStatus.OK); - this.response.setBody("example"); - - given(this.mockConnector.connect(any(), any(), any())).willReturn(Mono.just(this.response)); - } - - @Test - public void shouldDisposeResponseManually() { - Mono headers = this.webClient - .get().uri("/test") - .exchange() - .map(response -> response.headers().asHttpHeaders()); - StepVerifier.create(headers) - .expectNextCount(1) - .verifyComplete(); - assertFalse(this.response.isClosed()); - } - -}