@ -144,10 +144,8 @@ public abstract class DataBufferUtils {
@@ -144,10 +144,8 @@ public abstract class DataBufferUtils {
channel - > Flux . create ( sink - > {
ReadCompletionHandler handler =
new ReadCompletionHandler ( channel , sink , position , bufferFactory , bufferSize ) ;
sink . onDispose ( handler : : dispose ) ;
DataBuffer dataBuffer = bufferFactory . allocateBuffer ( bufferSize ) ;
ByteBuffer byteBuffer = dataBuffer . asByteBuffer ( 0 , bufferSize ) ;
channel . read ( byteBuffer , position , dataBuffer , handler ) ;
sink . onCancel ( handler : : cancel ) ;
sink . onRequest ( handler : : request ) ;
} ) ,
channel - > {
// Do not close channel from here, rather wait for the current read callback
@ -654,7 +652,9 @@ public abstract class DataBufferUtils {
@@ -654,7 +652,9 @@ public abstract class DataBufferUtils {
private final AtomicLong position ;
private final AtomicBoolean disposed = new AtomicBoolean ( ) ;
private final AtomicBoolean reading = new AtomicBoolean ( ) ;
private final AtomicBoolean canceled = new AtomicBoolean ( ) ;
public ReadCompletionHandler ( AsynchronousFileChannel channel ,
FluxSink < DataBuffer > sink , long position , DataBufferFactory dataBufferFactory , int bufferSize ) {
@ -666,43 +666,62 @@ public abstract class DataBufferUtils {
@@ -666,43 +666,62 @@ public abstract class DataBufferUtils {
this . bufferSize = bufferSize ;
}
public void read ( ) {
if ( this . sink . requestedFromDownstream ( ) > 0 & & this . reading . compareAndSet ( false , true ) ) {
DataBuffer dataBuffer = this . dataBufferFactory . allocateBuffer ( this . bufferSize ) ;
ByteBuffer byteBuffer = dataBuffer . asByteBuffer ( 0 , this . bufferSize ) ;
this . channel . read ( byteBuffer , this . position . get ( ) , dataBuffer , this ) ;
}
}
@Override
public void completed ( Integer read , DataBuffer dataBuffer ) {
if ( read ! = - 1 & & ! this . disposed . get ( ) ) {
long pos = this . position . addAndGet ( read ) ;
dataBuffer . writePosition ( read ) ;
this . sink . next ( dataBuffer ) ;
// onNext may have led to onCancel (e.g. downstream takeUntil)
if ( this . disposed . get ( ) ) {
complete ( ) ;
this . reading . set ( false ) ;
if ( ! isCanceled ( ) ) {
if ( read ! = - 1 ) {
this . po sitio n. addA ndG et( read ) ;
dataBuffer . writePosition ( read ) ;
this . sink . next ( dataBuffer ) ;
read ( ) ;
}
else {
DataBuffer newDataBuffer = this . dataBufferFactory . allocateBuffer ( this . bufferSize ) ;
ByteBuffer newByteBuffer = newDataBuffer . asByteBuffer ( 0 , this . bufferSize ) ;
this . channel . read ( newByteBuffer , pos , newDataBuffer , this ) ;
release ( dataBuffer ) ;
closeChannel ( this . channel ) ;
this . sink . complete ( ) ;
}
}
else {
release ( dataBuffer ) ;
complete ( ) ;
closeChannel ( this . channel ) ;
}
}
private void complete ( ) {
this . sink . complete ( ) ;
closeChannel ( this . channel ) ;
}
@Override
public void failed ( Throwable exc , DataBuffer dataBuffer ) {
this . reading . set ( false ) ;
release ( dataBuffer ) ;
this . sink . error ( exc ) ;
closeChannel ( this . channel ) ;
if ( ! isCanceled ( ) ) {
this . sink . error ( exc ) ;
}
}
public void request ( long n ) {
read ( ) ;
}
public void dispose ( ) {
this . disposed . set ( true ) ;
public void cancel ( ) {
if ( this . canceled . compareAndSet ( false , true ) ) {
if ( ! this . reading . get ( ) ) {
closeChannel ( this . channel ) ;
}
}
}
private boolean isCanceled ( ) {
return this . canceled . get ( ) ;
}
}