From afdca285e58f3f618c01c772a95434a17b020fed Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Wed, 22 Nov 2017 15:27:01 -0500 Subject: [PATCH] Avoid resume-suspend race condition This commit turns suspendReading() into a readingPaused() notification that is invoked after a succession of reads stops because there is no more demand. Sub-classes can use this notification to suspend, if that applies to them. Most importantly the notification is guaranteed not to overlap with checkOnDataAvailable() which means that suspend does not need to be atomic and guarded against resume. The two can and do compete all the time when reading ends with no demand, and a request for demand arrives concurrently. Issue: SPR-16207 --- .../AbstractListenerReadPublisher.java | 44 ++++++++----------- .../reactive/ServletServerHttpRequest.java | 2 +- .../reactive/UndertowServerHttpRequest.java | 24 ++-------- .../reactive/ListenerReadPublisherTests.java | 2 +- .../AbstractListenerWebSocketSession.java | 25 +---------- .../socket/adapter/JettyWebSocketSession.java | 10 ++--- .../adapter/StandardWebSocketSession.java | 5 --- .../adapter/TomcatWebSocketSession.java | 5 --- .../adapter/UndertowWebSocketSession.java | 5 --- 9 files changed, 29 insertions(+), 93 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java index 5f2015c507..55b6ac9a5d 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java @@ -116,9 +116,14 @@ public abstract class AbstractListenerReadPublisher implements Publisher { protected abstract T read() throws IOException; /** - * Suspend reading, if the underlying API provides such a mechanism. + * Invoked when reading is paused due to a lack of demand. + *

Note: This method is guaranteed not to compete with + * {@link #checkOnDataAvailable()} so it can be used to safely suspend + * reading, if the underlying API supports it, i.e. without competing with + * an implicit call to resume via {@code checkOnDataAvailable()}. + * @since 5.0.2 */ - protected abstract void suspendReading(); + protected abstract void readingPaused(); // Private methods for use in State... @@ -280,7 +285,7 @@ public abstract class AbstractListenerReadPublisher implements Publisher { if (Operators.validate(n)) { Operators.addCap(DEMAND_FIELD_UPDATER, publisher, n); // Did a concurrent read transition to NO_DEMAND just before us? - if (publisher.changeState(NO_DEMAND, DEMAND)) { + if (publisher.changeState(NO_DEMAND, this)) { publisher.checkOnDataAvailable(); } } @@ -288,23 +293,6 @@ public abstract class AbstractListenerReadPublisher implements Publisher { @Override void onDataAvailable(AbstractListenerReadPublisher publisher) { - for (;;) { - if (!read(publisher)) { - return; - } - // Maybe demand arrived between readAndPublish and READING->NO_DEMAND? - long r = publisher.demand; - if (r == 0 || publisher.changeState(NO_DEMAND, this)) { - break; - } - } - } - - /** - * @return whether to exit the read loop; false means stop trying - * to read, true means check demand one more time. - */ - boolean read(AbstractListenerReadPublisher publisher) { if (publisher.changeState(this, READING)) { try { boolean demandAvailable = publisher.readAndPublish(); @@ -313,18 +301,22 @@ public abstract class AbstractListenerReadPublisher implements Publisher { publisher.checkOnDataAvailable(); } } - else if (publisher.changeState(READING, NO_DEMAND)) { - publisher.suspendReading(); - return true; + else { + publisher.readingPaused(); + if (publisher.changeState(READING, NO_DEMAND)) { + // Demand may have arrived since readAndPublish returned + long r = publisher.demand; + if (r > 0 && publisher.changeState(NO_DEMAND, this)) { + publisher.checkOnDataAvailable(); + } + } } } catch (IOException ex) { publisher.onError(ex); } } - // Either competing onDataAvailable calls (via request or container callback) - // Or a concurrent completion - return false; + // Else, either competing onDataAvailable (request vs container), or concurrent completion } }, diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java b/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java index 6ae7a25bc5..60a7f997e6 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java @@ -267,7 +267,7 @@ class ServletServerHttpRequest extends AbstractServerHttpRequest { } @Override - protected void suspendReading() { + protected void readingPaused() { // no-op } diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java index b14d467ea6..fc22e01086 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java @@ -150,32 +150,16 @@ class UndertowServerHttpRequest extends AbstractServerHttpRequest { @Override protected void checkOnDataAvailable() { - // TODO: The onDataAvailable() call below can cause a StackOverflowError - // since this method is being called from onDataAvailable() itself. - if (isReadPossible()) { - onDataAvailable(); - } - } - - private boolean isReadPossible() { - if (!this.channel.isReadResumed()) { - this.channel.resumeReads(); - } - return this.channel.isReadResumed(); + this.channel.resumeReads(); + // We are allowed to try, it will return null if data is not available + onDataAvailable(); } @Override - protected void suspendReading() { + protected void readingPaused() { this.channel.suspendReads(); } - @Override - public void onAllDataRead() { - this.channel.getReadSetter().set(null); - this.channel.resumeReads(); - super.onAllDataRead(); - } - @Override @Nullable protected DataBuffer read() throws IOException { diff --git a/spring-web/src/test/java/org/springframework/http/server/reactive/ListenerReadPublisherTests.java b/spring-web/src/test/java/org/springframework/http/server/reactive/ListenerReadPublisherTests.java index 66ed319bd0..81260eb63f 100644 --- a/spring-web/src/test/java/org/springframework/http/server/reactive/ListenerReadPublisherTests.java +++ b/spring-web/src/test/java/org/springframework/http/server/reactive/ListenerReadPublisherTests.java @@ -68,7 +68,7 @@ public class ListenerReadPublisherTests { } @Override - protected void suspendReading() { + protected void readingPaused() { // No-op } diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java index deca209e03..3cd34f9e96 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java @@ -149,17 +149,6 @@ public abstract class AbstractListenerWebSocketSession extends AbstractWebSoc */ protected abstract void resumeReceiving(); - /** - * Whether receiving new message(s) is suspended. - *

Note: if the underlying WebSocket API does not provide - * flow control for receiving messages, then this method as well as - * {@link #canSuspendReceiving()} should both return {@code false}. - * @return returns {@code true} if receiving new message(s) is suspended, - * or otherwise {@code false}. - * @since 5.0.2 - */ - protected abstract boolean isSuspended(); - /** * Send the given WebSocket message. */ @@ -231,16 +220,14 @@ public abstract class AbstractListenerWebSocketSession extends AbstractWebSoc @Override protected void checkOnDataAvailable() { - if (isSuspended()) { - resumeReceiving(); - } + resumeReceiving(); if (!this.pendingMessages.isEmpty()) { onDataAvailable(); } } @Override - protected void suspendReading() { + protected void readingPaused() { suspendReceiving(); } @@ -250,14 +237,6 @@ public abstract class AbstractListenerWebSocketSession extends AbstractWebSoc return (WebSocketMessage) this.pendingMessages.poll(); } - @Override - public void onAllDataRead() { - if (isSuspended()) { - resumeReceiving(); - } - super.onAllDataRead(); - } - void handleMessage(WebSocketMessage webSocketMessage) { this.pendingMessages.offer(webSocketMessage); onDataAvailable(); diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java index feb37108fc..449394b781 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java @@ -74,14 +74,10 @@ public class JettyWebSocketSession extends AbstractListenerWebSocketSession