diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java index 5f2015c507..55b6ac9a5d 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java @@ -116,9 +116,14 @@ public abstract class AbstractListenerReadPublisher implements Publisher { protected abstract T read() throws IOException; /** - * Suspend reading, if the underlying API provides such a mechanism. + * Invoked when reading is paused due to a lack of demand. + *

Note: This method is guaranteed not to compete with + * {@link #checkOnDataAvailable()} so it can be used to safely suspend + * reading, if the underlying API supports it, i.e. without competing with + * an implicit call to resume via {@code checkOnDataAvailable()}. + * @since 5.0.2 */ - protected abstract void suspendReading(); + protected abstract void readingPaused(); // Private methods for use in State... @@ -280,7 +285,7 @@ public abstract class AbstractListenerReadPublisher implements Publisher { if (Operators.validate(n)) { Operators.addCap(DEMAND_FIELD_UPDATER, publisher, n); // Did a concurrent read transition to NO_DEMAND just before us? - if (publisher.changeState(NO_DEMAND, DEMAND)) { + if (publisher.changeState(NO_DEMAND, this)) { publisher.checkOnDataAvailable(); } } @@ -288,23 +293,6 @@ public abstract class AbstractListenerReadPublisher implements Publisher { @Override void onDataAvailable(AbstractListenerReadPublisher publisher) { - for (;;) { - if (!read(publisher)) { - return; - } - // Maybe demand arrived between readAndPublish and READING->NO_DEMAND? - long r = publisher.demand; - if (r == 0 || publisher.changeState(NO_DEMAND, this)) { - break; - } - } - } - - /** - * @return whether to exit the read loop; false means stop trying - * to read, true means check demand one more time. - */ - boolean read(AbstractListenerReadPublisher publisher) { if (publisher.changeState(this, READING)) { try { boolean demandAvailable = publisher.readAndPublish(); @@ -313,18 +301,22 @@ public abstract class AbstractListenerReadPublisher implements Publisher { publisher.checkOnDataAvailable(); } } - else if (publisher.changeState(READING, NO_DEMAND)) { - publisher.suspendReading(); - return true; + else { + publisher.readingPaused(); + if (publisher.changeState(READING, NO_DEMAND)) { + // Demand may have arrived since readAndPublish returned + long r = publisher.demand; + if (r > 0 && publisher.changeState(NO_DEMAND, this)) { + publisher.checkOnDataAvailable(); + } + } } } catch (IOException ex) { publisher.onError(ex); } } - // Either competing onDataAvailable calls (via request or container callback) - // Or a concurrent completion - return false; + // Else, either competing onDataAvailable (request vs container), or concurrent completion } }, diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java b/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java index 6ae7a25bc5..60a7f997e6 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java @@ -267,7 +267,7 @@ class ServletServerHttpRequest extends AbstractServerHttpRequest { } @Override - protected void suspendReading() { + protected void readingPaused() { // no-op } diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java index b14d467ea6..fc22e01086 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java @@ -150,32 +150,16 @@ class UndertowServerHttpRequest extends AbstractServerHttpRequest { @Override protected void checkOnDataAvailable() { - // TODO: The onDataAvailable() call below can cause a StackOverflowError - // since this method is being called from onDataAvailable() itself. - if (isReadPossible()) { - onDataAvailable(); - } - } - - private boolean isReadPossible() { - if (!this.channel.isReadResumed()) { - this.channel.resumeReads(); - } - return this.channel.isReadResumed(); + this.channel.resumeReads(); + // We are allowed to try, it will return null if data is not available + onDataAvailable(); } @Override - protected void suspendReading() { + protected void readingPaused() { this.channel.suspendReads(); } - @Override - public void onAllDataRead() { - this.channel.getReadSetter().set(null); - this.channel.resumeReads(); - super.onAllDataRead(); - } - @Override @Nullable protected DataBuffer read() throws IOException { diff --git a/spring-web/src/test/java/org/springframework/http/server/reactive/ListenerReadPublisherTests.java b/spring-web/src/test/java/org/springframework/http/server/reactive/ListenerReadPublisherTests.java index 66ed319bd0..81260eb63f 100644 --- a/spring-web/src/test/java/org/springframework/http/server/reactive/ListenerReadPublisherTests.java +++ b/spring-web/src/test/java/org/springframework/http/server/reactive/ListenerReadPublisherTests.java @@ -68,7 +68,7 @@ public class ListenerReadPublisherTests { } @Override - protected void suspendReading() { + protected void readingPaused() { // No-op } diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java index deca209e03..3cd34f9e96 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java @@ -149,17 +149,6 @@ public abstract class AbstractListenerWebSocketSession extends AbstractWebSoc */ protected abstract void resumeReceiving(); - /** - * Whether receiving new message(s) is suspended. - *

Note: if the underlying WebSocket API does not provide - * flow control for receiving messages, then this method as well as - * {@link #canSuspendReceiving()} should both return {@code false}. - * @return returns {@code true} if receiving new message(s) is suspended, - * or otherwise {@code false}. - * @since 5.0.2 - */ - protected abstract boolean isSuspended(); - /** * Send the given WebSocket message. */ @@ -231,16 +220,14 @@ public abstract class AbstractListenerWebSocketSession extends AbstractWebSoc @Override protected void checkOnDataAvailable() { - if (isSuspended()) { - resumeReceiving(); - } + resumeReceiving(); if (!this.pendingMessages.isEmpty()) { onDataAvailable(); } } @Override - protected void suspendReading() { + protected void readingPaused() { suspendReceiving(); } @@ -250,14 +237,6 @@ public abstract class AbstractListenerWebSocketSession extends AbstractWebSoc return (WebSocketMessage) this.pendingMessages.poll(); } - @Override - public void onAllDataRead() { - if (isSuspended()) { - resumeReceiving(); - } - super.onAllDataRead(); - } - void handleMessage(WebSocketMessage webSocketMessage) { this.pendingMessages.offer(webSocketMessage); onDataAvailable(); diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java index feb37108fc..449394b781 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java @@ -74,14 +74,10 @@ public class JettyWebSocketSession extends AbstractListenerWebSocketSession