Browse Source

Fix behavior of ClientResponse#bodyTo** with Void

Prior to this commit, asking for a `Void` type using any of the
`ClientResponse#bodyTo*` methods would immediately return an empty
`Publisher` without consuming the response body.

Not doing so can lead to HTTP connection pool inconsistencies and/or
memory leaks, since:

* a connection that still has a response body being written to it cannot
be properly recycled in the connection pool
* incoming `DataBuffer` might not be released

This commit detects when `Void` types are asked as body types and in
those cases does the following:

1. Subscribe to the response body `Publisher` to allow the connection to
be returned to the connection pool
2. `cancel()` the body `Publisher` if the response body is not empty; in
that case, we choose to close the connection vs. consume the whole
response body

Those changes imply that `ClientHttpResponse` and other related
contracts don't need a `close()` method anymore.

Issue: SPR-16018
pull/1595/head
Brian Clozel 7 years ago
parent
commit
126ac849e5
  1. 14
      spring-test/src/main/java/org/springframework/mock/http/client/reactive/MockClientHttpResponse.java
  2. 16
      spring-web/src/main/java/org/springframework/http/client/reactive/ClientHttpResponse.java
  3. 5
      spring-web/src/main/java/org/springframework/http/client/reactive/ClientHttpResponseDecorator.java
  4. 5
      spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java
  5. 13
      spring-web/src/test/java/org/springframework/mock/http/client/reactive/test/MockClientHttpResponse.java
  6. 15
      spring-webflux/src/main/java/org/springframework/web/reactive/function/client/ClientResponse.java
  7. 62
      spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultClientResponse.java
  8. 18
      spring-webflux/src/main/java/org/springframework/web/reactive/function/client/WebClient.java
  9. 63
      spring-webflux/src/test/java/org/springframework/web/reactive/function/client/DefaultClientResponseTests.java
  10. 10
      spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java
  11. 53
      spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientMockTests.java

14
spring-test/src/main/java/org/springframework/mock/http/client/reactive/MockClientHttpResponse.java

