Browse Source

Resume Undertow writes only when data is available

pull/1370/merge
Violeta Georgieva 8 years ago committed by Arjen Poutsma
parent
commit
4d058ceef4
  1. 16
      spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java
  2. 5
      spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java
  3. 27
      spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java

16
spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java

@ -150,6 +150,18 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T, @@ -150,6 +150,18 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
*/
protected abstract boolean write(T data) throws IOException;
/**
* Suspend writing. Defaults to no-op.
*/
protected void suspendWriting() {
}
/**
* Invoked when writing is complete. Defaults to no-op.
*/
protected void writingComplete() {
}
private boolean changeState(State oldState, State newState) {
return this.state.compareAndSet(oldState, newState);
@ -223,6 +235,7 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T, @@ -223,6 +235,7 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
@Override
public <T> void onComplete(AbstractListenerWriteProcessor<T> processor) {
if (processor.changeState(this, COMPLETED)) {
processor.writingComplete();
processor.resultPublisher.publishComplete();
}
}
@ -248,10 +261,12 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T, @@ -248,10 +261,12 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
processor.releaseData();
if (!processor.subscriberCompleted) {
processor.changeState(WRITING, REQUESTED);
processor.suspendWriting();
processor.subscription.request(1);
}
else {
processor.changeState(WRITING, COMPLETED);
processor.writingComplete();
processor.resultPublisher.publishComplete();
}
}
@ -311,6 +326,7 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T, @@ -311,6 +326,7 @@ public abstract class AbstractListenerWriteProcessor<T> implements Processor<T,
public <T> void onError(AbstractListenerWriteProcessor<T> processor, Throwable ex) {
if (processor.changeState(this, COMPLETED)) {
processor.writingComplete();
processor.resultPublisher.publishError(ex);
}
}

5
spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java

@ -306,6 +306,11 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons @@ -306,6 +306,11 @@ public class ServletServerHttpResponse extends AbstractListenerServerHttpRespons
return false;
}
}
@Override
protected void writingComplete() {
bodyProcessor = null;
}
}
}

27
spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpResponse.java

@ -138,9 +138,7 @@ public class UndertowServerHttpResponse extends AbstractListenerServerHttpRespon @@ -138,9 +138,7 @@ public class UndertowServerHttpResponse extends AbstractListenerServerHttpRespon
if (this.responseChannel == null) {
this.responseChannel = this.exchange.getResponseChannel();
}
ResponseBodyProcessor bodyProcessor = new ResponseBodyProcessor( this.responseChannel);
bodyProcessor.registerListener();
return bodyProcessor;
return new ResponseBodyProcessor(this.responseChannel);
}
@ -153,16 +151,18 @@ public class UndertowServerHttpResponse extends AbstractListenerServerHttpRespon @@ -153,16 +151,18 @@ public class UndertowServerHttpResponse extends AbstractListenerServerHttpRespon
public ResponseBodyProcessor(StreamSinkChannel channel) {
Assert.notNull(channel, "StreamSinkChannel must not be null");
this.channel = channel;
}
public void registerListener() {
this.channel.getWriteSetter().set(c -> onWritePossible());
this.channel.resumeWrites();
this.channel.suspendWrites();
}
@Override
protected boolean isWritePossible() {
return false;
if (this.channel.isWriteResumed()) {
return true;
} else {
this.channel.resumeWrites();
return false;
}
}
@Override
@ -214,6 +214,17 @@ public class UndertowServerHttpResponse extends AbstractListenerServerHttpRespon @@ -214,6 +214,17 @@ public class UndertowServerHttpResponse extends AbstractListenerServerHttpRespon
protected boolean isDataEmpty(DataBuffer dataBuffer) {
return (dataBuffer.readableByteCount() == 0);
}
@Override
protected void suspendWriting() {
this.channel.suspendWrites();
}
@Override
protected void writingComplete() {
this.channel.getWriteSetter().set(null);
this.channel.resumeWrites();
}
}

Loading…
Cancel
Save