From a9b3d95a14c316387f2b77328d6864b8e0a36721 Mon Sep 17 00:00:00 2001 From: Arjen Poutsma Date: Fri, 12 Jul 2019 14:46:33 +0200 Subject: [PATCH] Interpret empty mono from status handler as normal response Prior to this commit, returning an empty mono from an exception handler registered through ResponseSpec::onStatus would result in memory leaks (since the response was not read) and in an empty response from bodyTo* methods of the webclient. As of this commit, that same empty mono is now interpreted to return the body (and not an exception), offering a way to override the default status handlers and return a normal response for 4xx and 5xx status codes. --- .../function/client/DefaultWebClient.java | 107 ++++++++++-------- .../reactive/function/client/WebClient.java | 14 ++- .../client/WebClientIntegrationTests.java | 44 +++++++ 3 files changed, 111 insertions(+), 54 deletions(-) diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java index f40febfe1f..f73c09ebc0 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java @@ -415,9 +415,6 @@ class DefaultWebClient implements WebClient { private static class DefaultResponseSpec implements ResponseSpec { - private static final StatusHandler DEFAULT_STATUS_HANDLER = - new StatusHandler(HttpStatus::isError, ClientResponse::createException); - private final Mono responseMono; private final Supplier requestSupplier; @@ -427,72 +424,95 @@ class DefaultWebClient implements WebClient { DefaultResponseSpec(Mono responseMono, Supplier requestSupplier) { this.responseMono = responseMono; this.requestSupplier = requestSupplier; - this.statusHandlers.add(DEFAULT_STATUS_HANDLER); + this.statusHandlers.add(new StatusHandler(HttpStatus::isError, ClientResponse::createException)); } @Override public ResponseSpec onStatus(Predicate statusPredicate, Function> exceptionFunction) { - if (this.statusHandlers.size() == 1 && this.statusHandlers.get(0) == DEFAULT_STATUS_HANDLER) { - this.statusHandlers.clear(); - } - this.statusHandlers.add(new StatusHandler(statusPredicate, exceptionFunction)); + Assert.notNull(statusPredicate, "StatusPredicate must not be null"); + Assert.notNull(exceptionFunction, "Function must not be null"); + this.statusHandlers.add(0, new StatusHandler(statusPredicate, exceptionFunction)); return this; } @Override public Mono bodyToMono(Class elementClass) { - return this.responseMono.flatMap(response -> handleBody(response, - response.bodyToMono(elementClass), mono -> mono.flatMap(Mono::error))); + Assert.notNull(elementClass, "ElementClass must not be null"); + return this.responseMono.flatMap(response -> handleBodyMono(response, response.bodyToMono(elementClass))); } @Override public Mono bodyToMono(ParameterizedTypeReference elementTypeRef) { - return this.responseMono.flatMap(response -> - handleBody(response, response.bodyToMono(elementTypeRef), mono -> mono.flatMap(Mono::error))); + Assert.notNull(elementTypeRef, "ElementTypeRef must not be null"); + return this.responseMono.flatMap(response -> handleBodyMono(response, response.bodyToMono(elementTypeRef))); + } + + private Mono handleBodyMono(ClientResponse response, Mono bodyPublisher) { + if (HttpStatus.resolve(response.rawStatusCode()) != null) { + Mono result = statusHandlers(response); + if (result != null) { + return result.switchIfEmpty(bodyPublisher); + } + else { + return bodyPublisher; + } + } + else { + return response.createException().flatMap(Mono::error); + } } @Override public Flux bodyToFlux(Class elementClass) { - return this.responseMono.flatMapMany(response -> - handleBody(response, response.bodyToFlux(elementClass), mono -> mono.handle((t, sink) -> sink.error(t)))); + Assert.notNull(elementClass, "ElementClass must not be null"); + return this.responseMono.flatMapMany(response -> handleBodyFlux(response, response.bodyToFlux(elementClass))); } @Override public Flux bodyToFlux(ParameterizedTypeReference elementTypeRef) { - return this.responseMono.flatMapMany(response -> - handleBody(response, response.bodyToFlux(elementTypeRef), mono -> mono.handle((t, sink) -> sink.error(t)))); + Assert.notNull(elementTypeRef, "ElementTypeRef must not be null"); + return this.responseMono.flatMapMany(response -> handleBodyFlux(response, response.bodyToFlux(elementTypeRef))); } - private > T handleBody(ClientResponse response, - T bodyPublisher, Function, T> errorFunction) { - + private Publisher handleBodyFlux(ClientResponse response, Flux bodyPublisher) { if (HttpStatus.resolve(response.rawStatusCode()) != null) { - for (StatusHandler handler : this.statusHandlers) { - if (handler.test(response.statusCode())) { - Mono exMono; - try { - exMono = handler.apply(response); - exMono = exMono.flatMap(ex -> drainBody(response, ex)); - exMono = exMono.onErrorResume(ex -> drainBody(response, ex)); - } - catch (Throwable ex2) { - exMono = drainBody(response, ex2); - } - T result = errorFunction.apply(exMono); - HttpRequest request = this.requestSupplier.get(); - return insertCheckpoint(result, response.statusCode(), request); - } + Mono result = statusHandlers(response); + if (result != null) { + return result.flux().switchIfEmpty(bodyPublisher); + } + else { + return bodyPublisher; } - return bodyPublisher; } else { - return errorFunction.apply(response.createException()); + return response.createException().flatMap(Mono::error); } } + @Nullable + private Mono statusHandlers(ClientResponse response) { + for (StatusHandler handler : this.statusHandlers) { + if (handler.test(response.statusCode())) { + Mono exMono; + try { + exMono = handler.apply(response); + exMono = exMono.flatMap(ex -> drainBody(response, ex)); + exMono = exMono.onErrorResume(ex -> drainBody(response, ex)); + } + catch (Throwable ex2) { + exMono = drainBody(response, ex2); + } + Mono result = exMono.flatMap(Mono::error); + HttpRequest request = this.requestSupplier.get(); + return insertCheckpoint(result, response.statusCode(), request); + } + } + return null; + } + @SuppressWarnings("unchecked") private Mono drainBody(ClientResponse response, Throwable ex) { // Ensure the body is drained, even if the StatusHandler didn't consume it, @@ -501,20 +521,11 @@ class DefaultWebClient implements WebClient { .onErrorResume(ex2 -> Mono.empty()).thenReturn(ex); } - @SuppressWarnings("unchecked") - private > T insertCheckpoint(T result, HttpStatus status, HttpRequest request) { + private Mono insertCheckpoint(Mono result, HttpStatus status, HttpRequest request) { String httpMethod = request.getMethodValue(); URI uri = request.getURI(); String description = status + " from " + httpMethod + " " + uri + " [DefaultWebClient]"; - if (result instanceof Mono) { - return (T) ((Mono) result).checkpoint(description); - } - else if (result instanceof Flux) { - return (T) ((Flux) result).checkpoint(description); - } - else { - return result; - } + return result.checkpoint(description); } @@ -527,8 +538,6 @@ class DefaultWebClient implements WebClient { public StatusHandler(Predicate predicate, Function> exceptionFunction) { - Assert.notNull(predicate, "Predicate must not be null"); - Assert.notNull(exceptionFunction, "Function must not be null"); this.predicate = predicate; this.exceptionFunction = exceptionFunction; } diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/WebClient.java b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/WebClient.java index 70c21d3bdd..e6e1e1f032 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/WebClient.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/WebClient.java @@ -671,17 +671,21 @@ public interface WebClient { /** * Register a custom error function that gets invoked when the given {@link HttpStatus} - * predicate applies. The exception returned from the function will be returned from - * {@link #bodyToMono(Class)} and {@link #bodyToFlux(Class)}. - *

By default, an error handler is register that throws a + * predicate applies. Whatever exception is returned from the function (possibly using + * {@link ClientResponse#createException()}) will also be returned as error signal + * from {@link #bodyToMono(Class)} and {@link #bodyToFlux(Class)}. + *

By default, an error handler is registered that returns a * {@link WebClientResponseException} when the response status code is 4xx or 5xx. - * @param statusPredicate a predicate that indicates whether {@code exceptionFunction} - * applies + * To override this default (and return a non-error response from {@code bodyOn*}), register + * an exception function that returns an {@linkplain Mono#empty() empty} mono. *

NOTE: if the response is expected to have content, * the exceptionFunction should consume it. If not, the content will be * automatically drained to ensure resources are released. + * @param statusPredicate a predicate that indicates whether {@code exceptionFunction} + * applies * @param exceptionFunction the function that returns the exception * @return this builder + * @see ClientResponse#createException() */ ResponseSpec onStatus(Predicate statusPredicate, Function> exceptionFunction); diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java index 274c545418..e9755b743c 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java @@ -626,6 +626,50 @@ public class WebClientIntegrationTests { }); } + @Test + public void emptyStatusHandlerShouldReturnBody() { + prepareResponse(response -> response.setResponseCode(500) + .setHeader("Content-Type", "text/plain").setBody("Internal Server error")); + + Mono result = this.webClient.get() + .uri("/greeting?name=Spring") + .retrieve() + .onStatus(HttpStatus::is5xxServerError, response -> Mono.empty()) + .bodyToMono(String.class); + + StepVerifier.create(result) + .expectNext("Internal Server error") + .verifyComplete(); + + expectRequestCount(1); + expectRequest(request -> { + assertThat(request.getHeader(HttpHeaders.ACCEPT)).isEqualTo("*/*"); + assertThat(request.getPath()).isEqualTo("/greeting?name=Spring"); + }); + } + + @Test + public void emptyStatusHandlerShouldReturnBodyFlux() { + prepareResponse(response -> response.setResponseCode(500) + .setHeader("Content-Type", "text/plain").setBody("Internal Server error")); + + Flux result = this.webClient.get() + .uri("/greeting?name=Spring") + .retrieve() + .onStatus(HttpStatus::is5xxServerError, response -> Mono.empty()) + .bodyToFlux(String.class); + + StepVerifier.create(result) + .expectNext("Internal Server error") + .verifyComplete(); + + expectRequestCount(1); + expectRequest(request -> { + assertThat(request.getHeader(HttpHeaders.ACCEPT)).isEqualTo("*/*"); + assertThat(request.getPath()).isEqualTo("/greeting?name=Spring"); + }); + } + @Test public void shouldReceiveNotFoundEntity() { prepareResponse(response -> response.setResponseCode(404)