Browse Source

Interpret empty mono from status handler as normal response

Prior to this commit, returning an empty mono from an exception handler
registered through ResponseSpec::onStatus would result in memory leaks
(since the response was not read) and in an empty response from bodyTo*
methods of the webclient.

As of this commit, that same empty mono is now interpreted to return
the body (and not an exception), offering a way to override the default
status handlers and return a normal response for 4xx and 5xx status
codes.
pull/23305/head
Arjen Poutsma 5 years ago
parent
commit
a9b3d95a14
  1. 107
      spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java
  2. 14
      spring-webflux/src/main/java/org/springframework/web/reactive/function/client/WebClient.java
  3. 44
      spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java

107
spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java

@ -415,9 +415,6 @@ class DefaultWebClient implements WebClient { @@ -415,9 +415,6 @@ class DefaultWebClient implements WebClient {
private static class DefaultResponseSpec implements ResponseSpec {
private static final StatusHandler DEFAULT_STATUS_HANDLER =
new StatusHandler(HttpStatus::isError, ClientResponse::createException);
private final Mono<ClientResponse> responseMono;
private final Supplier<HttpRequest> requestSupplier;
@ -427,72 +424,95 @@ class DefaultWebClient implements WebClient { @@ -427,72 +424,95 @@ class DefaultWebClient implements WebClient {
DefaultResponseSpec(Mono<ClientResponse> responseMono, Supplier<HttpRequest> requestSupplier) {
this.responseMono = responseMono;
this.requestSupplier = requestSupplier;
this.statusHandlers.add(DEFAULT_STATUS_HANDLER);
this.statusHandlers.add(new StatusHandler(HttpStatus::isError, ClientResponse::createException));
}
@Override
public ResponseSpec onStatus(Predicate<HttpStatus> statusPredicate,
Function<ClientResponse, Mono<? extends Throwable>> exceptionFunction) {
if (this.statusHandlers.size() == 1 && this.statusHandlers.get(0) == DEFAULT_STATUS_HANDLER) {
this.statusHandlers.clear();
}
this.statusHandlers.add(new StatusHandler(statusPredicate, exceptionFunction));
Assert.notNull(statusPredicate, "StatusPredicate must not be null");
Assert.notNull(exceptionFunction, "Function must not be null");
this.statusHandlers.add(0, new StatusHandler(statusPredicate, exceptionFunction));
return this;
}
@Override
public <T> Mono<T> bodyToMono(Class<T> elementClass) {
return this.responseMono.flatMap(response -> handleBody(response,
response.bodyToMono(elementClass), mono -> mono.flatMap(Mono::error)));
Assert.notNull(elementClass, "ElementClass must not be null");
return this.responseMono.flatMap(response -> handleBodyMono(response, response.bodyToMono(elementClass)));
}
@Override
public <T> Mono<T> bodyToMono(ParameterizedTypeReference<T> elementTypeRef) {
return this.responseMono.flatMap(response ->
handleBody(response, response.bodyToMono(elementTypeRef), mono -> mono.flatMap(Mono::error)));
Assert.notNull(elementTypeRef, "ElementTypeRef must not be null");
return this.responseMono.flatMap(response -> handleBodyMono(response, response.bodyToMono(elementTypeRef)));
}
private <T> Mono<T> handleBodyMono(ClientResponse response, Mono<T> bodyPublisher) {
if (HttpStatus.resolve(response.rawStatusCode()) != null) {
Mono<T> result = statusHandlers(response);
if (result != null) {
return result.switchIfEmpty(bodyPublisher);
}
else {
return bodyPublisher;
}
}
else {
return response.createException().flatMap(Mono::error);
}
}
@Override
public <T> Flux<T> bodyToFlux(Class<T> elementClass) {
return this.responseMono.flatMapMany(response ->
handleBody(response, response.bodyToFlux(elementClass), mono -> mono.handle((t, sink) -> sink.error(t))));
Assert.notNull(elementClass, "ElementClass must not be null");
return this.responseMono.flatMapMany(response -> handleBodyFlux(response, response.bodyToFlux(elementClass)));
}
@Override
public <T> Flux<T> bodyToFlux(ParameterizedTypeReference<T> elementTypeRef) {
return this.responseMono.flatMapMany(response ->
handleBody(response, response.bodyToFlux(elementTypeRef), mono -> mono.handle((t, sink) -> sink.error(t))));
Assert.notNull(elementTypeRef, "ElementTypeRef must not be null");
return this.responseMono.flatMapMany(response -> handleBodyFlux(response, response.bodyToFlux(elementTypeRef)));
}
private <T extends Publisher<?>> T handleBody(ClientResponse response,
T bodyPublisher, Function<Mono<? extends Throwable>, T> errorFunction) {
private <T> Publisher<T> handleBodyFlux(ClientResponse response, Flux<T> bodyPublisher) {
if (HttpStatus.resolve(response.rawStatusCode()) != null) {
for (StatusHandler handler : this.statusHandlers) {
if (handler.test(response.statusCode())) {
Mono<? extends Throwable> exMono;
try {
exMono = handler.apply(response);
exMono = exMono.flatMap(ex -> drainBody(response, ex));
exMono = exMono.onErrorResume(ex -> drainBody(response, ex));
}
catch (Throwable ex2) {
exMono = drainBody(response, ex2);
}
T result = errorFunction.apply(exMono);
HttpRequest request = this.requestSupplier.get();
return insertCheckpoint(result, response.statusCode(), request);
}
Mono<T> result = statusHandlers(response);
if (result != null) {
return result.flux().switchIfEmpty(bodyPublisher);
}
else {
return bodyPublisher;
}
return bodyPublisher;
}
else {
return errorFunction.apply(response.createException());
return response.createException().flatMap(Mono::error);
}
}
@Nullable
private <T> Mono<T> statusHandlers(ClientResponse response) {
for (StatusHandler handler : this.statusHandlers) {
if (handler.test(response.statusCode())) {
Mono<? extends Throwable> exMono;
try {
exMono = handler.apply(response);
exMono = exMono.flatMap(ex -> drainBody(response, ex));
exMono = exMono.onErrorResume(ex -> drainBody(response, ex));
}
catch (Throwable ex2) {
exMono = drainBody(response, ex2);
}
Mono<T> result = exMono.flatMap(Mono::error);
HttpRequest request = this.requestSupplier.get();
return insertCheckpoint(result, response.statusCode(), request);
}
}
return null;
}
@SuppressWarnings("unchecked")
private <T> Mono<T> drainBody(ClientResponse response, Throwable ex) {
// Ensure the body is drained, even if the StatusHandler didn't consume it,
@ -501,20 +521,11 @@ class DefaultWebClient implements WebClient { @@ -501,20 +521,11 @@ class DefaultWebClient implements WebClient {
.onErrorResume(ex2 -> Mono.empty()).thenReturn(ex);
}
@SuppressWarnings("unchecked")
private <T extends Publisher<?>> T insertCheckpoint(T result, HttpStatus status, HttpRequest request) {
private <T> Mono<T> insertCheckpoint(Mono<T> result, HttpStatus status, HttpRequest request) {
String httpMethod = request.getMethodValue();
URI uri = request.getURI();
String description = status + " from " + httpMethod + " " + uri + " [DefaultWebClient]";
if (result instanceof Mono) {
return (T) ((Mono<?>) result).checkpoint(description);
}
else if (result instanceof Flux) {
return (T) ((Flux<?>) result).checkpoint(description);
}
else {
return result;
}
return result.checkpoint(description);
}
@ -527,8 +538,6 @@ class DefaultWebClient implements WebClient { @@ -527,8 +538,6 @@ class DefaultWebClient implements WebClient {
public StatusHandler(Predicate<HttpStatus> predicate,
Function<ClientResponse, Mono<? extends Throwable>> exceptionFunction) {
Assert.notNull(predicate, "Predicate must not be null");
Assert.notNull(exceptionFunction, "Function must not be null");
this.predicate = predicate;
this.exceptionFunction = exceptionFunction;
}

14
spring-webflux/src/main/java/org/springframework/web/reactive/function/client/WebClient.java

@ -671,17 +671,21 @@ public interface WebClient { @@ -671,17 +671,21 @@ public interface WebClient {
/**
* Register a custom error function that gets invoked when the given {@link HttpStatus}
* predicate applies. The exception returned from the function will be returned from
* {@link #bodyToMono(Class)} and {@link #bodyToFlux(Class)}.
* <p>By default, an error handler is register that throws a
* predicate applies. Whatever exception is returned from the function (possibly using
* {@link ClientResponse#createException()}) will also be returned as error signal
* from {@link #bodyToMono(Class)} and {@link #bodyToFlux(Class)}.
* <p>By default, an error handler is registered that returns a
* {@link WebClientResponseException} when the response status code is 4xx or 5xx.
* @param statusPredicate a predicate that indicates whether {@code exceptionFunction}
* applies
* To override this default (and return a non-error response from {@code bodyOn*}), register
* an exception function that returns an {@linkplain Mono#empty() empty} mono.
* <p><strong>NOTE:</strong> if the response is expected to have content,
* the exceptionFunction should consume it. If not, the content will be
* automatically drained to ensure resources are released.
* @param statusPredicate a predicate that indicates whether {@code exceptionFunction}
* applies
* @param exceptionFunction the function that returns the exception
* @return this builder
* @see ClientResponse#createException()
*/
ResponseSpec onStatus(Predicate<HttpStatus> statusPredicate,
Function<ClientResponse, Mono<? extends Throwable>> exceptionFunction);

44
spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java

@ -626,6 +626,50 @@ public class WebClientIntegrationTests { @@ -626,6 +626,50 @@ public class WebClientIntegrationTests {
});
}
@Test
public void emptyStatusHandlerShouldReturnBody() {
prepareResponse(response -> response.setResponseCode(500)
.setHeader("Content-Type", "text/plain").setBody("Internal Server error"));
Mono<String> result = this.webClient.get()
.uri("/greeting?name=Spring")
.retrieve()
.onStatus(HttpStatus::is5xxServerError, response -> Mono.empty())
.bodyToMono(String.class);
StepVerifier.create(result)
.expectNext("Internal Server error")
.verifyComplete();
expectRequestCount(1);
expectRequest(request -> {
assertThat(request.getHeader(HttpHeaders.ACCEPT)).isEqualTo("*/*");
assertThat(request.getPath()).isEqualTo("/greeting?name=Spring");
});
}
@Test
public void emptyStatusHandlerShouldReturnBodyFlux() {
prepareResponse(response -> response.setResponseCode(500)
.setHeader("Content-Type", "text/plain").setBody("Internal Server error"));
Flux<String> result = this.webClient.get()
.uri("/greeting?name=Spring")
.retrieve()
.onStatus(HttpStatus::is5xxServerError, response -> Mono.empty())
.bodyToFlux(String.class);
StepVerifier.create(result)
.expectNext("Internal Server error")
.verifyComplete();
expectRequestCount(1);
expectRequest(request -> {
assertThat(request.getHeader(HttpHeaders.ACCEPT)).isEqualTo("*/*");
assertThat(request.getPath()).isEqualTo("/greeting?name=Spring");
});
}
@Test
public void shouldReceiveNotFoundEntity() {
prepareResponse(response -> response.setResponseCode(404)

Loading…
Cancel
Save