Browse Source

Fix race condition with onCompletion/onError

Closes gh-23096
pull/23837/head
Rossen Stoyanchev 5 years ago
parent
commit
dd22b8fd39
  1. 35
      spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java
  2. 32
      spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java
  3. 10
      spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java

35
spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -283,19 +283,7 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> { @@ -283,19 +283,7 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
publisher.subscriber = subscriber;
subscriber.onSubscribe(subscription);
publisher.changeState(SUBSCRIBING, NO_DEMAND);
// Now safe to check "beforeDemand" flags, they won't change once in NO_DEMAND
String logPrefix = publisher.getLogPrefix();
if (publisher.completionBeforeDemand) {
rsReadLogger.trace(logPrefix + "Completed before demand");
publisher.state.get().onAllDataRead(publisher);
}
Throwable ex = publisher.errorBeforeDemand;
if (ex != null) {
if (rsReadLogger.isTraceEnabled()) {
rsReadLogger.trace(logPrefix + "Completed with error before demand: " + ex);
}
publisher.state.get().onError(publisher, ex);
}
handleCompletionOrErrorBeforeDemand(publisher);
}
else {
throw new IllegalStateException("Failed to transition to SUBSCRIBING, " +
@ -306,11 +294,30 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> { @@ -306,11 +294,30 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
@Override
<T> void onAllDataRead(AbstractListenerReadPublisher<T> publisher) {
publisher.completionBeforeDemand = true;
handleCompletionOrErrorBeforeDemand(publisher);
}
@Override
<T> void onError(AbstractListenerReadPublisher<T> publisher, Throwable ex) {
publisher.errorBeforeDemand = ex;
handleCompletionOrErrorBeforeDemand(publisher);
}
private <T> void handleCompletionOrErrorBeforeDemand(AbstractListenerReadPublisher<T> publisher) {
if (publisher.state.get().equals(NO_DEMAND)) {
if (publisher.completionBeforeDemand) {
rsReadLogger.trace(publisher.getLogPrefix() + "Completed before demand");
publisher.state.get().onAllDataRead(publisher);
}
Throwable ex = publisher.errorBeforeDemand;
if (ex != null) {
if (rsReadLogger.isTraceEnabled()) {
String prefix = publisher.getLogPrefix();
rsReadLogger.trace(prefix + "Completed with error before demand: " + ex);
}
publisher.state.get().onError(publisher, ex);
}
}
}
},

32
spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -282,17 +282,7 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo @@ -282,17 +282,7 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo
}
if (processor.changeState(this, REQUESTED)) {
if (processor.subscriberCompleted) {
if (processor.isFlushPending()) {
// Ensure the final flush
processor.changeState(REQUESTED, FLUSHING);
processor.flushIfPossible();
}
else if (processor.changeState(REQUESTED, COMPLETED)) {
processor.resultPublisher.publishComplete();
}
else {
processor.state.get().onComplete(processor);
}
handleSubscriberCompleted(processor);
}
else {
Assert.state(processor.subscription != null, "No subscription");
@ -303,6 +293,24 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo @@ -303,6 +293,24 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo
@Override
public <T> void onComplete(AbstractListenerWriteFlushProcessor<T> processor) {
processor.subscriberCompleted = true;
// A competing write might have completed very quickly
if (processor.state.get().equals(State.REQUESTED)) {
handleSubscriberCompleted(processor);
}
}
private <T> void handleSubscriberCompleted(AbstractListenerWriteFlushProcessor<T> processor) {
if (processor.isFlushPending()) {
// Ensure the final flush
processor.changeState(State.REQUESTED, State.FLUSHING);
processor.flushIfPossible();
}
else if (processor.changeState(State.REQUESTED, State.COMPLETED)) {
processor.resultPublisher.publishComplete();
}
else {
processor.state.get().onComplete(processor);
}
}
},

10
spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -376,6 +376,10 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T, @@ -376,6 +376,10 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
@Override
public <T> void onComplete(AbstractListenerWriteProcessor<T> processor) {
processor.subscriberCompleted = true;
// A competing write might have completed very quickly
if (processor.state.get().equals(State.REQUESTED)) {
processor.changeStateToComplete(State.REQUESTED);
}
}
},
@ -383,6 +387,10 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T, @@ -383,6 +387,10 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
@Override
public <T> void onComplete(AbstractListenerWriteProcessor<T> processor) {
processor.subscriberCompleted = true;
// A competing write might have completed very quickly
if (processor.state.get().equals(State.REQUESTED)) {
processor.changeStateToComplete(State.REQUESTED);
}
}
},

Loading…
Cancel
Save