|
|
@ -200,7 +200,7 @@ public abstract class BodyExtractors { |
|
|
|
private static <T> Supplier<Flux<T>> skipBodyAsFlux(ReactiveHttpInputMessage message, |
|
|
|
private static <T> Supplier<Flux<T>> skipBodyAsFlux(ReactiveHttpInputMessage message, |
|
|
|
BodyExtractor.Context context) { |
|
|
|
BodyExtractor.Context context) { |
|
|
|
|
|
|
|
|
|
|
|
if (isExtractingForClient(message, context)) { |
|
|
|
if (isExtractingForClient(message)) { |
|
|
|
return () -> consumeAndCancel(message).thenMany(Flux.empty()); |
|
|
|
return () -> consumeAndCancel(message).thenMany(Flux.empty()); |
|
|
|
} |
|
|
|
} |
|
|
|
else { |
|
|
|
else { |
|
|
@ -211,7 +211,7 @@ public abstract class BodyExtractors { |
|
|
|
private static <T> Supplier<Mono<T>> skipBodyAsMono(ReactiveHttpInputMessage message, |
|
|
|
private static <T> Supplier<Mono<T>> skipBodyAsMono(ReactiveHttpInputMessage message, |
|
|
|
BodyExtractor.Context context) { |
|
|
|
BodyExtractor.Context context) { |
|
|
|
|
|
|
|
|
|
|
|
if (isExtractingForClient(message, context)) { |
|
|
|
if (isExtractingForClient(message)) { |
|
|
|
return () -> consumeAndCancel(message).then(Mono.empty()); |
|
|
|
return () -> consumeAndCancel(message).then(Mono.empty()); |
|
|
|
} |
|
|
|
} |
|
|
|
else { |
|
|
|
else { |
|
|
@ -259,7 +259,7 @@ public abstract class BodyExtractors { |
|
|
|
else { |
|
|
|
else { |
|
|
|
result = Flux.error(ex); |
|
|
|
result = Flux.error(ex); |
|
|
|
} |
|
|
|
} |
|
|
|
return isExtractingForClient(inputMessage, context) ? |
|
|
|
return isExtractingForClient(inputMessage) ? |
|
|
|
consumeAndCancel(inputMessage).thenMany(result) : result; |
|
|
|
consumeAndCancel(inputMessage).thenMany(result) : result; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
@ -279,10 +279,8 @@ public abstract class BodyExtractors { |
|
|
|
return (HttpMessageReader<T>) reader; |
|
|
|
return (HttpMessageReader<T>) reader; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
private static boolean isExtractingForClient(ReactiveHttpInputMessage message, |
|
|
|
private static boolean isExtractingForClient(ReactiveHttpInputMessage message) { |
|
|
|
BodyExtractor.Context context) { |
|
|
|
return message instanceof ClientHttpResponse; |
|
|
|
return !context.serverResponse().isPresent() |
|
|
|
|
|
|
|
&& message instanceof ClientHttpResponse; |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
private static Mono<Void> consumeAndCancel(ReactiveHttpInputMessage message) { |
|
|
|
private static Mono<Void> consumeAndCancel(ReactiveHttpInputMessage message) { |
|
|
|