Browse Source

Polish

pull/2014/head
Rossen Stoyanchev 6 years ago
parent
commit
f5da737bd4
  1. 71
      spring-webflux/src/main/java/org/springframework/web/reactive/function/BodyExtractors.java
  2. 43
      spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java

71
spring-webflux/src/main/java/org/springframework/web/reactive/function/BodyExtractors.java

@ -84,8 +84,8 @@ public abstract class BodyExtractors { @@ -84,8 +84,8 @@ public abstract class BodyExtractors {
return (inputMessage, context) ->
readWithMessageReaders(inputMessage, context, elementType,
(HttpMessageReader<T> reader) -> readToMono(inputMessage, context, elementType, reader),
ex -> Mono.from(unsupportedErrorHandler(inputMessage, context, ex)),
skipBodyAsMono(inputMessage, context));
ex -> Mono.from(unsupportedErrorHandler(inputMessage, ex)),
skipBodyAsMono(inputMessage));
}
/**
@ -113,8 +113,8 @@ public abstract class BodyExtractors { @@ -113,8 +113,8 @@ public abstract class BodyExtractors {
return (inputMessage, context) ->
readWithMessageReaders(inputMessage, context, elementType,
(HttpMessageReader<T> reader) -> readToFlux(inputMessage, context, elementType, reader),
ex -> unsupportedErrorHandler(inputMessage, context, ex),
skipBodyAsFlux(inputMessage, context));
ex -> unsupportedErrorHandler(inputMessage, ex),
skipBodyAsFlux(inputMessage));
}
@ -194,39 +194,13 @@ public abstract class BodyExtractors { @@ -194,39 +194,13 @@ public abstract class BodyExtractors {
.findFirst()
.map(BodyExtractors::<T>cast)
.map(readerFunction)
.orElseGet(() -> errorFunction.apply(unsupportedError(context, elementType, contentType)));
}
private static <T> Supplier<Flux<T>> skipBodyAsFlux(ReactiveHttpInputMessage message,
BodyExtractor.Context context) {
if (isExtractingForClient(message)) {
return () -> consumeAndCancel(message).thenMany(Flux.empty());
}
else {
return Flux::empty;
}
}
private static <T> Supplier<Mono<T>> skipBodyAsMono(ReactiveHttpInputMessage message,
BodyExtractor.Context context) {
if (isExtractingForClient(message)) {
return () -> consumeAndCancel(message).then(Mono.empty());
}
else {
return Mono::empty;
}
}
private static UnsupportedMediaTypeException unsupportedError(BodyExtractor.Context context,
ResolvableType elementType, MediaType contentType) {
List<MediaType> supportedMediaTypes = context.messageReaders().stream()
.orElseGet(() -> {
List<MediaType> mediaTypes = context.messageReaders().stream()
.flatMap(reader -> reader.getReadableMediaTypes().stream())
.collect(Collectors.toList());
return new UnsupportedMediaTypeException(contentType, supportedMediaTypes, elementType);
return errorFunction.apply(
new UnsupportedMediaTypeException(contentType, mediaTypes, elementType));
});
}
private static <T> Mono<T> readToMono(ReactiveHttpInputMessage message, BodyExtractor.Context context,
@ -246,21 +220,22 @@ public abstract class BodyExtractors { @@ -246,21 +220,22 @@ public abstract class BodyExtractors {
}
private static <T> Flux<T> unsupportedErrorHandler(
ReactiveHttpInputMessage inputMessage, BodyExtractor.Context context,
UnsupportedMediaTypeException ex) {
ReactiveHttpInputMessage message, UnsupportedMediaTypeException ex) {
Flux<T> result;
if (inputMessage.getHeaders().getContentType() == null) {
// Empty body with no content type is ok
result = inputMessage.getBody().map(o -> {
if (message.getHeaders().getContentType() == null) {
// Maybe it's okay, if there is no content..
result = message.getBody().map(o -> {
throw ex;
});
}
else {
result = Flux.error(ex);
}
return isExtractingForClient(inputMessage) ?
consumeAndCancel(inputMessage).thenMany(result) : result;
if (message instanceof ClientHttpResponse) {
result = consumeAndCancel(message).thenMany(result);
}
return result;
}
private static <T> HttpMessageReader<T> findReader(
@ -279,8 +254,15 @@ public abstract class BodyExtractors { @@ -279,8 +254,15 @@ public abstract class BodyExtractors {
return (HttpMessageReader<T>) reader;
}
private static boolean isExtractingForClient(ReactiveHttpInputMessage message) {
return message instanceof ClientHttpResponse;
private static <T> Supplier<Flux<T>> skipBodyAsFlux(ReactiveHttpInputMessage message) {
return message instanceof ClientHttpResponse ?
() -> consumeAndCancel(message).thenMany(Mono.empty()) : Flux::empty;
}
@SuppressWarnings("unchecked")
private static <T> Supplier<Mono<T>> skipBodyAsMono(ReactiveHttpInputMessage message) {
return message instanceof ClientHttpResponse ?
() -> consumeAndCancel(message).then(Mono.empty()) : Mono::empty;
}
private static Mono<Void> consumeAndCancel(ReactiveHttpInputMessage message) {
@ -296,4 +278,5 @@ public abstract class BodyExtractors { @@ -296,4 +278,5 @@ public abstract class BodyExtractors {
@SuppressWarnings("serial")
private static class ReadCancellationException extends RuntimeException {
}
}

43
spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java

@ -378,11 +378,14 @@ class DefaultWebClient implements WebClient { @@ -378,11 +378,14 @@ class DefaultWebClient implements WebClient {
private final List<StatusHandler> statusHandlers = new ArrayList<>(1);
DefaultResponseSpec(Mono<ClientResponse> responseMono) {
this.responseMono = responseMono;
this.statusHandlers.add(DEFAULT_STATUS_HANDLER);
}
@Override
public ResponseSpec onStatus(Predicate<HttpStatus> statusPredicate,
Function<ClientResponse, Mono<? extends Throwable>> exceptionFunction) {
@ -396,45 +399,33 @@ class DefaultWebClient implements WebClient { @@ -396,45 +399,33 @@ class DefaultWebClient implements WebClient {
}
@Override
@SuppressWarnings("unchecked")
public <T> Mono<T> bodyToMono(Class<T> bodyType) {
// Use bodyToMono (vs BodyExtractors) to ensure proper handling of Void.class...
return this.responseMono.flatMap(
response -> bodyToPublisher(response, response.bodyToMono(bodyType),
this::monoThrowableToMono));
return this.responseMono.flatMap(response -> handleBody(response,
response.bodyToMono(bodyType), mono -> mono.flatMap(Mono::error)));
}
@Override
@SuppressWarnings("unchecked")
public <T> Mono<T> bodyToMono(ParameterizedTypeReference<T> typeReference) {
return this.responseMono.flatMap(
response -> bodyToPublisher(response, response.bodyToMono(typeReference),
this::monoThrowableToMono));
}
private <T> Mono<T> monoThrowableToMono(Mono<? extends Throwable> mono) {
return mono.flatMap(Mono::error);
public <T> Mono<T> bodyToMono(ParameterizedTypeReference<T> bodyType) {
return this.responseMono.flatMap(response ->
handleBody(response, response.bodyToMono(bodyType), mono -> mono.flatMap(Mono::error)));
}
@Override
@SuppressWarnings("unchecked")
public <T> Flux<T> bodyToFlux(Class<T> elementType) {
return this.responseMono.flatMapMany(
response -> bodyToPublisher(response, response.bodyToFlux(elementType),
this::monoThrowableToFlux));
return this.responseMono.flatMapMany(response ->
handleBody(response, response.bodyToFlux(elementType), mono -> mono.flatMapMany(Flux::error)));
}
@Override
public <T> Flux<T> bodyToFlux(ParameterizedTypeReference<T> typeReference) {
return this.responseMono.flatMapMany(
response -> bodyToPublisher(response, response.bodyToFlux(typeReference),
this::monoThrowableToFlux));
}
private <T> Flux<T> monoThrowableToFlux(Mono<? extends Throwable> mono) {
return mono.flatMapMany(Flux::error);
@SuppressWarnings("unchecked")
public <T> Flux<T> bodyToFlux(ParameterizedTypeReference<T> elementType) {
return this.responseMono.flatMapMany(response -> handleBody(response,
response.bodyToFlux(elementType), mono -> mono.flatMapMany(Flux::error)));
}
private <T extends Publisher<?>> T bodyToPublisher(ClientResponse response,
private <T extends Publisher<?>> T handleBody(ClientResponse response,
T bodyPublisher, Function<Mono<? extends Throwable>, T> errorFunction) {
if (HttpStatus.resolve(response.rawStatusCode()) != null) {
@ -498,6 +489,7 @@ class DefaultWebClient implements WebClient { @@ -498,6 +489,7 @@ class DefaultWebClient implements WebClient {
private final Function<ClientResponse, Mono<? extends Throwable>> exceptionFunction;
public StatusHandler(Predicate<HttpStatus> predicate,
Function<ClientResponse, Mono<? extends Throwable>> exceptionFunction) {
@ -507,6 +499,7 @@ class DefaultWebClient implements WebClient { @@ -507,6 +499,7 @@ class DefaultWebClient implements WebClient {
this.exceptionFunction = exceptionFunction;
}
public boolean test(HttpStatus status) {
return this.predicate.test(status);
}

Loading…
Cancel
Save