diff --git a/spring-test/src/main/java/org/springframework/mock/http/client/reactive/MockClientHttpResponse.java b/spring-test/src/main/java/org/springframework/mock/http/client/reactive/MockClientHttpResponse.java index b67b3a1392..0627906e2c 100644 --- a/spring-test/src/main/java/org/springframework/mock/http/client/reactive/MockClientHttpResponse.java +++ b/spring-test/src/main/java/org/springframework/mock/http/client/reactive/MockClientHttpResponse.java @@ -55,6 +55,8 @@ public class MockClientHttpResponse implements ClientHttpResponse { private final DataBufferFactory bufferFactory = new DefaultDataBufferFactory(); + private boolean closed = false; + public MockClientHttpResponse(HttpStatus status) { Assert.notNull(status, "HttpStatus is required"); @@ -94,10 +96,23 @@ public class MockClientHttpResponse implements ClientHttpResponse { return this.bufferFactory.wrap(byteBuffer); } + @Override public Flux getBody() { + if (this.closed) { + return Flux.error(new IllegalStateException("Connection has been closed.")); + } return this.body; } + @Override + public void close() { + this.closed = true; + } + + public boolean isClosed() { + return this.closed; + } + /** * Return the response body aggregated and converted to a String using the * charset of the Content-Type response or otherwise as "UTF-8". diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ClientHttpResponse.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ClientHttpResponse.java index b0563927bd..3a3955a629 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/ClientHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ClientHttpResponse.java @@ -16,6 +16,8 @@ package org.springframework.http.client.reactive; +import java.io.Closeable; + import org.springframework.http.HttpStatus; import org.springframework.http.ReactiveHttpInputMessage; import org.springframework.http.ResponseCookie; @@ -25,9 +27,10 @@ import org.springframework.util.MultiValueMap; * Represents a client-side reactive HTTP response. * * @author Arjen Poutsma + * @author Brian Clozel * @since 5.0 */ -public interface ClientHttpResponse extends ReactiveHttpInputMessage { +public interface ClientHttpResponse extends ReactiveHttpInputMessage, Closeable { /** * Return the HTTP status as an {@link HttpStatus} enum value. @@ -39,4 +42,15 @@ public interface ClientHttpResponse extends ReactiveHttpInputMessage { */ MultiValueMap getCookies(); + /** + * Close this response, freeing any resources created. + *

This non-blocking method has to be called once the response has been + * processed and the resources are no longer needed; not doing so might + * create resource leaks or connection issues. + *

Depending on the client configuration and HTTP version, + * this can lead to closing the connection or returning it to a connection pool. + */ + @Override + void close(); + } diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ClientHttpResponseDecorator.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ClientHttpResponseDecorator.java index b37a394e9c..c8cbf11462 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/ClientHttpResponseDecorator.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ClientHttpResponseDecorator.java @@ -70,6 +70,10 @@ public class ClientHttpResponseDecorator implements ClientHttpResponse { return this.delegate.getBody(); } + @Override + public void close() { + this.delegate.close(); + } @Override public String toString() { diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java index 7674b08907..778a9a512f 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java @@ -89,6 +89,10 @@ public class ReactorClientHttpResponse implements ClientHttpResponse { return CollectionUtils.unmodifiableMultiValueMap(result); } + @Override + public void close() { + this.response.dispose(); + } @Override public String toString() { diff --git a/spring-web/src/test/java/org/springframework/mock/http/client/reactive/test/MockClientHttpResponse.java b/spring-web/src/test/java/org/springframework/mock/http/client/reactive/test/MockClientHttpResponse.java index c872d13c2e..7f89d05792 100644 --- a/spring-web/src/test/java/org/springframework/mock/http/client/reactive/test/MockClientHttpResponse.java +++ b/spring-web/src/test/java/org/springframework/mock/http/client/reactive/test/MockClientHttpResponse.java @@ -55,6 +55,7 @@ public class MockClientHttpResponse implements ClientHttpResponse { private final DataBufferFactory bufferFactory = new DefaultDataBufferFactory(); + private boolean closed = false; public MockClientHttpResponse(HttpStatus status) { Assert.notNull(status, "HttpStatus is required"); @@ -62,6 +63,7 @@ public class MockClientHttpResponse implements ClientHttpResponse { } + @Override public HttpStatus getStatusCode() { return this.status; } @@ -71,6 +73,7 @@ public class MockClientHttpResponse implements ClientHttpResponse { return this.headers; } + @Override public MultiValueMap getCookies() { return this.cookies; } @@ -94,10 +97,23 @@ public class MockClientHttpResponse implements ClientHttpResponse { return this.bufferFactory.wrap(byteBuffer); } + @Override public Flux getBody() { + if (this.closed) { + return Flux.error(new IllegalStateException("Connection has been closed.")); + } return this.body; } + @Override + public void close() { + this.closed = true; + } + + public boolean isClosed() { + return this.closed; + } + /** * Return the response body aggregated and converted to a String using the * charset of the Content-Type response or otherwise as "UTF-8". diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/ClientResponse.java b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/ClientResponse.java index 268fba5121..58a1da89c7 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/ClientResponse.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/ClientResponse.java @@ -16,6 +16,7 @@ package org.springframework.web.reactive.function.client; +import java.io.Closeable; import java.util.List; import java.util.Optional; import java.util.OptionalLong; @@ -43,7 +44,7 @@ import org.springframework.web.reactive.function.BodyExtractor; * @author Arjen Poutsma * @since 5.0 */ -public interface ClientResponse { +public interface ClientResponse extends Closeable { /** * Return the status code of this response. @@ -132,6 +133,18 @@ public interface ClientResponse { */ Mono>> toEntityList(ParameterizedTypeReference typeReference); + /** + * Close this response, freeing any resources created. + *

This non-blocking method has to be called once the response has been processed + * and the resources are no longer needed. + *

{@code ClientResponse.bodyTo*}, {@code ClientResponse.toEntity*} + * and all methods under {@code WebClient.retrieve()} will close the response + * automatically. + *

It is required to call close() manually otherwise; not doing so might + * create resource leaks or connection issues. + */ + @Override + void close(); /** * Represents the headers of the HTTP response. diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultClientResponse.java b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultClientResponse.java index 664e058ce0..31d8820d25 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultClientResponse.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultClientResponse.java @@ -42,6 +42,7 @@ import org.springframework.web.reactive.function.BodyExtractors; * Default implementation of {@link ClientResponse}. * * @author Arjen Poutsma + * @author Brian Clozel * @since 5.0 */ class DefaultClientResponse implements ClientResponse { @@ -97,22 +98,24 @@ class DefaultClientResponse implements ClientResponse { @Override public Mono bodyToMono(Class elementClass) { - return body(BodyExtractors.toMono(elementClass)); + Mono body = body(BodyExtractors.toMono(elementClass)); + return body.doOnTerminate(this.response::close); } @Override public Mono bodyToMono(ParameterizedTypeReference typeReference) { - return body(BodyExtractors.toMono(typeReference)); + return body(BodyExtractors.toMono(typeReference)).doOnTerminate(this.response::close); } @Override public Flux bodyToFlux(Class elementClass) { - return body(BodyExtractors.toFlux(elementClass)); + Flux body = body(BodyExtractors.toFlux(elementClass)); + return body.doOnTerminate(this.response::close); } @Override public Flux bodyToFlux(ParameterizedTypeReference typeReference) { - return body(BodyExtractors.toFlux(typeReference)); + return body(BodyExtractors.toFlux(typeReference)).doOnTerminate(this.response::close); } @Override @@ -131,7 +134,8 @@ class DefaultClientResponse implements ClientResponse { return bodyMono .map(body -> new ResponseEntity<>(body, headers, statusCode)) .switchIfEmpty(Mono.defer( - () -> Mono.just(new ResponseEntity<>(headers, statusCode)))); + () -> Mono.just(new ResponseEntity<>(headers, statusCode)))) + .doOnTerminate(this.response::close); } @Override @@ -150,9 +154,14 @@ class DefaultClientResponse implements ClientResponse { HttpStatus statusCode = statusCode(); return bodyFlux .collectList() - .map(body -> new ResponseEntity<>(body, headers, statusCode)); + .map(body -> new ResponseEntity<>(body, headers, statusCode)) + .doOnTerminate(this.response::close); } + @Override + public void close() { + this.response.close(); + } private class DefaultHeaders implements Headers { diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java index 592e96fed6..f2562afcd8 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java @@ -413,7 +413,7 @@ class DefaultWebClient implements WebClient { @SuppressWarnings("unchecked") public Mono bodyToMono(Class bodyType) { return this.responseMono.flatMap( - response -> bodyToPublisher(response, BodyExtractors.toMono(bodyType), + response -> bodyToMono(response, BodyExtractors.toMono(bodyType), this::monoThrowableToMono)); } @@ -421,7 +421,7 @@ class DefaultWebClient implements WebClient { @SuppressWarnings("unchecked") public Mono bodyToMono(ParameterizedTypeReference typeReference) { return this.responseMono.flatMap( - response -> bodyToPublisher(response, BodyExtractors.toMono(typeReference), + response -> bodyToMono(response, BodyExtractors.toMono(typeReference), mono -> (Mono)mono)); } @@ -429,17 +429,30 @@ class DefaultWebClient implements WebClient { return mono.flatMap(Mono::error); } + private Mono bodyToMono(ClientResponse response, + BodyExtractor, ? super ClientHttpResponse> extractor, + Function, Mono> errorFunction) { + + return this.statusHandlers.stream() + .filter(statusHandler -> statusHandler.test(response.statusCode())) + .findFirst() + .map(statusHandler -> statusHandler.apply(response)) + .map(errorFunction::apply) + .orElse(response.body(extractor)) + .doAfterTerminate(response::close); + } + @Override public Flux bodyToFlux(Class elementType) { return this.responseMono.flatMapMany( - response -> bodyToPublisher(response, BodyExtractors.toFlux(elementType), + response -> bodyToFlux(response, BodyExtractors.toFlux(elementType), this::monoThrowableToFlux)); } @Override public Flux bodyToFlux(ParameterizedTypeReference typeReference) { return this.responseMono.flatMapMany( - response -> bodyToPublisher(response, BodyExtractors.toFlux(typeReference), + response -> bodyToFlux(response, BodyExtractors.toFlux(typeReference), this::monoThrowableToFlux)); } @@ -447,16 +460,17 @@ class DefaultWebClient implements WebClient { return mono.flatMapMany(Flux::error); } - private > T bodyToPublisher(ClientResponse response, - BodyExtractor extractor, - Function, T> errorFunction) { + private Flux bodyToFlux(ClientResponse response, + BodyExtractor, ? super ClientHttpResponse> extractor, + Function, Flux> errorFunction) { return this.statusHandlers.stream() .filter(statusHandler -> statusHandler.test(response.statusCode())) .findFirst() .map(statusHandler -> statusHandler.apply(response)) .map(errorFunction::apply) - .orElse(response.body(extractor)); + .orElse(response.body(extractor)) + .doAfterTerminate(response::close); } private static Mono createResponseException(ClientResponse response) { diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/WebClient.java b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/WebClient.java index 3122d94d70..eec6489270 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/WebClient.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/WebClient.java @@ -461,6 +461,17 @@ public interface WebClient { * .exchange() * .flatMapMany(response -> response.bodyToFlux(Pojo.class)); * + *

If the response body is not consumed with {@code bodyTo*} + * or {@code toEntity*} methods, it is your responsibility + * to release the HTTP resources with {@link ClientResponse#close()}. + *

+		 * Mono<HttpStatus> mono = client.get().uri("/")
+		 *     .exchange()
+		 *     .map(response -> {
+		 *         response.close();
+		 *         return response.statusCode();
+		 *     });
+		 * 
* @return a {@code Mono} with the response * @see #retrieve() */ diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java index adf1671c79..ddd36837e1 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java @@ -70,6 +70,12 @@ public class WebClientIntegrationTests { public void headers() throws Exception { this.server.enqueue(new MockResponse().setHeader("Content-Type", "text/plain").setBody("Hello Spring!")); + this.webClient.get().uri("/test") + .exchange() + .map(response -> { + response.close(); + return response.statusCode(); + }); Mono result = this.webClient.get() .uri("/greeting?name=Spring") .exchange() diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientMockTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientMockTests.java new file mode 100644 index 0000000000..fabe7abce7 --- /dev/null +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientMockTests.java @@ -0,0 +1,116 @@ +package org.springframework.web.reactive.function.client; + +import org.junit.Before; +import org.junit.Test; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.http.client.reactive.ClientHttpConnector; +import org.springframework.mock.http.client.reactive.test.MockClientHttpResponse; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.mock; + +/** + * Mock tests using a {@link ExchangeFunction} through {@link WebClient}. + * + * @author Brian Clozel + */ +public class WebClientMockTests { + + private MockClientHttpResponse response; + + private ClientHttpConnector mockConnector; + + private WebClient webClient; + + @Before + public void setUp() throws Exception { + this.mockConnector = mock(ClientHttpConnector.class); + this.webClient = WebClient.builder().clientConnector(this.mockConnector).build(); + this.response = new MockClientHttpResponse(HttpStatus.OK); + this.response.setBody("example"); + + given(this.mockConnector.connect(any(), any(), any())).willReturn(Mono.just(this.response)); + } + + @Test + public void shouldDisposeResponseManually() { + Mono headers= this.webClient + .get().uri("/test") + .exchange() + .map(response -> response.headers().asHttpHeaders()); + StepVerifier.create(headers) + .expectNextCount(1) + .verifyComplete(); + assertFalse(this.response.isClosed()); + } + + @Test + public void shouldDisposeResponseExchangeMono() { + Mono body = this.webClient + .get().uri("/test") + .exchange() + .flatMap(response -> response.bodyToMono(String.class)); + StepVerifier.create(body) + .expectNext("example") + .verifyComplete(); + assertTrue(this.response.isClosed()); + } + + @Test + public void shouldDisposeResponseExchangeFlux() { + Flux body = this.webClient + .get().uri("/test") + .exchange() + .flatMapMany(response -> response.bodyToFlux(String.class)); + StepVerifier.create(body) + .expectNext("example") + .verifyComplete(); + assertTrue(this.response.isClosed()); + } + + @Test + public void shouldDisposeResponseExchangeEntity() { + ResponseEntity entity = this.webClient + .get().uri("/test") + .exchange() + .flatMap(response -> response.toEntity(String.class)) + .block(); + assertEquals("example", entity.getBody()); + assertTrue(this.response.isClosed()); + } + + @Test + public void shouldDisposeResponseRetrieveMono() { + Mono body = this.webClient + .get().uri("/test") + .retrieve() + .bodyToMono(String.class); + StepVerifier.create(body) + .expectNext("example") + .verifyComplete(); + assertTrue(this.response.isClosed()); + } + + @Test + public void shouldDisposeResponseRetrieveFlux() { + Flux body = this.webClient + .get().uri("/test") + .retrieve() + .bodyToFlux(String.class); + StepVerifier.create(body) + .expectNext("example") + .verifyComplete(); + assertTrue(this.response.isClosed()); + } + +}