Browse Source

Improve semantics writing currentData

Before this commit, the return value from write was interpreted as the
data being fully written and ready to be released via releaseData().

This is not true for WebSocketSession implementations where a true
return value simply means the message was sent with the full payload
but releas is not appropriate until a send confirmation.

Technically not an issue since WebSocketSession's extending this do
not use pooled buffers. Nevertheless this commit refines the semantics
of write, removes the releaseData() method, and makes sub-classes
responsible for releasing the buffer when fully written (and they
know best when that is). As a bonus currentData is now private.

Issue: SPR-16207
pull/1605/head
Rossen Stoyanchev 7 years ago
parent
commit
01a82b5291
  1. 2
      spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java
  2. 23
      spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java
  3. 22
      spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java
  4. 23
      spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java
  5. 10
      spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java
  6. 18
      spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java

2
spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java

@ -324,7 +324,7 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo
} }
public <T> void writeComplete(AbstractListenerWriteFlushProcessor<T> processor) { public <T> void writeComplete(AbstractListenerWriteFlushProcessor<T> processor) {
// ignore throw new IllegalStateException(toString());
} }
public <T> void onFlushPossible(AbstractListenerWriteFlushProcessor<T> processor) { public <T> void onFlushPossible(AbstractListenerWriteFlushProcessor<T> processor) {

23
spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java

@ -51,7 +51,7 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
private Subscription subscription; private Subscription subscription;
@Nullable @Nullable
protected volatile T currentData; private volatile T currentData;
private volatile boolean subscriberCompleted; private volatile boolean subscriberCompleted;
@ -142,11 +142,6 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
this.currentData = data; this.currentData = data;
} }
/**
* Called when the current received data item can be released.
*/
protected abstract void releaseData();
/** /**
* Whether writing is possible. * Whether writing is possible.
*/ */
@ -154,9 +149,12 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
/** /**
* Write the given item. * Write the given item.
* <p><strong>Note:</strong> Sub-classes are responsible for releasing any
* data buffer associated with the item, once fully written, if pooled
* buffers apply to the underlying container.
* @param data the item to write * @param data the item to write
* @return whether the data was fully written ({@code true}) * @return whether the current data item was written and another one
* and new data can be requested, or otherwise ({@code false}) * requested ({@code true}), or or otherwise if more writes are required.
*/ */
protected abstract boolean write(T data) throws IOException; protected abstract boolean write(T data) throws IOException;
@ -165,7 +163,7 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
* the next item from the upstream, write Publisher. * the next item from the upstream, write Publisher.
* <p>The default implementation is a no-op. * <p>The default implementation is a no-op.
*/ */
protected void suspendWriting() { protected void writingPaused() {
} }
/** /**
@ -275,15 +273,14 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
T data = processor.currentData; T data = processor.currentData;
Assert.state(data != null, "No data"); Assert.state(data != null, "No data");
try { try {
boolean writeCompleted = processor.write(data); if (processor.write(data)) {
if (writeCompleted) {
processor.releaseData();
if (processor.changeState(WRITING, REQUESTED)) { if (processor.changeState(WRITING, REQUESTED)) {
processor.currentData = null;
if (processor.subscriberCompleted) { if (processor.subscriberCompleted) {
processor.changeStateToComplete(REQUESTED); processor.changeStateToComplete(REQUESTED);
} }
else { else {
processor.suspendWriting(); processor.writingPaused();
Assert.state(processor.subscription != null, "No subscription"); Assert.state(processor.subscription != null, "No subscription");
processor.subscription.request(1); processor.subscription.request(1);
} }

22
spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java

@ -35,7 +35,6 @@ import org.reactivestreams.Publisher;
import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType; import org.springframework.http.MediaType;
import org.springframework.http.ResponseCookie; import org.springframework.http.ResponseCookie;
import org.springframework.lang.Nullable; import org.springframework.lang.Nullable;
@ -303,15 +302,6 @@ class ServletServerHttpResponse extends AbstractListenerServerHttpResponse {
return ServletServerHttpResponse.this.isWritePossible(); return ServletServerHttpResponse.this.isWritePossible();
} }
@Override
protected void releaseData() {
if (logger.isTraceEnabled()) {
logger.trace("releaseData: " + this.currentData);
}
DataBufferUtils.release(this.currentData);
this.currentData = null;
}
@Override @Override
protected boolean isDataEmpty(DataBuffer dataBuffer) { protected boolean isDataEmpty(DataBuffer dataBuffer) {
return dataBuffer.readableByteCount() == 0; return dataBuffer.readableByteCount() == 0;
@ -335,11 +325,15 @@ class ServletServerHttpResponse extends AbstractListenerServerHttpResponse {
if (this.logger.isTraceEnabled()) { if (this.logger.isTraceEnabled()) {
this.logger.trace("written: " + written + " total: " + remaining); this.logger.trace("written: " + written + " total: " + remaining);
} }
return written == remaining; if (written == remaining) {
} if (logger.isTraceEnabled()) {
else { logger.trace("releaseData: " + dataBuffer);
return false; }
DataBufferUtils.release(dataBuffer);
return true;
}
} }
return false;
} }
@Override @Override

23
spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java

@ -193,7 +193,15 @@ class UndertowServerHttpResponse extends AbstractListenerServerHttpResponse impl
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("written: " + written + " total: " + total); logger.trace("written: " + written + " total: " + total);
} }
return written == total; if (written != total) {
return false;
}
if (logger.isTraceEnabled()) {
logger.trace("releaseData: " + dataBuffer);
}
DataBufferUtils.release(dataBuffer);
this.byteBuffer = null;
return true;
} }
private int writeByteBuffer(ByteBuffer byteBuffer) throws IOException { private int writeByteBuffer(ByteBuffer byteBuffer) throws IOException {
@ -213,24 +221,13 @@ class UndertowServerHttpResponse extends AbstractListenerServerHttpResponse impl
this.byteBuffer = dataBuffer.asByteBuffer(); this.byteBuffer = dataBuffer.asByteBuffer();
} }
@Override
protected void releaseData() {
if (logger.isTraceEnabled()) {
logger.trace("releaseData: " + this.currentData);
}
DataBufferUtils.release(this.currentData);
this.currentData = null;
this.byteBuffer = null;
}
@Override @Override
protected boolean isDataEmpty(DataBuffer dataBuffer) { protected boolean isDataEmpty(DataBuffer dataBuffer) {
return (dataBuffer.readableByteCount() == 0); return (dataBuffer.readableByteCount() == 0);
} }
@Override @Override
protected void suspendWriting() { protected void writingPaused() {
this.channel.suspendWrites(); this.channel.suspendWrites();
} }

10
spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/AbstractListenerWebSocketSession.java

@ -153,6 +153,9 @@ public abstract class AbstractListenerWebSocketSession<T> extends AbstractWebSoc
/** /**
* Send the given WebSocket message. * Send the given WebSocket message.
* <p><strong>Note:</strong> Sub-classes are responsible for releasing the
* payload data buffer, once fully written, if pooled buffers apply to the
* underlying container.
*/ */
protected abstract boolean sendMessage(WebSocketMessage message) throws IOException; protected abstract boolean sendMessage(WebSocketMessage message) throws IOException;
@ -268,11 +271,6 @@ public abstract class AbstractListenerWebSocketSession<T> extends AbstractWebSoc
return sendMessage(message); return sendMessage(message);
} }
@Override
protected void releaseData() {
this.currentData = null;
}
@Override @Override
protected boolean isDataEmpty(WebSocketMessage message) { protected boolean isDataEmpty(WebSocketMessage message) {
return (message.getPayload().readableByteCount() == 0); return (message.getPayload().readableByteCount() == 0);
@ -280,7 +278,7 @@ public abstract class AbstractListenerWebSocketSession<T> extends AbstractWebSoc
@Override @Override
protected boolean isWritePossible() { protected boolean isWritePossible() {
return (this.isReady && this.currentData != null); return (this.isReady);
} }
/** /**

18
spring-webflux/src/main/java/org/springframework/web/reactive/socket/adapter/UndertowWebSocketSession.java

@ -27,7 +27,9 @@ import io.undertow.websockets.core.WebSockets;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor; import reactor.core.publisher.MonoProcessor;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.lang.Nullable; import org.springframework.lang.Nullable;
import org.springframework.util.ObjectUtils; import org.springframework.util.ObjectUtils;
import org.springframework.web.reactive.socket.CloseStatus; import org.springframework.web.reactive.socket.CloseStatus;
@ -78,19 +80,19 @@ public class UndertowWebSocketSession extends AbstractListenerWebSocketSession<W
if (WebSocketMessage.Type.TEXT.equals(message.getType())) { if (WebSocketMessage.Type.TEXT.equals(message.getType())) {
getSendProcessor().setReadyToSend(false); getSendProcessor().setReadyToSend(false);
String text = new String(buffer.array(), StandardCharsets.UTF_8); String text = new String(buffer.array(), StandardCharsets.UTF_8);
WebSockets.sendText(text, getDelegate(), new SendProcessorCallback()); WebSockets.sendText(text, getDelegate(), new SendProcessorCallback(message.getPayload()));
} }
else if (WebSocketMessage.Type.BINARY.equals(message.getType())) { else if (WebSocketMessage.Type.BINARY.equals(message.getType())) {
getSendProcessor().setReadyToSend(false); getSendProcessor().setReadyToSend(false);
WebSockets.sendBinary(buffer, getDelegate(), new SendProcessorCallback()); WebSockets.sendBinary(buffer, getDelegate(), new SendProcessorCallback(message.getPayload()));
} }
else if (WebSocketMessage.Type.PING.equals(message.getType())) { else if (WebSocketMessage.Type.PING.equals(message.getType())) {
getSendProcessor().setReadyToSend(false); getSendProcessor().setReadyToSend(false);
WebSockets.sendPing(buffer, getDelegate(), new SendProcessorCallback()); WebSockets.sendPing(buffer, getDelegate(), new SendProcessorCallback(message.getPayload()));
} }
else if (WebSocketMessage.Type.PONG.equals(message.getType())) { else if (WebSocketMessage.Type.PONG.equals(message.getType())) {
getSendProcessor().setReadyToSend(false); getSendProcessor().setReadyToSend(false);
WebSockets.sendPong(buffer, getDelegate(), new SendProcessorCallback()); WebSockets.sendPong(buffer, getDelegate(), new SendProcessorCallback(message.getPayload()));
} }
else { else {
throw new IllegalArgumentException("Unexpected message type: " + message.getType()); throw new IllegalArgumentException("Unexpected message type: " + message.getType());
@ -110,14 +112,22 @@ public class UndertowWebSocketSession extends AbstractListenerWebSocketSession<W
private final class SendProcessorCallback implements WebSocketCallback<Void> { private final class SendProcessorCallback implements WebSocketCallback<Void> {
private final DataBuffer payload;
SendProcessorCallback(DataBuffer payload) {
this.payload = payload;
}
@Override @Override
public void complete(WebSocketChannel channel, Void context) { public void complete(WebSocketChannel channel, Void context) {
DataBufferUtils.release(this.payload);
getSendProcessor().setReadyToSend(true); getSendProcessor().setReadyToSend(true);
getSendProcessor().onWritePossible(); getSendProcessor().onWritePossible();
} }
@Override @Override
public void onError(WebSocketChannel channel, Void context, Throwable throwable) { public void onError(WebSocketChannel channel, Void context, Throwable throwable) {
DataBufferUtils.release(this.payload);
getSendProcessor().cancel(); getSendProcessor().cancel();
getSendProcessor().onError(throwable); getSendProcessor().onError(throwable);
} }

Loading…
Cancel
Save