Browse Source

AbstractListenerWebSocketSession: suspend the channel when there is no demand

Issues: SPR-16207
pull/1603/head
Violeta Georgieva 7 years ago committed by Rossen Stoyanchev
parent
commit
d8099adc9a
  1. 42
      spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java
  2. 5
      spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java
  3. 5
      spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/StandardWebSocketSession.java
  4. 18
      spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java
  5. 7
      spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java

42
spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java

@ -17,6 +17,7 @@
package org.springframework.web.reactive.socket.adapter; package org.springframework.web.reactive.socket.adapter;
import java.io.IOException; import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
@ -25,6 +26,7 @@ import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor; import reactor.core.publisher.MonoProcessor;
import reactor.util.concurrent.Queues;
import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.http.server.reactive.AbstractListenerReadPublisher; import org.springframework.http.server.reactive.AbstractListenerReadPublisher;
@ -147,6 +149,17 @@ public abstract class AbstractListenerWebSocketSession<T> extends AbstractWebSoc
*/ */
protected abstract void resumeReceiving(); protected abstract void resumeReceiving();
/**
* Returns {@code true} if receiving new message(s) is suspended otherwise
* {@code false}.
* <p><strong>Note:</strong> if the underlying WebSocket API does not provide
* flow control for receiving messages, and this method should return
* {@code false} and {@link #canSuspendReceiving()} should return {@code false}.
* @return returns {@code true} if receiving new message(s) is suspended
* otherwise {@code false}.
*/
protected abstract boolean isSuspended();
/** /**
* Send the given WebSocket message. * Send the given WebSocket message.
*/ */
@ -213,32 +226,39 @@ public abstract class AbstractListenerWebSocketSession<T> extends AbstractWebSoc
private final class WebSocketReceivePublisher extends AbstractListenerReadPublisher<WebSocketMessage> { private final class WebSocketReceivePublisher extends AbstractListenerReadPublisher<WebSocketMessage> {
@Nullable private volatile Queue<Object> pendingWebSocketMessages = Queues.unbounded().get();
private volatile WebSocketMessage webSocketMessage;
@Override @Override
protected void checkOnDataAvailable() { protected void checkOnDataAvailable() {
if (this.webSocketMessage != null) { if (isSuspended()) {
resumeReceiving();
}
if (!pendingWebSocketMessages.isEmpty()) {
onDataAvailable(); onDataAvailable();
} }
} }
@Override
protected void suspendReading() {
suspendReceiving();
}
@Override @Override
@Nullable @Nullable
protected WebSocketMessage read() throws IOException { protected WebSocketMessage read() throws IOException {
if (this.webSocketMessage != null) { return (WebSocketMessage) pendingWebSocketMessages.poll();
WebSocketMessage result = this.webSocketMessage; }
this.webSocketMessage = null;
@Override
public void onAllDataRead() {
if (isSuspended()) {
resumeReceiving(); resumeReceiving();
return result;
} }
super.onAllDataRead();
return null;
} }
void handleMessage(WebSocketMessage webSocketMessage) { void handleMessage(WebSocketMessage webSocketMessage) {
this.webSocketMessage = webSocketMessage; this.pendingWebSocketMessages.offer(webSocketMessage);
suspendReceiving();
onDataAvailable(); onDataAvailable();
} }
} }

5
spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/JettyWebSocketSession.java

@ -79,6 +79,11 @@ public class JettyWebSocketSession extends AbstractListenerWebSocketSession<Sess
this.suspendToken = null; this.suspendToken = null;
} }
@Override
protected boolean isSuspended() {
return this.suspendToken != null;
}
@Override @Override
protected boolean sendMessage(WebSocketMessage message) throws IOException { protected boolean sendMessage(WebSocketMessage message) throws IOException {
ByteBuffer buffer = message.getPayload().asByteBuffer(); ByteBuffer buffer = message.getPayload().asByteBuffer();

5
spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/StandardWebSocketSession.java

@ -71,6 +71,11 @@ public class StandardWebSocketSession extends AbstractListenerWebSocketSession<S
// no-op // no-op
} }
@Override
protected boolean isSuspended() {
return false;
}
@Override @Override
protected boolean sendMessage(WebSocketMessage message) throws IOException { protected boolean sendMessage(WebSocketMessage message) throws IOException {
ByteBuffer buffer = message.getPayload().asByteBuffer(); ByteBuffer buffer = message.getPayload().asByteBuffer();

18
spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java

@ -16,6 +16,8 @@
package org.springframework.web.reactive.socket.adapter; package org.springframework.web.reactive.socket.adapter;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import javax.websocket.Session; import javax.websocket.Session;
import org.apache.tomcat.websocket.WsSession; import org.apache.tomcat.websocket.WsSession;
@ -33,6 +35,9 @@ import reactor.core.publisher.MonoProcessor;
* @since 5.0 * @since 5.0
*/ */
public class TomcatWebSocketSession extends StandardWebSocketSession { public class TomcatWebSocketSession extends StandardWebSocketSession {
private static final AtomicIntegerFieldUpdater<TomcatWebSocketSession> SUSPENDED =
AtomicIntegerFieldUpdater.newUpdater(TomcatWebSocketSession.class, "suspended");
private volatile int suspended;
public TomcatWebSocketSession(Session session, HandshakeInfo info, DataBufferFactory factory) { public TomcatWebSocketSession(Session session, HandshakeInfo info, DataBufferFactory factory) {
@ -53,12 +58,21 @@ public class TomcatWebSocketSession extends StandardWebSocketSession {
@Override @Override
protected void suspendReceiving() { protected void suspendReceiving() {
((WsSession) getDelegate()).suspend(); if (SUSPENDED.compareAndSet(this, 0, 1)) {
((WsSession) getDelegate()).suspend();
}
} }
@Override @Override
protected void resumeReceiving() { protected void resumeReceiving() {
((WsSession) getDelegate()).resume(); if (SUSPENDED.compareAndSet(this, 1, 0)) {
((WsSession) getDelegate()).resume();
}
}
@Override
protected boolean isSuspended() {
return this.suspended == 1;
} }
} }

7
spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java

@ -61,14 +61,21 @@ public class UndertowWebSocketSession extends AbstractListenerWebSocketSession<W
return true; return true;
} }
@Override
protected void suspendReceiving() { protected void suspendReceiving() {
getDelegate().suspendReceives(); getDelegate().suspendReceives();
} }
@Override
protected void resumeReceiving() { protected void resumeReceiving() {
getDelegate().resumeReceives(); getDelegate().resumeReceives();
} }
@Override
protected boolean isSuspended() {
return !getDelegate().isReceivesResumed();
}
@Override @Override
protected boolean sendMessage(WebSocketMessage message) throws IOException { protected boolean sendMessage(WebSocketMessage message) throws IOException {
ByteBuffer buffer = message.getPayload().asByteBuffer(); ByteBuffer buffer = message.getPayload().asByteBuffer();

Loading…
Cancel
Save