Browse Source

Refine check for multiple subscribers

Commit #c187cb2 introduced proactive rejection of multiple subscribers
in ReactorClientHttpResponse, instead of hanging indefinitely as per
https://github.com/reactor/reactor-netty/issues/503.

However FluxReceive also rejects subsequent subscribers if the response
is consumed fully, as opposed to being canceled, e.g. as with
bodyToMono(Void.class). In that case, a subsequent subscriber causes
two competing error signals to be sent, and one gets dropped and
logged by reactor-core.

This fix ensures that a rejection is raised in
ReactorClientHttpResponse only after a cancel() was detected.

Issue: SPR-17564
pull/2040/head
Rossen Stoyanchev 6 years ago
parent
commit
7a5f8e03bc
  1. 19
      spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java
  2. 5
      spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java

19
spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java

@ -29,7 +29,6 @@ import org.springframework.core.io.buffer.NettyDataBufferFactory; @@ -29,7 +29,6 @@ import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseCookie;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
@ -49,7 +48,7 @@ class ReactorClientHttpResponse implements ClientHttpResponse { @@ -49,7 +48,7 @@ class ReactorClientHttpResponse implements ClientHttpResponse {
private final NettyInbound inbound;
private final AtomicBoolean bodyConsumed = new AtomicBoolean();
private final AtomicBoolean rejectSubscribers = new AtomicBoolean();
public ReactorClientHttpResponse(HttpClientResponse response, NettyInbound inbound, ByteBufAllocator alloc) {
@ -62,10 +61,18 @@ class ReactorClientHttpResponse implements ClientHttpResponse { @@ -62,10 +61,18 @@ class ReactorClientHttpResponse implements ClientHttpResponse {
@Override
public Flux<DataBuffer> getBody() {
return this.inbound.receive()
.doOnSubscribe(s ->
// See https://github.com/reactor/reactor-netty/issues/503
Assert.state(this.bodyConsumed.compareAndSet(false, true),
"The client response body can only be consumed once."))
.doOnSubscribe(s -> {
if (this.rejectSubscribers.get()) {
throw new IllegalStateException("The client response body can only be consumed once.");
}
})
.doOnCancel(() -> {
// https://github.com/reactor/reactor-netty/issues/503
// FluxReceive rejects multiple subscribers, but not after a cancel().
// Subsequent subscribers after cancel() will not be rejected, but will hang instead.
// So we need to intercept and reject them in that case.
this.rejectSubscribers.set(true);
})
.map(byteBuf -> {
byteBuf.retain();
return this.bufferFactory.wrap(byteBuf);

5
spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java

@ -447,8 +447,9 @@ class DefaultWebClient implements WebClient { @@ -447,8 +447,9 @@ class DefaultWebClient implements WebClient {
@SuppressWarnings("unchecked")
private <T> Mono<T> drainBody(ClientResponse response, Throwable ex) {
// Ensure the body is drained, even if the StatusHandler didn't consume it,
// but ignore errors in case it did consume it.
return (Mono<T>) response.bodyToMono(Void.class).onErrorMap(ex2 -> ex).thenReturn(ex);
// but ignore exception, in case the handler did consume.
return (Mono<T>) response.bodyToMono(Void.class)
.onErrorResume(ex2 -> Mono.empty()).thenReturn(ex);
}
private static Mono<WebClientResponseException> createResponseException(ClientResponse response) {

Loading…
Cancel
Save