@ -55,8 +55,6 @@ public class MockClientHttpResponse implements ClientHttpResponse { @@ -55,8 +55,6 @@ public class MockClientHttpResponse implements ClientHttpResponse {
private final DataBufferFactory bufferFactory = new DefaultDataBufferFactory();
private boolean closed = false;
public MockClientHttpResponse(HttpStatus status) {
Assert.notNull(status, "HttpStatus is required");
@ -98,21 +96,9 @@ public class MockClientHttpResponse implements ClientHttpResponse { @@ -98,21 +96,9 @@ public class MockClientHttpResponse implements ClientHttpResponse {
@Override
public Flux<DataBuffer> getBody() {
if (this.closed) {
return Flux.error(new IllegalStateException("Connection has been closed."));
}
return this.body;
}
@Override
public void close() {
this.closed = true;
}
public boolean isClosed() {
return this.closed;
}
/**
* Return the response body aggregated and converted to a String using the
* charset of the Content-Type response or otherwise as "UTF-8".

16
spring-web/src/main/java/org/springframework/http/client/reactive/ClientHttpResponse.java

@ -16,8 +16,6 @@ @@ -16,8 +16,6 @@
package org.springframework.http.client.reactive;
import java.io.Closeable;
import org.springframework.http.HttpStatus;
import org.springframework.http.ReactiveHttpInputMessage;
import org.springframework.http.ResponseCookie;
@ -30,7 +28,7 @@ import org.springframework.util.MultiValueMap; @@ -30,7 +28,7 @@ import org.springframework.util.MultiValueMap;
* @author Brian Clozel
* @since 5.0
*/
public interface ClientHttpResponse extends ReactiveHttpInputMessage, Closeable {
public interface ClientHttpResponse extends ReactiveHttpInputMessage {
/**
* Return the HTTP status as an {@link HttpStatus} enum value.
@ -42,16 +40,4 @@ public interface ClientHttpResponse extends ReactiveHttpInputMessage, Closeable @@ -42,16 +40,4 @@ public interface ClientHttpResponse extends ReactiveHttpInputMessage, Closeable
*/
MultiValueMap<String, ResponseCookie> getCookies();
/**
* Close this response and the underlying HTTP connection.
* <p>This non-blocking method has to be called if its body isn't going
* to be consumed. Not doing so might result in HTTP connection pool
* inconsistencies or memory leaks.
* <p>This shouldn't be called if the response body is read,
* because it would prevent connections to be reused and cancel
* the benefits of using a connection pooling.
*/
@Override
void close();
}

5
spring-web/src/main/java/org/springframework/http/client/reactive/ClientHttpResponseDecorator.java

@ -70,11 +70,6 @@ public class ClientHttpResponseDecorator implements ClientHttpResponse { @@ -70,11 +70,6 @@ public class ClientHttpResponseDecorator implements ClientHttpResponse {
return this.delegate.getBody();
}
@Override
public void close() {
this.delegate.close();
}
@Override
public String toString() {
return getClass().getSimpleName() + " [delegate=" + getDelegate() + "]";

5
spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java

@ -89,11 +89,6 @@ public class ReactorClientHttpResponse implements ClientHttpResponse { @@ -89,11 +89,6 @@ public class ReactorClientHttpResponse implements ClientHttpResponse {
return CollectionUtils.unmodifiableMultiValueMap(result);
}
@Override
public void close() {
this.response.dispose();
}
@Override
public String toString() {
return "ReactorClientHttpResponse{" +

13
spring-web/src/test/java/org/springframework/mock/http/client/reactive/test/MockClientHttpResponse.java

@ -55,7 +55,6 @@ public class MockClientHttpResponse implements ClientHttpResponse { @@ -55,7 +55,6 @@ public class MockClientHttpResponse implements ClientHttpResponse {
private final DataBufferFactory bufferFactory = new DefaultDataBufferFactory();
private boolean closed = false;
public MockClientHttpResponse(HttpStatus status) {
Assert.notNull(status, "HttpStatus is required");
@ -99,21 +98,9 @@ public class MockClientHttpResponse implements ClientHttpResponse { @@ -99,21 +98,9 @@ public class MockClientHttpResponse implements ClientHttpResponse {
@Override
public Flux<DataBuffer> getBody() {
if (this.closed) {
return Flux.error(new IllegalStateException("Connection has been closed."));
}
return this.body;
}
@Override
public void close() {
this.closed = true;
}
public boolean isClosed() {
return this.closed;
}
/**
* Return the response body aggregated and converted to a String using the
* charset of the Content-Type response or otherwise as "UTF-8".

15
spring-webflux/src/main/java/org/springframework/web/reactive/function/client/ClientResponse.java

@ -16,7 +16,6 @@ @@ -16,7 +16,6 @@
package org.springframework.web.reactive.function.client;
import java.io.Closeable;
import java.util.List;
import java.util.Optional;
import java.util.OptionalLong;
@ -44,7 +43,7 @@ import org.springframework.web.reactive.function.BodyExtractor; @@ -44,7 +43,7 @@ import org.springframework.web.reactive.function.BodyExtractor;
* @author Arjen Poutsma
* @since 5.0
*/
public interface ClientResponse extends Closeable {
public interface ClientResponse {
/**
* Return the status code of this response.
@ -133,18 +132,6 @@ public interface ClientResponse extends Closeable { @@ -133,18 +132,6 @@ public interface ClientResponse extends Closeable {
*/
<T> Mono<ResponseEntity<List<T>>> toEntityList(ParameterizedTypeReference<T> typeReference);
/**
* Close this response and the underlying HTTP connection.
* <p>This non-blocking method has to be called if its body isn't going
* to be consumed. Not doing so might result in HTTP connection pool
* inconsistencies or memory leaks.
* <p>This shouldn't be called if the response body is read,
* because it would prevent connections to be reused and cancel
* the benefits of using a connection pooling.
*/
@Override
void close();
/**
* Represents the headers of the HTTP response.

62
spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultClientResponse.java

@ -26,6 +26,7 @@ import reactor.core.publisher.Flux; @@ -26,6 +26,7 @@ import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
@ -98,32 +99,73 @@ class DefaultClientResponse implements ClientResponse { @@ -98,32 +99,73 @@ class DefaultClientResponse implements ClientResponse {
@Override
public <T> Mono<T> bodyToMono(Class<? extends T> elementClass) {
return body(BodyExtractors.toMono(elementClass));
if (Void.class.isAssignableFrom(elementClass)) {
return consumeAndCancel();
}
else {
return body(BodyExtractors.toMono(elementClass));
}
}
@SuppressWarnings("unchecked")
private <T> Mono<T> consumeAndCancel() {
return (Mono<T>) this.response.getBody()
.map(buffer -> {
DataBufferUtils.release(buffer);
throw new ReadCancellationException();
})
.onErrorResume(ReadCancellationException.class, ex -> Mono.empty())
.then();
}
@Override
public <T> Mono<T> bodyToMono(ParameterizedTypeReference<T> typeReference) {
return body(BodyExtractors.toMono(typeReference));
if (Void.class.isAssignableFrom(typeReference.getType().getClass())) {
return consumeAndCancel();
}
else {
return body(BodyExtractors.toMono(typeReference));
}
}
@Override
public <T> Flux<T> bodyToFlux(Class<? extends T> elementClass) {
return body(BodyExtractors.toFlux(elementClass));
if (Void.class.isAssignableFrom(elementClass)) {
return Flux.from(consumeAndCancel());
}
else {
return body(BodyExtractors.toFlux(elementClass));
}
}
@Override
public <T> Flux<T> bodyToFlux(ParameterizedTypeReference<T> typeReference) {
return body(BodyExtractors.toFlux(typeReference));
if (Void.class.isAssignableFrom(typeReference.getType().getClass())) {
return Flux.from(consumeAndCancel());
}
else {
return body(BodyExtractors.toFlux(typeReference));
}
}
@Override
public <T> Mono<ResponseEntity<T>> toEntity(Class<T> bodyType) {
return toEntityInternal(bodyToMono(bodyType));
if (Void.class.isAssignableFrom(bodyType)) {
return toEntityInternal(consumeAndCancel());
}
else {
return toEntityInternal(bodyToMono(bodyType));
}
}
@Override
public <T> Mono<ResponseEntity<T>> toEntity(ParameterizedTypeReference<T> typeReference) {
return toEntityInternal(bodyToMono(typeReference));
if (Void.class.isAssignableFrom(typeReference.getType().getClass())) {
return toEntityInternal(consumeAndCancel());
}
else {
return toEntityInternal(bodyToMono(typeReference));
}
}
private <T> Mono<ResponseEntity<T>> toEntityInternal(Mono<T> bodyMono) {
@ -154,10 +196,6 @@ class DefaultClientResponse implements ClientResponse { @@ -154,10 +196,6 @@ class DefaultClientResponse implements ClientResponse {
.map(body -> new ResponseEntity<>(body, headers, statusCode));
}
@Override
public void close() {
this.response.close();
}
private class DefaultHeaders implements Headers {
@ -191,4 +229,8 @@ class DefaultClientResponse implements ClientResponse { @@ -191,4 +229,8 @@ class DefaultClientResponse implements ClientResponse {
}
}
@SuppressWarnings("serial")
private class ReadCancellationException extends RuntimeException {
}
}

18
spring-webflux/src/main/java/org/springframework/web/reactive/function/client/WebClient.java

@ -461,17 +461,11 @@ public interface WebClient { @@ -461,17 +461,11 @@ public interface WebClient {
* .exchange()
* .flatMapMany(response -> response.bodyToFlux(Pojo.class));
* </pre>
* <p>If the response body is not consumed with {@code bodyTo*}
* or {@code toEntity*} methods, it is your responsibility
* to release the HTTP resources with {@link ClientResponse#close()}.
* <pre>
* Mono&lt;HttpStatus&gt; mono = client.get().uri("/")
* .exchange()
* .map(response -> {
* response.close();
* return response.statusCode();
* });
* </pre>
* <p>The response body should always be consumed with {@code bodyTo*}
* or {@code toEntity*} methods; if you do not care about the body,
* you can use {@code bodyToMono(Void.class)}.
* <p>Not consuming the response body might lead to HTTP connection pool
* inconsistencies or memory leaks.
* @return a {@code Mono} with the response
* @see #retrieve()
*/
@ -491,8 +485,6 @@ public interface WebClient { @@ -491,8 +485,6 @@ public interface WebClient {
* .retrieve()
* .bodyToMono(Pojo.class);
* </pre>
* <p>Since this method reads the response body,
* {@link ClientResponse#close()} should not be called.
* @return spec with options for extracting the response body
*/
ResponseSpec retrieve();

63
spring-webflux/src/test/java/org/springframework/web/reactive/function/client/DefaultClientResponseTests.java

@ -28,6 +28,8 @@ import org.junit.Before; @@ -28,6 +28,8 @@ import org.junit.Before;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import reactor.test.publisher.TestPublisher;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.core.codec.StringDecoder;
@ -46,8 +48,10 @@ import org.springframework.http.codec.HttpMessageReader; @@ -46,8 +48,10 @@ import org.springframework.http.codec.HttpMessageReader;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.springframework.web.reactive.function.BodyExtractors.toMono;
/**
@ -214,7 +218,8 @@ public class DefaultClientResponseTests { @@ -214,7 +218,8 @@ public class DefaultClientResponseTests {
when(mockExchangeStrategies.messageReaders()).thenReturn(messageReaders);
Flux<String> resultFlux =
defaultClientResponse.bodyToFlux(new ParameterizedTypeReference<String>() {});
defaultClientResponse.bodyToFlux(new ParameterizedTypeReference<String>() {
});
Mono<List<String>> result = resultFlux.collectList();
assertEquals(Collections.singletonList("foo"), result.block());
}
@ -260,7 +265,8 @@ public class DefaultClientResponseTests { @@ -260,7 +265,8 @@ public class DefaultClientResponseTests {
when(mockExchangeStrategies.messageReaders()).thenReturn(messageReaders);
ResponseEntity<String> result = defaultClientResponse.toEntity(
new ParameterizedTypeReference<String>() {}).block();
new ParameterizedTypeReference<String>() {
}).block();
assertEquals("foo", result.getBody());
assertEquals(HttpStatus.OK, result.getStatusCode());
assertEquals(MediaType.TEXT_PLAIN, result.getHeaders().getContentType());
@ -307,13 +313,60 @@ public class DefaultClientResponseTests { @@ -307,13 +313,60 @@ public class DefaultClientResponseTests {
when(mockExchangeStrategies.messageReaders()).thenReturn(messageReaders);
ResponseEntity<List<String>> result = defaultClientResponse.toEntityList(
new ParameterizedTypeReference<String>() {}).block();
new ParameterizedTypeReference<String>() {
}).block();
assertEquals(Collections.singletonList("foo"), result.getBody());
assertEquals(HttpStatus.OK, result.getStatusCode());
assertEquals(MediaType.TEXT_PLAIN, result.getHeaders().getContentType());
}
@Test
public void toMonoVoid() throws Exception {
TestPublisher<DataBuffer> body = TestPublisher.create();
HttpHeaders httpHeaders = new HttpHeaders();
httpHeaders.setContentType(MediaType.TEXT_PLAIN);
when(mockResponse.getHeaders()).thenReturn(httpHeaders);
when(mockResponse.getStatusCode()).thenReturn(HttpStatus.OK);
when(mockResponse.getBody()).thenReturn(body.flux());
List<HttpMessageReader<?>> messageReaders = Collections
.singletonList(new DecoderHttpMessageReader<>(StringDecoder.allMimeTypes(true)));
when(mockExchangeStrategies.messageReaders()).thenReturn(messageReaders);
StepVerifier.create(defaultClientResponse.bodyToMono(Void.class))
.then(() -> {
body.assertWasSubscribed();
body.complete();
})
.verifyComplete();
}
@Test
public void toMonoVoidNonEmptyBody() throws Exception {
DefaultDataBufferFactory factory = new DefaultDataBufferFactory();
DefaultDataBuffer dataBuffer =
factory.wrap(ByteBuffer.wrap("foo".getBytes(StandardCharsets.UTF_8)));
TestPublisher<DataBuffer> body = TestPublisher.create();
HttpHeaders httpHeaders = new HttpHeaders();
httpHeaders.setContentType(MediaType.TEXT_PLAIN);
when(mockResponse.getHeaders()).thenReturn(httpHeaders);
when(mockResponse.getStatusCode()).thenReturn(HttpStatus.OK);
when(mockResponse.getBody()).thenReturn(body.flux());
List<HttpMessageReader<?>> messageReaders = Collections
.singletonList(new DecoderHttpMessageReader<>(StringDecoder.allMimeTypes(true)));
when(mockExchangeStrategies.messageReaders()).thenReturn(messageReaders);
StepVerifier.create(defaultClientResponse.bodyToMono(Void.class))
.then(() -> {
body.assertWasSubscribed();
body.emit(dataBuffer);
})
.verifyComplete();
body.assertCancelled();
}
}

10
spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java

@ -541,15 +541,15 @@ public class WebClientIntegrationTests { @@ -541,15 +541,15 @@ public class WebClientIntegrationTests {
@Test
public void shouldReceiveEmptyResponse() throws Exception {
prepareResponse(response -> response.setHeader("Content-Length", "0"));
prepareResponse(response -> response.setHeader("Content-Length", "0").setBody(""));
Mono<ClientResponse> result = this.webClient.get()
Mono<ResponseEntity<Void>> result = this.webClient.get()
.uri("/noContent")
.exchange();
.exchange()
.flatMap(response -> response.toEntity(Void.class));
StepVerifier.create(result).assertNext(r -> {
assertTrue(r.statusCode().is2xxSuccessful());
StepVerifier.create(r.bodyToMono(Void.class)).verifyComplete();
assertTrue(r.getStatusCode().is2xxSuccessful());
}).verifyComplete();
}

53
spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientMockTests.java

@ -1,53 +0,0 @@ @@ -1,53 +0,0 @@
package org.springframework.web.reactive.function.client;
import org.junit.Before;
import org.junit.Test;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.client.reactive.ClientHttpConnector;
import org.springframework.mock.http.client.reactive.test.MockClientHttpResponse;
import static org.junit.Assert.assertFalse;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock;
/**
* Mock tests using a {@link ExchangeFunction} through {@link WebClient}.
*
* @author Brian Clozel
*/
public class WebClientMockTests {
private MockClientHttpResponse response;
private ClientHttpConnector mockConnector;
private WebClient webClient;
@Before
public void setUp() throws Exception {
this.mockConnector = mock(ClientHttpConnector.class);
this.webClient = WebClient.builder().clientConnector(this.mockConnector).build();
this.response = new MockClientHttpResponse(HttpStatus.OK);
this.response.setBody("example");
given(this.mockConnector.connect(any(), any(), any())).willReturn(Mono.just(this.response));
}
@Test
public void shouldDisposeResponseManually() {
Mono<HttpHeaders> headers = this.webClient
.get().uri("/test")
.exchange()
.map(response -> response.headers().asHttpHeaders());
StepVerifier.create(headers)
.expectNextCount(1)
.verifyComplete();
assertFalse(this.response.isClosed());
}
}
Loading…
Cancel
Save