diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java index 96dceba33b..456109b10b 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java @@ -324,7 +324,7 @@ public abstract class AbstractListenerWriteFlushProcessor implements Processo } public void writeComplete(AbstractListenerWriteFlushProcessor processor) { - // ignore + throw new IllegalStateException(toString()); } public void onFlushPossible(AbstractListenerWriteFlushProcessor processor) { diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java index 5645f00f72..7aed653eb2 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java @@ -51,7 +51,7 @@ public abstract class AbstractListenerWriteProcessor implements Processor implements Processor implements ProcessorNote: Sub-classes are responsible for releasing any + * data buffer associated with the item, once fully written, if pooled + * buffers apply to the underlying container. * @param data the item to write - * @return whether the data was fully written ({@code true}) - * and new data can be requested, or otherwise ({@code false}) + * @return whether the current data item was written and another one + * requested ({@code true}), or or otherwise if more writes are required. */ protected abstract boolean write(T data) throws IOException; @@ -165,7 +163,7 @@ public abstract class AbstractListenerWriteProcessor implements ProcessorThe default implementation is a no-op. */ - protected void suspendWriting() { + protected void writingPaused() { } /** @@ -275,15 +273,14 @@ public abstract class AbstractListenerWriteProcessor implements Processor extends AbstractWebSoc /** * Send the given WebSocket message. + *

Note: Sub-classes are responsible for releasing the + * payload data buffer, once fully written, if pooled buffers apply to the + * underlying container. */ protected abstract boolean sendMessage(WebSocketMessage message) throws IOException; @@ -268,11 +271,6 @@ public abstract class AbstractListenerWebSocketSession extends AbstractWebSoc return sendMessage(message); } - @Override - protected void releaseData() { - this.currentData = null; - } - @Override protected boolean isDataEmpty(WebSocketMessage message) { return (message.getPayload().readableByteCount() == 0); @@ -280,7 +278,7 @@ public abstract class AbstractListenerWebSocketSession extends AbstractWebSoc @Override protected boolean isWritePossible() { - return (this.isReady && this.currentData != null); + return (this.isReady); } /** diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java index 122c7c50a6..24b36477b0 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java @@ -27,7 +27,9 @@ import io.undertow.websockets.core.WebSockets; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoProcessor; +import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; +import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.lang.Nullable; import org.springframework.util.ObjectUtils; import org.springframework.web.reactive.socket.CloseStatus; @@ -78,19 +80,19 @@ public class UndertowWebSocketSession extends AbstractListenerWebSocketSession { + private final DataBuffer payload; + + SendProcessorCallback(DataBuffer payload) { + this.payload = payload; + } + @Override public void complete(WebSocketChannel channel, Void context) { + DataBufferUtils.release(this.payload); getSendProcessor().setReadyToSend(true); getSendProcessor().onWritePossible(); } @Override public void onError(WebSocketChannel channel, Void context, Throwable throwable) { + DataBufferUtils.release(this.payload); getSendProcessor().cancel(); getSendProcessor().onError(throwable); }