@ -21,15 +21,16 @@ import java.io.InputStream;
import java.nio.ByteBuffer ;
import java.nio.ByteBuffer ;
import java.nio.channels.Channels ;
import java.nio.channels.Channels ;
import java.nio.channels.ReadableByteChannel ;
import java.nio.channels.ReadableByteChannel ;
import java.util.ArrayList ;
import java.util.Collections ;
import java.util.List ;
import java.util.concurrent.atomic.AtomicLong ;
import java.util.concurrent.atomic.AtomicLong ;
import java.util.function.BiFunction ;
import java.util.function.BiFunction ;
import java.util.function.Consumer ;
import java.util.function.Consumer ;
import java.util.function.IntPredicate ;
import org.reactivestreams.Publisher ;
import org.reactivestreams.Publisher ;
import org.reactivestreams.Subscriber ;
import org.reactivestreams.Subscription ;
import reactor.core.publisher.Flux ;
import reactor.core.publisher.Flux ;
import reactor.core.publisher.FluxSource ;
import reactor.core.subscriber.SignalEmitter ;
import reactor.core.subscriber.SignalEmitter ;
import org.springframework.core.io.buffer.DataBuffer ;
import org.springframework.core.io.buffer.DataBuffer ;
@ -102,7 +103,79 @@ public abstract class DataBufferUtils {
Assert . notNull ( publisher , "'publisher' must not be null" ) ;
Assert . notNull ( publisher , "'publisher' must not be null" ) ;
Assert . isTrue ( maxByteCount > = 0 , "'maxByteCount' must be a positive number" ) ;
Assert . isTrue ( maxByteCount > = 0 , "'maxByteCount' must be a positive number" ) ;
return new TakeByteUntilCount ( publisher , maxByteCount ) ;
AtomicLong byteCountDown = new AtomicLong ( maxByteCount ) ;
return Flux . from ( publisher ) .
takeWhile ( dataBuffer - > {
int delta = - dataBuffer . readableByteCount ( ) ;
long currentCount = byteCountDown . getAndAdd ( delta ) ;
return currentCount > = 0 ;
} ) .
map ( dataBuffer - > {
long currentCount = byteCountDown . get ( ) ;
if ( currentCount > = 0 ) {
return dataBuffer ;
}
else {
// last buffer
int size = ( int ) ( currentCount + dataBuffer . readableByteCount ( ) ) ;
return dataBuffer . slice ( 0 , size ) ;
}
} ) ;
}
/ * *
* Tokenize the { @link DataBuffer } using the given delimiter
* function . Does not include the delimiter in the result .
* @param dataBuffer the data buffer to tokenize
* @param delimiter the delimiter function
* @return the tokens
* /
public static List < DataBuffer > tokenize ( DataBuffer dataBuffer ,
IntPredicate delimiter ) {
Assert . notNull ( dataBuffer , "'dataBuffer' must not be null" ) ;
Assert . notNull ( delimiter , "'delimiter' must not be null" ) ;
List < DataBuffer > results = new ArrayList < DataBuffer > ( ) ;
int idx ;
do {
idx = dataBuffer . indexOf ( delimiter ) ;
if ( idx < 0 ) {
results . add ( dataBuffer ) ;
}
else {
if ( idx > 0 ) {
DataBuffer slice = dataBuffer . slice ( 0 , idx ) ;
slice = retain ( slice ) ;
results . add ( slice ) ;
}
int remainingLen = dataBuffer . readableByteCount ( ) - ( idx + 1 ) ;
if ( remainingLen > 0 ) {
dataBuffer = dataBuffer . slice ( idx + 1 , remainingLen ) ;
}
else {
release ( dataBuffer ) ;
idx = - 1 ;
}
}
}
while ( idx ! = - 1 ) ;
return Collections . unmodifiableList ( results ) ;
}
/ * *
* Retains the given data buffer , it it is a { @link PooledDataBuffer } .
* @param dataBuffer the data buffer to retain
* @return the retained buffer
* /
@SuppressWarnings ( "unchecked" )
public static < T extends DataBuffer > T retain ( T dataBuffer ) {
if ( dataBuffer instanceof PooledDataBuffer ) {
return ( T ) ( ( PooledDataBuffer ) dataBuffer ) . retain ( ) ;
}
else {
return dataBuffer ;
}
}
}
/ * *
/ * *
@ -117,63 +190,6 @@ public abstract class DataBufferUtils {
return false ;
return false ;
}
}
private static final class TakeByteUntilCount extends FluxSource < DataBuffer , DataBuffer > {
final long maxByteCount ;
TakeByteUntilCount ( Publisher < ? extends DataBuffer > source , long maxByteCount ) {
super ( source ) ;
this . maxByteCount = maxByteCount ;
}
@Override
public void subscribe ( Subscriber < ? super DataBuffer > subscriber ) {
source . subscribe ( new Subscriber < DataBuffer > ( ) {
private Subscription subscription ;
private final AtomicLong byteCount = new AtomicLong ( ) ;
@Override
public void onSubscribe ( Subscription s ) {
this . subscription = s ;
subscriber . onSubscribe ( s ) ;
}
@Override
public void onNext ( DataBuffer dataBuffer ) {
int delta = dataBuffer . readableByteCount ( ) ;
long currentCount = this . byteCount . addAndGet ( delta ) ;
if ( currentCount > maxByteCount ) {
int size = ( int ) ( maxByteCount - currentCount + delta ) ;
ByteBuffer byteBuffer =
( ByteBuffer ) dataBuffer . asByteBuffer ( ) . limit ( size ) ;
DataBuffer partialBuffer =
dataBuffer . allocator ( ) . allocateBuffer ( size ) ;
partialBuffer . write ( byteBuffer ) ;
subscriber . onNext ( partialBuffer ) ;
subscriber . onComplete ( ) ;
this . subscription . cancel ( ) ;
}
else {
subscriber . onNext ( dataBuffer ) ;
}
}
@Override
public void onError ( Throwable t ) {
subscriber . onError ( t ) ;
}
@Override
public void onComplete ( ) {
subscriber . onComplete ( ) ;
}
} ) ;
}
}
private static class ReadableByteChannelGenerator
private static class ReadableByteChannelGenerator
implements BiFunction < ReadableByteChannel , SignalEmitter < DataBuffer > ,
implements BiFunction < ReadableByteChannel , SignalEmitter < DataBuffer > ,
ReadableByteChannel > {
ReadableByteChannel > {