@ -52,8 +52,6 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
@@ -52,8 +52,6 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
private Subscriber < ? super DataBuffer > subscriber ;
private volatile boolean dataAvailable ;
@Override
public void subscribe ( Subscriber < ? super DataBuffer > subscriber ) {
if ( this . logger . isTraceEnabled ( ) ) {
@ -101,28 +99,24 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
@@ -101,28 +99,24 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
/ * *
* Reads and publishes data buffers from the input . Continues till either there is no
* more demand , or till there is no more data to be read .
* @return { @code true } if there is more data to be rea d ; { @code false } otherwise
* @return { @code true } if there is more deman d ; { @code false } otherwise
* /
private boolean readAndPublish ( ) {
try {
while ( hasDemand ( ) ) {
DataBuffer dataBuffer = read ( ) ;
if ( dataBuffer ! = null ) {
BackpressureUtils . getAndSub ( this . demand , 1L ) ;
this . subscriber . onNext ( dataBuffer ) ;
}
else {
return false ;
}
private boolean readAndPublish ( ) throws IOException {
while ( hasDemand ( ) ) {
DataBuffer dataBuffer = read ( ) ;
if ( dataBuffer ! = null ) {
BackpressureUtils . getAndSub ( this . demand , 1L ) ;
this . subscriber . onNext ( dataBuffer ) ;
}
else {
return true ;
}
return true ;
}
catch ( IOException ex ) {
onError ( ex ) ;
return false ;
}
return false ;
}
protected abstract void checkOnDataAvailable ( ) ;
/ * *
* Reads a data buffer from the input , if possible . Returns { @code null } if a buffer
* could not be read .
@ -182,10 +176,13 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
@@ -182,10 +176,13 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
* UNSUBSCRIBED
* |
* v
* DATA_UNAVAILABLE < - - - > DATA_AVAILABLE
* | |
* v v
* COMPLETED
* NO_DEMAND - - - - - - - - - - - - - - - - - - - > DEMAND
* | ^ ^ |
* | | | |
* | - - - - - - - - - READING < - - - - - |
* | | |
* | v |
* - - - - - - - - - - - - > COMPLETED < - - - - - - - - -
* < / pre >
* Refer to the individual states for more information .
* /
@ -194,16 +191,14 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
@@ -194,16 +191,14 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
/ * *
* The initial unsubscribed state . Will respond to { @link
* # subscribe ( AbstractRequestBodyPublisher , Subscriber ) } by
* changing state to { @link # DATA_UNAVAILABLE } .
* changing state to { @link # NO_DEMAND } .
* /
UNSUBSCRIBED {
@Override
void subscribe ( AbstractRequestBodyPublisher publisher ,
Subscriber < ? super DataBuffer > subscriber ) {
Objects . requireNonNull ( subscriber ) ;
State newState =
publisher . dataAvailable ? DATA_AVAILABLE : DATA_UNAVAILABLE ;
if ( publisher . changeState ( this , newState ) ) {
if ( publisher . changeState ( this , NO_DEMAND ) ) {
Subscription subscription = new RequestBodySubscription (
publisher ) ;
publisher . subscriber = subscriber ;
@ -213,54 +208,55 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
@@ -213,54 +208,55 @@ abstract class AbstractRequestBodyPublisher implements Publisher<DataBuffer> {
throw new IllegalStateException ( toString ( ) ) ;
}
}
@Override
void onDataAvailable ( AbstractRequestBodyPublisher publisher ) {
publisher . dataAvailable = true ;
}
} ,
/ * *
* State that gets entered when there is no data to be read . Responds to { @link
* # request ( AbstractRequestBodyPublisher , long ) } by increasing the demand , and
* responds to { @link # onDataAvailable ( AbstractRequestBodyPublisher ) } by
* reading the available data and changing state to { @link # DATA_AVAILABLE } if
* there continues to be more data available after the demand has been satisfied .
* State that gets entered when there is no demand . Responds to { @link
* # request ( AbstractRequestBodyPublisher , long ) } by increasing the demand ,
* changing state to { @link # DEMAND } and will check whether there
* is data available for reading .
* /
DATA_UNAVAILABLE {
NO_DEMAND {
@Override
void request ( AbstractRequestBodyPublisher publisher , long n ) {
if ( BackpressureUtils . checkRequest ( n , publisher . subscriber ) ) {
BackpressureUtils . addAndGet ( publisher . demand , n ) ;
if ( publisher . changeState ( this , DEMAND ) ) {
publisher . checkOnDataAvailable ( ) ;
}
}
}
} ,
/ * *
* State that gets entered when there is demand . Responds to
* { @link # onDataAvailable ( AbstractRequestBodyPublisher ) } by
* reading the available data . The state will be changed to
* { @link # NO_DEMAND } if there is no demand .
* /
DEMAND {
@Override
void onDataAvailable ( AbstractRequestBodyPublisher publisher ) {
boolean dataAvailable = publisher . readAndPublish ( ) ;
if ( dataAvailable ) {
publisher . changeState ( this , DATA_AVAILABLE ) ;
if ( publisher . changeState ( this , READING ) ) {
try {
boolean demandAvailable = publisher . readAndPublish ( ) ;
if ( demandAvailable ) {
publisher . changeState ( READING , DEMAND ) ;
publisher . checkOnDataAvailable ( ) ;
} else {
publisher . changeState ( READING , NO_DEMAND ) ;
}
} catch ( IOException ex ) {
publisher . onError ( ex ) ;
}
}
}
} ,
/ * *
* State that gets entered when there is data to be read . Responds to { @link
* # request ( AbstractRequestBodyPublisher , long ) } by increasing the demand , and
* by reading the available data and changing state to { @link # DATA_UNAVAILABLE }
* if there is no more data available .
* /
DATA_AVAILABLE {
READING {
@Override
void request ( AbstractRequestBodyPublisher publisher , long n ) {
if ( BackpressureUtils . checkRequest ( n , publisher . subscriber ) ) {
BackpressureUtils . addAndGet ( publisher . demand , n ) ;
boolean dataAvailable = publisher . readAndPublish ( ) ;
if ( ! dataAvailable ) {
publisher . changeState ( this , DATA_UNAVAILABLE ) ;
}
}
}
} ,
/ * *
* The terminal completed state . Does not respond to any events .