Browse Source

Refactor AbstractRequestBodyPublisher states

The state machine is refactored in order to solve various concurrency
issues.
pull/1111/head
Violeta Georgieva 8 years ago
parent
commit
d68232c880
  1. 106
      spring-web-reactive/src/main/java/org/springframework/http/server/reactive/AbstractRequestBodyPublisher.java
  2. 21
      spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletHttpHandlerAdapter.java
  3. 5
      spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java

106
spring-web-reactive/src/main/java/org/springframework/http/server/reactive/AbstractRequestBodyPublisher.java

@ -52,8 +52,6 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> { @@ -52,8 +52,6 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
private Subscriber<? super DataBuffer> subscriber;
private volatile boolean dataAvailable;
@Override
public void subscribe(Subscriber<? super DataBuffer> subscriber) {
if (this.logger.isTraceEnabled()) {
@ -101,28 +99,24 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> { @@ -101,28 +99,24 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
/**
* Reads and publishes data buffers from the input. Continues till either there is no
* more demand, or till there is no more data to be read.
* @return {@code true} if there is more data to be read; {@code false} otherwise
* @return {@code true} if there is more demand; {@code false} otherwise
*/
private boolean readAndPublish() {
try {
while (hasDemand()) {
DataBuffer dataBuffer = read();
if (dataBuffer != null) {
BackpressureUtils.getAndSub(this.demand, 1L);
this.subscriber.onNext(dataBuffer);
}
else {
return false;
}
private boolean readAndPublish() throws IOException {
while (hasDemand()) {
DataBuffer dataBuffer = read();
if (dataBuffer != null) {
BackpressureUtils.getAndSub(this.demand, 1L);
this.subscriber.onNext(dataBuffer);
}
else {
return true;
}
return true;
}
catch (IOException ex) {
onError(ex);
return false;
}
return false;
}
protected abstract void checkOnDataAvailable();
/**
* Reads a data buffer from the input, if possible. Returns {@code null} if a buffer
* could not be read.
@ -182,10 +176,13 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> { @@ -182,10 +176,13 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
* UNSUBSCRIBED
* |
* v
* DATA_UNAVAILABLE <---> DATA_AVAILABLE
* | |
* v v
* COMPLETED
* NO_DEMAND -------------------> DEMAND
* | ^ ^ |
* | | | |
* | --------- READING <----- |
* | | |
* | v |
* ------------> COMPLETED <---------
* </pre>
* Refer to the individual states for more information.
*/
@ -194,16 +191,14 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> { @@ -194,16 +191,14 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
/**
* The initial unsubscribed state. Will respond to {@link
* #subscribe(AbstractRequestBodyPublisher, Subscriber)} by
* changing state to {@link #DATA_UNAVAILABLE}.
* changing state to {@link #NO_DEMAND}.
*/
UNSUBSCRIBED {
@Override
void subscribe(AbstractRequestBodyPublisher publisher,
Subscriber<? super DataBuffer> subscriber) {
Objects.requireNonNull(subscriber);
State newState =
publisher.dataAvailable ? DATA_AVAILABLE : DATA_UNAVAILABLE;
if (publisher.changeState(this, newState)) {
if (publisher.changeState(this, NO_DEMAND)) {
Subscription subscription = new RequestBodySubscription(
publisher);
publisher.subscriber = subscriber;
@ -213,54 +208,55 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> { @@ -213,54 +208,55 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
throw new IllegalStateException(toString());
}
}
@Override
void onDataAvailable(AbstractRequestBodyPublisher publisher) {
publisher.dataAvailable = true;
}
},
/**
* State that gets entered when there is no data to be read. Responds to {@link
* #request(AbstractRequestBodyPublisher, long)} by increasing the demand, and
* responds to {@link #onDataAvailable(AbstractRequestBodyPublisher)} by
* reading the available data and changing state to {@link #DATA_AVAILABLE} if
* there continues to be more data available after the demand has been satisfied.
* State that gets entered when there is no demand. Responds to {@link
* #request(AbstractRequestBodyPublisher, long)} by increasing the demand,
* changing state to {@link #DEMAND} and will check whether there
* is data available for reading.
*/
DATA_UNAVAILABLE {
NO_DEMAND {
@Override
void request(AbstractRequestBodyPublisher publisher, long n) {
if (BackpressureUtils.checkRequest(n, publisher.subscriber)) {
BackpressureUtils.addAndGet(publisher.demand, n);
if (publisher.changeState(this, DEMAND)) {
publisher.checkOnDataAvailable();
}
}
}
},
/**
* State that gets entered when there is demand. Responds to
* {@link #onDataAvailable(AbstractRequestBodyPublisher)} by
* reading the available data. The state will be changed to
* {@link #NO_DEMAND} if there is no demand.
*/
DEMAND {
@Override
void onDataAvailable(AbstractRequestBodyPublisher publisher) {
boolean dataAvailable = publisher.readAndPublish();
if (dataAvailable) {
publisher.changeState(this, DATA_AVAILABLE);
if (publisher.changeState(this, READING)) {
try {
boolean demandAvailable = publisher.readAndPublish();
if (demandAvailable) {
publisher.changeState(READING, DEMAND);
publisher.checkOnDataAvailable();
} else {
publisher.changeState(READING, NO_DEMAND);
}
} catch (IOException ex) {
publisher.onError(ex);
}
}
}
},
/**
* State that gets entered when there is data to be read. Responds to {@link
* #request(AbstractRequestBodyPublisher, long)} by increasing the demand, and
* by reading the available data and changing state to {@link #DATA_UNAVAILABLE}
* if there is no more data available.
*/
DATA_AVAILABLE {
READING {
@Override
void request(AbstractRequestBodyPublisher publisher, long n) {
if (BackpressureUtils.checkRequest(n, publisher.subscriber)) {
BackpressureUtils.addAndGet(publisher.demand, n);
boolean dataAvailable = publisher.readAndPublish();
if (!dataAvailable) {
publisher.changeState(this, DATA_UNAVAILABLE);
}
}
}
},
/**
* The terminal completed state. Does not respond to any events.

21
spring-web-reactive/src/main/java/org/springframework/http/server/reactive/ServletHttpHandlerAdapter.java

@ -159,13 +159,28 @@ public class ServletHttpHandlerAdapter extends HttpServlet { @@ -159,13 +159,28 @@ public class ServletHttpHandlerAdapter extends HttpServlet {
}
public void registerListener() throws IOException {
this.synchronizer.getRequest().getInputStream()
.setReadListener(this.readListener);
inputStream().setReadListener(this.readListener);
}
private ServletInputStream inputStream() throws IOException {
return this.synchronizer.getRequest().getInputStream();
}
@Override
protected void checkOnDataAvailable() {
try {
if (!inputStream().isFinished() && inputStream().isReady()) {
onDataAvailable();
}
}
catch (IOException ex) {
onError(ex);
}
}
@Override
protected DataBuffer read() throws IOException {
ServletInputStream input = this.synchronizer.getRequest().getInputStream();
ServletInputStream input = inputStream();
if (input.isReady()) {
int read = input.read(this.buffer);
if (logger.isTraceEnabled()) {

5
spring-web-reactive/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java

@ -129,6 +129,11 @@ public class UndertowServerHttpRequest extends AbstractServerHttpRequest { @@ -129,6 +129,11 @@ public class UndertowServerHttpRequest extends AbstractServerHttpRequest {
this.requestChannel.resumeReads();
}
@Override
protected void checkOnDataAvailable() {
onDataAvailable();
}
@Override
protected DataBuffer read() throws IOException {
ByteBuffer byteBuffer = this.pooledByteBuffer.getBuffer();

Loading…
Cancel
Save