diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/function/BodyExtractors.java b/spring-webflux/src/main/java/org/springframework/web/reactive/function/BodyExtractors.java index afeaf09628..b20cce8eaf 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/function/BodyExtractors.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/function/BodyExtractors.java @@ -84,8 +84,8 @@ public abstract class BodyExtractors { return (inputMessage, context) -> readWithMessageReaders(inputMessage, context, elementType, (HttpMessageReader reader) -> readToMono(inputMessage, context, elementType, reader), - ex -> Mono.from(unsupportedErrorHandler(inputMessage, context, ex)), - skipBodyAsMono(inputMessage, context)); + ex -> Mono.from(unsupportedErrorHandler(inputMessage, ex)), + skipBodyAsMono(inputMessage)); } /** @@ -113,8 +113,8 @@ public abstract class BodyExtractors { return (inputMessage, context) -> readWithMessageReaders(inputMessage, context, elementType, (HttpMessageReader reader) -> readToFlux(inputMessage, context, elementType, reader), - ex -> unsupportedErrorHandler(inputMessage, context, ex), - skipBodyAsFlux(inputMessage, context)); + ex -> unsupportedErrorHandler(inputMessage, ex), + skipBodyAsFlux(inputMessage)); } @@ -194,39 +194,13 @@ public abstract class BodyExtractors { .findFirst() .map(BodyExtractors::cast) .map(readerFunction) - .orElseGet(() -> errorFunction.apply(unsupportedError(context, elementType, contentType))); - } - - private static Supplier> skipBodyAsFlux(ReactiveHttpInputMessage message, - BodyExtractor.Context context) { - - if (isExtractingForClient(message)) { - return () -> consumeAndCancel(message).thenMany(Flux.empty()); - } - else { - return Flux::empty; - } - } - - private static Supplier> skipBodyAsMono(ReactiveHttpInputMessage message, - BodyExtractor.Context context) { - - if (isExtractingForClient(message)) { - return () -> consumeAndCancel(message).then(Mono.empty()); - } - else { - return Mono::empty; - } - } - - private static UnsupportedMediaTypeException unsupportedError(BodyExtractor.Context context, - ResolvableType elementType, MediaType contentType) { - - List supportedMediaTypes = context.messageReaders().stream() - .flatMap(reader -> reader.getReadableMediaTypes().stream()) - .collect(Collectors.toList()); - - return new UnsupportedMediaTypeException(contentType, supportedMediaTypes, elementType); + .orElseGet(() -> { + List mediaTypes = context.messageReaders().stream() + .flatMap(reader -> reader.getReadableMediaTypes().stream()) + .collect(Collectors.toList()); + return errorFunction.apply( + new UnsupportedMediaTypeException(contentType, mediaTypes, elementType)); + }); } private static Mono readToMono(ReactiveHttpInputMessage message, BodyExtractor.Context context, @@ -246,21 +220,22 @@ public abstract class BodyExtractors { } private static Flux unsupportedErrorHandler( - ReactiveHttpInputMessage inputMessage, BodyExtractor.Context context, - UnsupportedMediaTypeException ex) { + ReactiveHttpInputMessage message, UnsupportedMediaTypeException ex) { Flux result; - if (inputMessage.getHeaders().getContentType() == null) { - // Empty body with no content type is ok - result = inputMessage.getBody().map(o -> { + if (message.getHeaders().getContentType() == null) { + // Maybe it's okay, if there is no content.. + result = message.getBody().map(o -> { throw ex; }); } else { result = Flux.error(ex); } - return isExtractingForClient(inputMessage) ? - consumeAndCancel(inputMessage).thenMany(result) : result; + if (message instanceof ClientHttpResponse) { + result = consumeAndCancel(message).thenMany(result); + } + return result; } private static HttpMessageReader findReader( @@ -279,8 +254,15 @@ public abstract class BodyExtractors { return (HttpMessageReader) reader; } - private static boolean isExtractingForClient(ReactiveHttpInputMessage message) { - return message instanceof ClientHttpResponse; + private static Supplier> skipBodyAsFlux(ReactiveHttpInputMessage message) { + return message instanceof ClientHttpResponse ? + () -> consumeAndCancel(message).thenMany(Mono.empty()) : Flux::empty; + } + + @SuppressWarnings("unchecked") + private static Supplier> skipBodyAsMono(ReactiveHttpInputMessage message) { + return message instanceof ClientHttpResponse ? + () -> consumeAndCancel(message).then(Mono.empty()) : Mono::empty; } private static Mono consumeAndCancel(ReactiveHttpInputMessage message) { @@ -296,4 +278,5 @@ public abstract class BodyExtractors { @SuppressWarnings("serial") private static class ReadCancellationException extends RuntimeException { } + } diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java index b858e739a7..e66398fdfd 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java @@ -378,11 +378,14 @@ class DefaultWebClient implements WebClient { private final List statusHandlers = new ArrayList<>(1); + DefaultResponseSpec(Mono responseMono) { this.responseMono = responseMono; this.statusHandlers.add(DEFAULT_STATUS_HANDLER); } + + @Override public ResponseSpec onStatus(Predicate statusPredicate, Function> exceptionFunction) { @@ -396,45 +399,33 @@ class DefaultWebClient implements WebClient { } @Override - @SuppressWarnings("unchecked") public Mono bodyToMono(Class bodyType) { - // Use bodyToMono (vs BodyExtractors) to ensure proper handling of Void.class... - return this.responseMono.flatMap( - response -> bodyToPublisher(response, response.bodyToMono(bodyType), - this::monoThrowableToMono)); + return this.responseMono.flatMap(response -> handleBody(response, + response.bodyToMono(bodyType), mono -> mono.flatMap(Mono::error))); } @Override @SuppressWarnings("unchecked") - public Mono bodyToMono(ParameterizedTypeReference typeReference) { - return this.responseMono.flatMap( - response -> bodyToPublisher(response, response.bodyToMono(typeReference), - this::monoThrowableToMono)); - } - - private Mono monoThrowableToMono(Mono mono) { - return mono.flatMap(Mono::error); + public Mono bodyToMono(ParameterizedTypeReference bodyType) { + return this.responseMono.flatMap(response -> + handleBody(response, response.bodyToMono(bodyType), mono -> mono.flatMap(Mono::error))); } @Override + @SuppressWarnings("unchecked") public Flux bodyToFlux(Class elementType) { - return this.responseMono.flatMapMany( - response -> bodyToPublisher(response, response.bodyToFlux(elementType), - this::monoThrowableToFlux)); + return this.responseMono.flatMapMany(response -> + handleBody(response, response.bodyToFlux(elementType), mono -> mono.flatMapMany(Flux::error))); } @Override - public Flux bodyToFlux(ParameterizedTypeReference typeReference) { - return this.responseMono.flatMapMany( - response -> bodyToPublisher(response, response.bodyToFlux(typeReference), - this::monoThrowableToFlux)); - } - - private Flux monoThrowableToFlux(Mono mono) { - return mono.flatMapMany(Flux::error); + @SuppressWarnings("unchecked") + public Flux bodyToFlux(ParameterizedTypeReference elementType) { + return this.responseMono.flatMapMany(response -> handleBody(response, + response.bodyToFlux(elementType), mono -> mono.flatMapMany(Flux::error))); } - private > T bodyToPublisher(ClientResponse response, + private > T handleBody(ClientResponse response, T bodyPublisher, Function, T> errorFunction) { if (HttpStatus.resolve(response.rawStatusCode()) != null) { @@ -498,6 +489,7 @@ class DefaultWebClient implements WebClient { private final Function> exceptionFunction; + public StatusHandler(Predicate predicate, Function> exceptionFunction) { @@ -507,6 +499,7 @@ class DefaultWebClient implements WebClient { this.exceptionFunction = exceptionFunction; } + public boolean test(HttpStatus status) { return this.predicate.test(status); }