diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java index 45b24f1566..d682672a31 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java @@ -150,6 +150,18 @@ public abstract class AbstractListenerWriteProcessor implements Processor implements Processor void onComplete(AbstractListenerWriteProcessor processor) { if (processor.changeState(this, COMPLETED)) { + processor.writingComplete(); processor.resultPublisher.publishComplete(); } } @@ -248,10 +261,12 @@ public abstract class AbstractListenerWriteProcessor implements Processor implements Processor void onError(AbstractListenerWriteProcessor processor, Throwable ex) { if (processor.changeState(this, COMPLETED)) { + processor.writingComplete(); processor.resultPublisher.publishError(ex); } } diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java index 63854ef3dc..21503168eb 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java @@ -306,6 +306,11 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons return false; } } + + @Override + protected void writingComplete() { + bodyProcessor = null; + } } } diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java index 3038602d34..bc8728e159 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java @@ -138,9 +138,7 @@ public class UndertowServerHttpResponse extends AbstractListenerServerHttpRespon if (this.responseChannel == null) { this.responseChannel = this.exchange.getResponseChannel(); } - ResponseBodyProcessor bodyProcessor = new ResponseBodyProcessor( this.responseChannel); - bodyProcessor.registerListener(); - return bodyProcessor; + return new ResponseBodyProcessor(this.responseChannel); } @@ -153,16 +151,18 @@ public class UndertowServerHttpResponse extends AbstractListenerServerHttpRespon public ResponseBodyProcessor(StreamSinkChannel channel) { Assert.notNull(channel, "StreamSinkChannel must not be null"); this.channel = channel; - } - - public void registerListener() { this.channel.getWriteSetter().set(c -> onWritePossible()); - this.channel.resumeWrites(); + this.channel.suspendWrites(); } @Override protected boolean isWritePossible() { - return false; + if (this.channel.isWriteResumed()) { + return true; + } else { + this.channel.resumeWrites(); + return false; + } } @Override @@ -214,6 +214,17 @@ public class UndertowServerHttpResponse extends AbstractListenerServerHttpRespon protected boolean isDataEmpty(DataBuffer dataBuffer) { return (dataBuffer.readableByteCount() == 0); } + + @Override + protected void suspendWriting() { + this.channel.suspendWrites(); + } + + @Override + protected void writingComplete() { + this.channel.getWriteSetter().set(null); + this.channel.resumeWrites(); + } }