diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java index 29e7a202fc..5f2015c507 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java @@ -116,10 +116,9 @@ public abstract class AbstractListenerReadPublisher implements Publisher { protected abstract T read() throws IOException; /** - * Suspend reading. Defaults to no-op. + * Suspend reading, if the underlying API provides such a mechanism. */ - protected void suspendReading() { - } + protected abstract void suspendReading(); // Private methods for use in State... diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java b/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java index 89418a0ecc..6ae7a25bc5 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java @@ -266,6 +266,11 @@ class ServletServerHttpRequest extends AbstractServerHttpRequest { return null; } + @Override + protected void suspendReading() { + // no-op + } + private class RequestBodyPublisherReadListener implements ReadListener { diff --git a/spring-web/src/test/java/org/springframework/http/server/reactive/ListenerReadPublisherTests.java b/spring-web/src/test/java/org/springframework/http/server/reactive/ListenerReadPublisherTests.java index abd7be3276..66ed319bd0 100644 --- a/spring-web/src/test/java/org/springframework/http/server/reactive/ListenerReadPublisherTests.java +++ b/spring-web/src/test/java/org/springframework/http/server/reactive/ListenerReadPublisherTests.java @@ -67,6 +67,11 @@ public class ListenerReadPublisherTests { return mock(DataBuffer.class); } + @Override + protected void suspendReading() { + // No-op + } + public int getReadCalls() { return this.readCalls; } diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java index c77aebca64..deca209e03 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java @@ -150,13 +150,13 @@ public abstract class AbstractListenerWebSocketSession extends AbstractWebSoc protected abstract void resumeReceiving(); /** - * Returns {@code true} if receiving new message(s) is suspended otherwise - * {@code false}. + * Whether receiving new message(s) is suspended. *

Note: if the underlying WebSocket API does not provide - * flow control for receiving messages, and this method should return - * {@code false} and {@link #canSuspendReceiving()} should return {@code false}. - * @return returns {@code true} if receiving new message(s) is suspended - * otherwise {@code false}. + * flow control for receiving messages, then this method as well as + * {@link #canSuspendReceiving()} should both return {@code false}. + * @return returns {@code true} if receiving new message(s) is suspended, + * or otherwise {@code false}. + * @since 5.0.2 */ protected abstract boolean isSuspended(); @@ -226,14 +226,15 @@ public abstract class AbstractListenerWebSocketSession extends AbstractWebSoc private final class WebSocketReceivePublisher extends AbstractListenerReadPublisher { - private volatile Queue pendingWebSocketMessages = Queues.unbounded().get(); + private volatile Queue pendingMessages = Queues.unbounded(Queues.SMALL_BUFFER_SIZE).get(); + @Override protected void checkOnDataAvailable() { if (isSuspended()) { resumeReceiving(); } - if (!pendingWebSocketMessages.isEmpty()) { + if (!this.pendingMessages.isEmpty()) { onDataAvailable(); } } @@ -246,7 +247,7 @@ public abstract class AbstractListenerWebSocketSession extends AbstractWebSoc @Override @Nullable protected WebSocketMessage read() throws IOException { - return (WebSocketMessage) pendingWebSocketMessages.poll(); + return (WebSocketMessage) this.pendingMessages.poll(); } @Override @@ -258,7 +259,7 @@ public abstract class AbstractListenerWebSocketSession extends AbstractWebSoc } void handleMessage(WebSocketMessage webSocketMessage) { - this.pendingWebSocketMessages.offer(webSocketMessage); + this.pendingMessages.offer(webSocketMessage); onDataAvailable(); } } diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java index 28ce1e007f..792c036e11 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java @@ -17,16 +17,15 @@ package org.springframework.web.reactive.socket.adapter; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; - import javax.websocket.Session; import org.apache.tomcat.websocket.WsSession; +import reactor.core.publisher.MonoProcessor; + import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.web.reactive.socket.HandshakeInfo; import org.springframework.web.reactive.socket.WebSocketSession; -import reactor.core.publisher.MonoProcessor; - /** * Spring {@link WebSocketSession} adapter for Tomcat's * {@link javax.websocket.Session}. @@ -35,8 +34,10 @@ import reactor.core.publisher.MonoProcessor; * @since 5.0 */ public class TomcatWebSocketSession extends StandardWebSocketSession { + private static final AtomicIntegerFieldUpdater SUSPENDED = AtomicIntegerFieldUpdater.newUpdater(TomcatWebSocketSession.class, "suspended"); + private volatile int suspended;