From 01a82b529107f75ded838bebeb0045aac8903be6 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Thu, 23 Nov 2017 10:55:03 -0500 Subject: [PATCH] Improve semantics writing currentData Before this commit, the return value from write was interpreted as the data being fully written and ready to be released via releaseData(). This is not true for WebSocketSession implementations where a true return value simply means the message was sent with the full payload but releas is not appropriate until a send confirmation. Technically not an issue since WebSocketSession's extending this do not use pooled buffers. Nevertheless this commit refines the semantics of write, removes the releaseData() method, and makes sub-classes responsible for releasing the buffer when fully written (and they know best when that is). As a bonus currentData is now private. Issue: SPR-16207 --- .../AbstractListenerWriteFlushProcessor.java | 2 +- .../AbstractListenerWriteProcessor.java | 23 ++++++++----------- .../reactive/ServletServerHttpResponse.java | 22 +++++++----------- .../reactive/UndertowServerHttpResponse.java | 23 ++++++++----------- .../AbstractListenerWebSocketSession.java | 10 ++++---- .../adapter/UndertowWebSocketSession.java | 18 +++++++++++---- 6 files changed, 47 insertions(+), 51 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java index 96dceba33b..456109b10b 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java @@ -324,7 +324,7 @@ public abstract class AbstractListenerWriteFlushProcessor implements Processo } public void writeComplete(AbstractListenerWriteFlushProcessor processor) { - // ignore + throw new IllegalStateException(toString()); } public void onFlushPossible(AbstractListenerWriteFlushProcessor processor) { diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java index 5645f00f72..7aed653eb2 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java @@ -51,7 +51,7 @@ public abstract class AbstractListenerWriteProcessor implements Processor implements Processor implements ProcessorNote: Sub-classes are responsible for releasing any + * data buffer associated with the item, once fully written, if pooled + * buffers apply to the underlying container. * @param data the item to write - * @return whether the data was fully written ({@code true}) - * and new data can be requested, or otherwise ({@code false}) + * @return whether the current data item was written and another one + * requested ({@code true}), or or otherwise if more writes are required. */ protected abstract boolean write(T data) throws IOException; @@ -165,7 +163,7 @@ public abstract class AbstractListenerWriteProcessor implements ProcessorThe default implementation is a no-op. */ - protected void suspendWriting() { + protected void writingPaused() { } /** @@ -275,15 +273,14 @@ public abstract class AbstractListenerWriteProcessor implements Processor extends AbstractWebSoc /** * Send the given WebSocket message. + *

Note: Sub-classes are responsible for releasing the + * payload data buffer, once fully written, if pooled buffers apply to the + * underlying container. */ protected abstract boolean sendMessage(WebSocketMessage message) throws IOException; @@ -268,11 +271,6 @@ public abstract class AbstractListenerWebSocketSession extends AbstractWebSoc return sendMessage(message); } - @Override - protected void releaseData() { - this.currentData = null; - } - @Override protected boolean isDataEmpty(WebSocketMessage message) { return (message.getPayload().readableByteCount() == 0); @@ -280,7 +278,7 @@ public abstract class AbstractListenerWebSocketSession extends AbstractWebSoc @Override protected boolean isWritePossible() { - return (this.isReady && this.currentData != null); + return (this.isReady); } /** diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java index 122c7c50a6..24b36477b0 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java @@ -27,7 +27,9 @@ import io.undertow.websockets.core.WebSockets; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoProcessor; +import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; +import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.lang.Nullable; import org.springframework.util.ObjectUtils; import org.springframework.web.reactive.socket.CloseStatus; @@ -78,19 +80,19 @@ public class UndertowWebSocketSession extends AbstractListenerWebSocketSession { + private final DataBuffer payload; + + SendProcessorCallback(DataBuffer payload) { + this.payload = payload; + } + @Override public void complete(WebSocketChannel channel, Void context) { + DataBufferUtils.release(this.payload); getSendProcessor().setReadyToSend(true); getSendProcessor().onWritePossible(); } @Override public void onError(WebSocketChannel channel, Void context, Throwable throwable) { + DataBufferUtils.release(this.payload); getSendProcessor().cancel(); getSendProcessor().onError(throwable); }