@ -27,6 +27,7 @@ import java.nio.channels.Channels;
@@ -27,6 +27,7 @@ import java.nio.channels.Channels;
import java.nio.channels.CompletionHandler ;
import java.nio.channels.ReadableByteChannel ;
import java.nio.channels.WritableByteChannel ;
import java.nio.charset.Charset ;
import java.nio.file.StandardOpenOption ;
import java.util.ArrayList ;
import java.util.Arrays ;
@ -36,6 +37,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
@@ -36,6 +37,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong ;
import java.util.concurrent.atomic.AtomicReference ;
import java.util.function.Consumer ;
import java.util.function.IntPredicate ;
import org.reactivestreams.Publisher ;
import org.reactivestreams.Subscription ;
@ -60,8 +62,6 @@ public abstract class DataBufferUtils {
@@ -60,8 +62,6 @@ public abstract class DataBufferUtils {
private static final Consumer < DataBuffer > RELEASE_CONSUMER = DataBufferUtils : : release ;
private static final DataBuffer END_FRAME = new DefaultDataBufferFactory ( ) . wrap ( new byte [ 0 ] ) ;
//---------------------------------------------------------------------
// Reading
@ -486,10 +486,9 @@ public abstract class DataBufferUtils {
@@ -486,10 +486,9 @@ public abstract class DataBufferUtils {
/ * *
* Splits the given stream of data buffers around the given delimiter .
* The returned flux contains data buffers that are terminated by the given delimiter ,
* which is included when { @code stripDelimiter } is { @code true } , or stripped off when
* { @code false } .
* which is included when { @code stripDelimiter } is { @code false } .
* @param dataBuffers the input stream of data buffers
* @param delimiter the delimiting byte array
* @param delimiter the delimiter bytes
* @param stripDelimiter whether to include the delimiter at the end of each resulting buffer
* @return the flux of data buffers created by splitting the given data buffers around the
* given delimiter
@ -497,29 +496,78 @@ public abstract class DataBufferUtils {
@@ -497,29 +496,78 @@ public abstract class DataBufferUtils {
* /
public static Flux < DataBuffer > split ( Publisher < DataBuffer > dataBuffers , byte [ ] delimiter ,
boolean stripDelimiter ) {
return split ( dataBuffers , new byte [ ] [ ] { delimiter } , stripDelimiter ) ;
}
/ * *
* Splits the given stream of data buffers around the given delimiters .
* The returned flux contains data buffers that are terminated by any of the given delimiters ,
* which are included when { @code stripDelimiter } is { @code false } .
* @param dataBuffers the input stream of data buffers
* @param delimiters the delimiters , one per element
* @param stripDelimiter whether to include the delimiters at the end of each resulting buffer
* @return the flux of data buffers created by splitting the given data buffers around the
* given delimiters
* @since 5 . 2
* /
public static Flux < DataBuffer > split ( Publisher < DataBuffer > dataBuffers , byte [ ] [ ] delimiters ,
boolean stripDelimiter ) {
Assert . notNull ( dataBuffers , "DataBuffers must not be null" ) ;
Assert . isTrue ( delimiter . length > 0 , "Delimiter must not be empty" ) ;
Assert . isTrue ( delimiters . length > 0 , "Delimiter must not be empty" ) ;
Matcher [ ] matchers = matchers ( delimiters ) ;
Matcher matcher = matcher ( delimiter ) ;
return Flux . from ( dataBuffers )
. flatMap ( buffer - > endFrameOnDelimiter ( buffer , matcher ) )
. bufferUntil ( buffer - > buffer = = END_FRAME )
. map ( buffers - > joinAndStrip ( buffers , delimiter , stripDelimiter ) )
. flatMap ( buffer - > endFrameAfter Delimiter ( buffer , matchers ) )
. bufferUntil ( buffer - > buffer instanceof EndFrameBuffer )
. map ( buffers - > joinAndStrip ( buffers , stripDelimiter ) )
. doOnDiscard ( PooledDataBuffer . class , DataBufferUtils : : release ) ;
}
// Return Flux, because returning List (w/ flatMapIterable) results in memory leaks because
// of pre-fetching.
private static Flux < DataBuffer > endFrameOnDelimiter ( DataBuffer dataBuffer , Matcher matcher ) {
private static Matcher [ ] matchers ( byte [ ] [ ] delimiters ) {
Assert . isTrue ( delimiters . length > 0 , "Delimiters must not be empty" ) ;
Matcher [ ] result = new Matcher [ delimiters . length ] ;
for ( int i = 0 ; i < delimiters . length ; i + + ) {
result [ i ] = matcher ( delimiters [ i ] ) ;
}
return result ;
}
/ * *
* Finds the { @link Matcher } with the first match and longest delimiter , and inserts a
* { @link EndFrameBuffer } just after its match .
*
* @param dataBuffer the buffer to find delimiters in
* @param matchers used to find the first delimiters
* @return a flux of buffers , containing { @link EndFrameBuffer } after each delimiter that was
* found in { @code dataBuffer } . Returns Flux , because returning List ( w / flatMapIterable )
* results in memory leaks due to pre - fetching .
* /
private static Flux < DataBuffer > endFrameAfterDelimiter ( DataBuffer dataBuffer , Matcher [ ] matchers ) {
List < DataBuffer > result = new ArrayList < > ( ) ;
do {
int endIdx = matcher . match ( dataBuffer ) ;
int readPosition = dataBuffer . readPosition ( ) ;
if ( endIdx ! = - 1 ) {
int length = endIdx + 1 - readPosition ;
int matchedEndIdx = Integer . MAX_VALUE ;
byte [ ] matchedDelimiter = new byte [ 0 ] ;
for ( Matcher matcher : matchers ) {
int endIdx = matcher . match ( dataBuffer ) ;
if ( endIdx ! = - 1 & &
endIdx < = matchedEndIdx & &
matcher . delimiter ( ) . length > matchedDelimiter . length ) {
matchedEndIdx = endIdx ;
matchedDelimiter = matcher . delimiter ( ) ;
}
}
if ( matchedDelimiter . length > 0 ) {
int readPosition = dataBuffer . readPosition ( ) ;
int length = matchedEndIdx + 1 - readPosition ;
result . add ( dataBuffer . retainedSlice ( readPosition , length ) ) ;
result . add ( END_FRAME ) ;
dataBuffer . readPosition ( endIdx + 1 ) ;
result . add ( new EndFrameBuffer ( matchedDelimiter ) ) ;
dataBuffer . readPosition ( matchedEndIdx + 1 ) ;
for ( Matcher matcher : matchers ) {
matcher . reset ( ) ;
}
}
else {
result . add ( retain ( dataBuffer ) ) ;
@ -532,21 +580,32 @@ public abstract class DataBufferUtils {
@@ -532,21 +580,32 @@ public abstract class DataBufferUtils {
return Flux . fromIterable ( result ) ;
}
private static DataBuffer joinAndStrip ( List < DataBuffer > dataBuffers , byte [ ] delimiter ,
/ * *
* Joins the given list of buffers . If the list ends with a { @link EndFrameBuffer } , it is
* removed . If { @code stripDelimiter } is { @code true } and the resulting buffer ends with
* a delimiter , it is removed .
* @param dataBuffers the data buffers to join
* @param stripDelimiter whether to strip the delimiter
* @return
* /
private static DataBuffer joinAndStrip ( List < DataBuffer > dataBuffers ,
boolean stripDelimiter ) {
Assert . state ( ! dataBuffers . isEmpty ( ) , "DataBuffers should not be empty" ) ;
boolean endFrameFound = false ;
byte [ ] matchingDelimiter = null ;
int lastIdx = dataBuffers . size ( ) - 1 ;
if ( dataBuffers . get ( lastIdx ) = = END_FRAME ) {
endFrameFound = true ;
DataBuffer lastBuffer = dataBuffers . get ( lastIdx ) ;
if ( lastBuffer instanceof EndFrameBuffer ) {
matchingDelimiter = ( ( EndFrameBuffer ) lastBuffer ) . delimiter ( ) ;
dataBuffers . remove ( lastIdx ) ;
}
DataBuffer result = dataBuffers . get ( 0 ) . factory ( ) . join ( dataBuffers ) ;
if ( stripDelimiter & & endFrameFound ) {
result . writePosition ( result . writePosition ( ) - delimiter . length ) ;
if ( stripDelimiter & & matchingDelimiter ! = null ) {
result . writePosition ( result . writePosition ( ) - matchingDelimiter . length ) ;
}
return result ;
}
@ -573,6 +632,11 @@ public abstract class DataBufferUtils {
@@ -573,6 +632,11 @@ public abstract class DataBufferUtils {
* /
byte [ ] delimiter ( ) ;
/ * *
* Resets the state of this matcher .
* /
void reset ( ) ;
}
@ -865,7 +929,7 @@ public abstract class DataBufferUtils {
@@ -865,7 +929,7 @@ public abstract class DataBufferUtils {
if ( b = = this . delimiter [ this . matches ] ) {
this . matches + + ;
if ( this . matches = = this . delimiter . length ) {
this . matches = 0 ;
reset ( ) ;
return i ;
}
}
@ -877,6 +941,173 @@ public abstract class DataBufferUtils {
@@ -877,6 +941,173 @@ public abstract class DataBufferUtils {
public byte [ ] delimiter ( ) {
return Arrays . copyOf ( this . delimiter , this . delimiter . length ) ;
}
@Override
public void reset ( ) {
this . matches = 0 ;
}
}
private static class EndFrameBuffer implements DataBuffer {
private static final DataBuffer BUFFER = new DefaultDataBufferFactory ( ) . wrap ( new byte [ 0 ] ) ;
private byte [ ] delimiter ;
public EndFrameBuffer ( byte [ ] delimiter ) {
this . delimiter = delimiter ;
}
public byte [ ] delimiter ( ) {
return this . delimiter ;
}
@Override
public DataBufferFactory factory ( ) {
return BUFFER . factory ( ) ;
}
@Override
public int indexOf ( IntPredicate predicate , int fromIndex ) {
return BUFFER . indexOf ( predicate , fromIndex ) ;
}
@Override
public int lastIndexOf ( IntPredicate predicate , int fromIndex ) {
return BUFFER . lastIndexOf ( predicate , fromIndex ) ;
}
@Override
public int readableByteCount ( ) {
return BUFFER . readableByteCount ( ) ;
}
@Override
public int writableByteCount ( ) {
return BUFFER . writableByteCount ( ) ;
}
@Override
public int capacity ( ) {
return BUFFER . capacity ( ) ;
}
@Override
public DataBuffer capacity ( int capacity ) {
return BUFFER . capacity ( capacity ) ;
}
@Override
public DataBuffer ensureCapacity ( int capacity ) {
return BUFFER . ensureCapacity ( capacity ) ;
}
@Override
public int readPosition ( ) {
return BUFFER . readPosition ( ) ;
}
@Override
public DataBuffer readPosition ( int readPosition ) {
return BUFFER . readPosition ( readPosition ) ;
}
@Override
public int writePosition ( ) {
return BUFFER . writePosition ( ) ;
}
@Override
public DataBuffer writePosition ( int writePosition ) {
return BUFFER . writePosition ( writePosition ) ;
}
@Override
public byte getByte ( int index ) {
return BUFFER . getByte ( index ) ;
}
@Override
public byte read ( ) {
return BUFFER . read ( ) ;
}
@Override
public DataBuffer read ( byte [ ] destination ) {
return BUFFER . read ( destination ) ;
}
@Override
public DataBuffer read ( byte [ ] destination , int offset , int length ) {
return BUFFER . read ( destination , offset , length ) ;
}
@Override
public DataBuffer write ( byte b ) {
return BUFFER . write ( b ) ;
}
@Override
public DataBuffer write ( byte [ ] source ) {
return BUFFER . write ( source ) ;
}
@Override
public DataBuffer write ( byte [ ] source , int offset , int length ) {
return BUFFER . write ( source , offset , length ) ;
}
@Override
public DataBuffer write ( DataBuffer . . . buffers ) {
return BUFFER . write ( buffers ) ;
}
@Override
public DataBuffer write ( ByteBuffer . . . buffers ) {
return BUFFER . write ( buffers ) ;
}
@Override
public DataBuffer write ( CharSequence charSequence , Charset charset ) {
return BUFFER . write ( charSequence , charset ) ;
}
@Override
public DataBuffer slice ( int index , int length ) {
return BUFFER . slice ( index , length ) ;
}
@Override
public DataBuffer retainedSlice ( int index , int length ) {
return BUFFER . retainedSlice ( index , length ) ;
}
@Override
public ByteBuffer asByteBuffer ( ) {
return BUFFER . asByteBuffer ( ) ;
}
@Override
public ByteBuffer asByteBuffer ( int index , int length ) {
return BUFFER . asByteBuffer ( index , length ) ;
}
@Override
public InputStream asInputStream ( ) {
return BUFFER . asInputStream ( ) ;
}
@Override
public InputStream asInputStream ( boolean releaseOnClose ) {
return BUFFER . asInputStream ( releaseOnClose ) ;
}
@Override
public OutputStream asOutputStream ( ) {
return BUFFER . asOutputStream ( ) ;
}
}
}