diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java index a55a033184..c77aebca64 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java @@ -17,6 +17,7 @@ package org.springframework.web.reactive.socket.adapter; import java.io.IOException; +import java.util.Queue; import java.util.concurrent.atomic.AtomicBoolean; import org.reactivestreams.Publisher; @@ -25,6 +26,7 @@ import org.reactivestreams.Subscription; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoProcessor; +import reactor.util.concurrent.Queues; import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.http.server.reactive.AbstractListenerReadPublisher; @@ -147,6 +149,17 @@ public abstract class AbstractListenerWebSocketSession extends AbstractWebSoc */ protected abstract void resumeReceiving(); + /** + * Returns {@code true} if receiving new message(s) is suspended otherwise + * {@code false}. + *

Note: if the underlying WebSocket API does not provide + * flow control for receiving messages, and this method should return + * {@code false} and {@link #canSuspendReceiving()} should return {@code false}. + * @return returns {@code true} if receiving new message(s) is suspended + * otherwise {@code false}. + */ + protected abstract boolean isSuspended(); + /** * Send the given WebSocket message. */ @@ -213,32 +226,39 @@ public abstract class AbstractListenerWebSocketSession extends AbstractWebSoc private final class WebSocketReceivePublisher extends AbstractListenerReadPublisher { - @Nullable - private volatile WebSocketMessage webSocketMessage; + private volatile Queue pendingWebSocketMessages = Queues.unbounded().get(); @Override protected void checkOnDataAvailable() { - if (this.webSocketMessage != null) { + if (isSuspended()) { + resumeReceiving(); + } + if (!pendingWebSocketMessages.isEmpty()) { onDataAvailable(); } } + @Override + protected void suspendReading() { + suspendReceiving(); + } + @Override @Nullable protected WebSocketMessage read() throws IOException { - if (this.webSocketMessage != null) { - WebSocketMessage result = this.webSocketMessage; - this.webSocketMessage = null; + return (WebSocketMessage) pendingWebSocketMessages.poll(); + } + + @Override + public void onAllDataRead() { + if (isSuspended()) { resumeReceiving(); - return result; } - - return null; + super.onAllDataRead(); } void handleMessage(WebSocketMessage webSocketMessage) { - this.webSocketMessage = webSocketMessage; - suspendReceiving(); + this.pendingWebSocketMessages.offer(webSocketMessage); onDataAvailable(); } } diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java index 02ecae135a..feb37108fc 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java @@ -79,6 +79,11 @@ public class JettyWebSocketSession extends AbstractListenerWebSocketSession SUSPENDED = + AtomicIntegerFieldUpdater.newUpdater(TomcatWebSocketSession.class, "suspended"); + private volatile int suspended; public TomcatWebSocketSession(Session session, HandshakeInfo info, DataBufferFactory factory) { @@ -53,12 +58,21 @@ public class TomcatWebSocketSession extends StandardWebSocketSession { @Override protected void suspendReceiving() { - ((WsSession) getDelegate()).suspend(); + if (SUSPENDED.compareAndSet(this, 0, 1)) { + ((WsSession) getDelegate()).suspend(); + } } @Override protected void resumeReceiving() { - ((WsSession) getDelegate()).resume(); + if (SUSPENDED.compareAndSet(this, 1, 0)) { + ((WsSession) getDelegate()).resume(); + } + } + + @Override + protected boolean isSuspended() { + return this.suspended == 1; } } diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java index ef752730ae..4965d6c728 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java @@ -61,14 +61,21 @@ public class UndertowWebSocketSession extends AbstractListenerWebSocketSession