Browse Source

Avoid resume-suspend race condition

This commit turns suspendReading() into a readingPaused() notification
that is invoked after a succession of reads stops because there is no
more demand. Sub-classes can use this notification to suspend, if that
applies to them.

Most importantly the notification is guaranteed not to overlap with
checkOnDataAvailable() which means that suspend does not need to be
atomic and guarded against resume. The two can and do compete all the
time when reading ends with no demand, and a request for demand arrives
concurrently.

Issue: SPR-16207
pull/1605/head
Rossen Stoyanchev 7 years ago
parent
commit
afdca285e5
  1. 44
      spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java
  2. 2
      spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java
  3. 22
      spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java
  4. 2
      spring-web/src/test/java/org/springframework/http/server/reactive/ListenerReadPublisherTests.java
  5. 23
      spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java
  6. 8
      spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java
  7. 5
      spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/StandardWebSocketSession.java
  8. 5
      spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java
  9. 5
      spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java

44
spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java

@ -116,9 +116,14 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> { @@ -116,9 +116,14 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
protected abstract T read() throws IOException;
/**
* Suspend reading, if the underlying API provides such a mechanism.
* Invoked when reading is paused due to a lack of demand.
* <p><strong>Note:</strong> This method is guaranteed not to compete with
* {@link #checkOnDataAvailable()} so it can be used to safely suspend
* reading, if the underlying API supports it, i.e. without competing with
* an implicit call to resume via {@code checkOnDataAvailable()}.
* @since 5.0.2
*/
protected abstract void suspendReading();
protected abstract void readingPaused();
// Private methods for use in State...
@ -280,7 +285,7 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> { @@ -280,7 +285,7 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
if (Operators.validate(n)) {
Operators.addCap(DEMAND_FIELD_UPDATER, publisher, n);
// Did a concurrent read transition to NO_DEMAND just before us?
if (publisher.changeState(NO_DEMAND, DEMAND)) {
if (publisher.changeState(NO_DEMAND, this)) {
publisher.checkOnDataAvailable();
}
}
@ -288,23 +293,6 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> { @@ -288,23 +293,6 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
@Override
<T> void onDataAvailable(AbstractListenerReadPublisher<T> publisher) {
for (;;) {
if (!read(publisher)) {
return;
}
// Maybe demand arrived between readAndPublish and READING->NO_DEMAND?
long r = publisher.demand;
if (r == 0 || publisher.changeState(NO_DEMAND, this)) {
break;
}
}
}
/**
* @return whether to exit the read loop; false means stop trying
* to read, true means check demand one more time.
*/
<T> boolean read(AbstractListenerReadPublisher<T> publisher) {
if (publisher.changeState(this, READING)) {
try {
boolean demandAvailable = publisher.readAndPublish();
@ -313,18 +301,22 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> { @@ -313,18 +301,22 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
publisher.checkOnDataAvailable();
}
}
else if (publisher.changeState(READING, NO_DEMAND)) {
publisher.suspendReading();
return true;
else {
publisher.readingPaused();
if (publisher.changeState(READING, NO_DEMAND)) {
// Demand may have arrived since readAndPublish returned
long r = publisher.demand;
if (r > 0 && publisher.changeState(NO_DEMAND, this)) {
publisher.checkOnDataAvailable();
}
}
}
}
catch (IOException ex) {
publisher.onError(ex);
}
}
// Either competing onDataAvailable calls (via request or container callback)
// Or a concurrent completion
return false;
// Else, either competing onDataAvailable (request vs container), or concurrent completion
}
},

2
spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java

@ -267,7 +267,7 @@ class ServletServerHttpRequest extends AbstractServerHttpRequest { @@ -267,7 +267,7 @@ class ServletServerHttpRequest extends AbstractServerHttpRequest {
}
@Override
protected void suspendReading() {
protected void readingPaused() {
// no-op
}

22
spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java

@ -150,32 +150,16 @@ class UndertowServerHttpRequest extends AbstractServerHttpRequest { @@ -150,32 +150,16 @@ class UndertowServerHttpRequest extends AbstractServerHttpRequest {
@Override
protected void checkOnDataAvailable() {
// TODO: The onDataAvailable() call below can cause a StackOverflowError
// since this method is being called from onDataAvailable() itself.
if (isReadPossible()) {
onDataAvailable();
}
}
private boolean isReadPossible() {
if (!this.channel.isReadResumed()) {
this.channel.resumeReads();
}
return this.channel.isReadResumed();
// We are allowed to try, it will return null if data is not available
onDataAvailable();
}
@Override
protected void suspendReading() {
protected void readingPaused() {
this.channel.suspendReads();
}
@Override
public void onAllDataRead() {
this.channel.getReadSetter().set(null);
this.channel.resumeReads();
super.onAllDataRead();
}
@Override
@Nullable
protected DataBuffer read() throws IOException {

2
spring-web/src/test/java/org/springframework/http/server/reactive/ListenerReadPublisherTests.java

@ -68,7 +68,7 @@ public class ListenerReadPublisherTests { @@ -68,7 +68,7 @@ public class ListenerReadPublisherTests {
}
@Override
protected void suspendReading() {
protected void readingPaused() {
// No-op
}

23
spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java

@ -149,17 +149,6 @@ public abstract class AbstractListenerWebSocketSession<T> extends AbstractWebSoc @@ -149,17 +149,6 @@ public abstract class AbstractListenerWebSocketSession<T> extends AbstractWebSoc
*/
protected abstract void resumeReceiving();
/**
* Whether receiving new message(s) is suspended.
* <p><strong>Note:</strong> if the underlying WebSocket API does not provide
* flow control for receiving messages, then this method as well as
* {@link #canSuspendReceiving()} should both return {@code false}.
* @return returns {@code true} if receiving new message(s) is suspended,
* or otherwise {@code false}.
* @since 5.0.2
*/
protected abstract boolean isSuspended();
/**
* Send the given WebSocket message.
*/
@ -231,16 +220,14 @@ public abstract class AbstractListenerWebSocketSession<T> extends AbstractWebSoc @@ -231,16 +220,14 @@ public abstract class AbstractListenerWebSocketSession<T> extends AbstractWebSoc
@Override
protected void checkOnDataAvailable() {
if (isSuspended()) {
resumeReceiving();
}
if (!this.pendingMessages.isEmpty()) {
onDataAvailable();
}
}
@Override
protected void suspendReading() {
protected void readingPaused() {
suspendReceiving();
}
@ -250,14 +237,6 @@ public abstract class AbstractListenerWebSocketSession<T> extends AbstractWebSoc @@ -250,14 +237,6 @@ public abstract class AbstractListenerWebSocketSession<T> extends AbstractWebSoc
return (WebSocketMessage) this.pendingMessages.poll();
}
@Override
public void onAllDataRead() {
if (isSuspended()) {
resumeReceiving();
}
super.onAllDataRead();
}
void handleMessage(WebSocketMessage webSocketMessage) {
this.pendingMessages.offer(webSocketMessage);
onDataAvailable();

8
spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java

@ -74,14 +74,10 @@ public class JettyWebSocketSession extends AbstractListenerWebSocketSession<Sess @@ -74,14 +74,10 @@ public class JettyWebSocketSession extends AbstractListenerWebSocketSession<Sess
@Override
protected void resumeReceiving() {
SuspendToken tokenToUse = this.suspendToken;
Assert.state(tokenToUse != null, "Not suspended");
tokenToUse.resume();
this.suspendToken = null;
if (tokenToUse != null) {
tokenToUse.resume();
}
@Override
protected boolean isSuspended() {
return this.suspendToken != null;
}
@Override

5
spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/StandardWebSocketSession.java

@ -71,11 +71,6 @@ public class StandardWebSocketSession extends AbstractListenerWebSocketSession<S @@ -71,11 +71,6 @@ public class StandardWebSocketSession extends AbstractListenerWebSocketSession<S
// no-op
}
@Override
protected boolean isSuspended() {
return false;
}
@Override
protected boolean sendMessage(WebSocketMessage message) throws IOException {
ByteBuffer buffer = message.getPayload().asByteBuffer();

5
spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java

@ -71,9 +71,4 @@ public class TomcatWebSocketSession extends StandardWebSocketSession { @@ -71,9 +71,4 @@ public class TomcatWebSocketSession extends StandardWebSocketSession {
}
}
@Override
protected boolean isSuspended() {
return this.suspended == 1;
}
}

5
spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java

@ -71,11 +71,6 @@ public class UndertowWebSocketSession extends AbstractListenerWebSocketSession<W @@ -71,11 +71,6 @@ public class UndertowWebSocketSession extends AbstractListenerWebSocketSession<W
getDelegate().resumeReceives();
}
@Override
protected boolean isSuspended() {
return !getDelegate().isReceivesResumed();
}
@Override
protected boolean sendMessage(WebSocketMessage message) throws IOException {
ByteBuffer buffer = message.getPayload().asByteBuffer();

Loading…
Cancel
Save