Browse Source

Polish

pull/1603/head
Rossen Stoyanchev 7 years ago
parent
commit
b7c924cac1
  1. 5
      spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java
  2. 5
      spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java
  3. 5
      spring-web/src/test/java/org/springframework/http/server/reactive/ListenerReadPublisherTests.java
  4. 21
      spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java
  5. 7
      spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java

5
spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java

@ -116,10 +116,9 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
protected abstract T read() throws IOException; protected abstract T read() throws IOException;
/** /**
* Suspend reading. Defaults to no-op. * Suspend reading, if the underlying API provides such a mechanism.
*/ */
protected void suspendReading() { protected abstract void suspendReading();
}
// Private methods for use in State... // Private methods for use in State...

5
spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpRequest.java

@ -266,6 +266,11 @@ class ServletServerHttpRequest extends AbstractServerHttpRequest {
return null; return null;
} }
@Override
protected void suspendReading() {
// no-op
}
private class RequestBodyPublisherReadListener implements ReadListener { private class RequestBodyPublisherReadListener implements ReadListener {

5
spring-web/src/test/java/org/springframework/http/server/reactive/ListenerReadPublisherTests.java

@ -67,6 +67,11 @@ public class ListenerReadPublisherTests {
return mock(DataBuffer.class); return mock(DataBuffer.class);
} }
@Override
protected void suspendReading() {
// No-op
}
public int getReadCalls() { public int getReadCalls() {
return this.readCalls; return this.readCalls;
} }

21
spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java

@ -150,13 +150,13 @@ public abstract class AbstractListenerWebSocketSession<T> extends AbstractWebSoc
protected abstract void resumeReceiving(); protected abstract void resumeReceiving();
/** /**
* Returns {@code true} if receiving new message(s) is suspended otherwise * Whether receiving new message(s) is suspended.
* {@code false}.
* <p><strong>Note:</strong> if the underlying WebSocket API does not provide * <p><strong>Note:</strong> if the underlying WebSocket API does not provide
* flow control for receiving messages, and this method should return * flow control for receiving messages, then this method as well as
* {@code false} and {@link #canSuspendReceiving()} should return {@code false}. * {@link #canSuspendReceiving()} should both return {@code false}.
* @return returns {@code true} if receiving new message(s) is suspended * @return returns {@code true} if receiving new message(s) is suspended,
* otherwise {@code false}. * or otherwise {@code false}.
* @since 5.0.2
*/ */
protected abstract boolean isSuspended(); protected abstract boolean isSuspended();
@ -226,14 +226,15 @@ public abstract class AbstractListenerWebSocketSession<T> extends AbstractWebSoc
private final class WebSocketReceivePublisher extends AbstractListenerReadPublisher<WebSocketMessage> { private final class WebSocketReceivePublisher extends AbstractListenerReadPublisher<WebSocketMessage> {
private volatile Queue<Object> pendingWebSocketMessages = Queues.unbounded().get(); private volatile Queue<Object> pendingMessages = Queues.unbounded(Queues.SMALL_BUFFER_SIZE).get();
@Override @Override
protected void checkOnDataAvailable() { protected void checkOnDataAvailable() {
if (isSuspended()) { if (isSuspended()) {
resumeReceiving(); resumeReceiving();
} }
if (!pendingWebSocketMessages.isEmpty()) { if (!this.pendingMessages.isEmpty()) {
onDataAvailable(); onDataAvailable();
} }
} }
@ -246,7 +247,7 @@ public abstract class AbstractListenerWebSocketSession<T> extends AbstractWebSoc
@Override @Override
@Nullable @Nullable
protected WebSocketMessage read() throws IOException { protected WebSocketMessage read() throws IOException {
return (WebSocketMessage) pendingWebSocketMessages.poll(); return (WebSocketMessage) this.pendingMessages.poll();
} }
@Override @Override
@ -258,7 +259,7 @@ public abstract class AbstractListenerWebSocketSession<T> extends AbstractWebSoc
} }
void handleMessage(WebSocketMessage webSocketMessage) { void handleMessage(WebSocketMessage webSocketMessage) {
this.pendingWebSocketMessages.offer(webSocketMessage); this.pendingMessages.offer(webSocketMessage);
onDataAvailable(); onDataAvailable();
} }
} }

7
spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/TomcatWebSocketSession.java

@ -17,16 +17,15 @@
package org.springframework.web.reactive.socket.adapter; package org.springframework.web.reactive.socket.adapter;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import javax.websocket.Session; import javax.websocket.Session;
import org.apache.tomcat.websocket.WsSession; import org.apache.tomcat.websocket.WsSession;
import reactor.core.publisher.MonoProcessor;
import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.web.reactive.socket.HandshakeInfo; import org.springframework.web.reactive.socket.HandshakeInfo;
import org.springframework.web.reactive.socket.WebSocketSession; import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.MonoProcessor;
/** /**
* Spring {@link WebSocketSession} adapter for Tomcat's * Spring {@link WebSocketSession} adapter for Tomcat's
* {@link javax.websocket.Session}. * {@link javax.websocket.Session}.
@ -35,8 +34,10 @@ import reactor.core.publisher.MonoProcessor;
* @since 5.0 * @since 5.0
*/ */
public class TomcatWebSocketSession extends StandardWebSocketSession { public class TomcatWebSocketSession extends StandardWebSocketSession {
private static final AtomicIntegerFieldUpdater<TomcatWebSocketSession> SUSPENDED = private static final AtomicIntegerFieldUpdater<TomcatWebSocketSession> SUSPENDED =
AtomicIntegerFieldUpdater.newUpdater(TomcatWebSocketSession.class, "suspended"); AtomicIntegerFieldUpdater.newUpdater(TomcatWebSocketSession.class, "suspended");
private volatile int suspended; private volatile int suspended;

Loading…
Cancel
Save