@ -28,10 +28,14 @@ import java.nio.channels.CompletionHandler;
@@ -28,10 +28,14 @@ import java.nio.channels.CompletionHandler;
import java.nio.channels.ReadableByteChannel ;
import java.nio.channels.WritableByteChannel ;
import java.nio.charset.Charset ;
import java.nio.file.OpenOption ;
import java.nio.file.Path ;
import java.nio.file.StandardOpenOption ;
import java.util.ArrayList ;
import java.util.Arrays ;
import java.util.HashSet ;
import java.util.List ;
import java.util.Set ;
import java.util.concurrent.Callable ;
import java.util.concurrent.atomic.AtomicBoolean ;
import java.util.concurrent.atomic.AtomicLong ;
@ -157,6 +161,31 @@ public abstract class DataBufferUtils {
@@ -157,6 +161,31 @@ public abstract class DataBufferUtils {
return flux . doOnDiscard ( PooledDataBuffer . class , DataBufferUtils : : release ) ;
}
/ * *
* Read bytes from the given file { @code Path } into a { @code Flux } of { @code DataBuffer } s .
* The method ensures that the file is closed when the flux is terminated .
* @param path the path to read bytes from
* @param bufferFactory the factory to create data buffers with
* @param bufferSize the maximum size of the data buffers
* @return a Flux of data buffers read from the given channel
* @since 5 . 2
* /
public static Flux < DataBuffer > read ( Path path , DataBufferFactory bufferFactory , int bufferSize ,
OpenOption . . . options ) {
Assert . notNull ( path , "Path must not be null" ) ;
Assert . notNull ( bufferFactory , "BufferFactory must not be null" ) ;
Assert . isTrue ( bufferSize > 0 , "'bufferSize' must be > 0" ) ;
if ( options . length > 0 ) {
for ( OpenOption option : options ) {
Assert . isTrue ( ! ( option = = StandardOpenOption . APPEND | | option = = StandardOpenOption . WRITE ) ,
"'" + option + "' not allowed" ) ;
}
}
return readAsynchronousFileChannel ( ( ) - > AsynchronousFileChannel . open ( path , options ) ,
bufferFactory , bufferSize ) ;
}
/ * *
* Read the given { @code Resource } into a { @code Flux } of { @code DataBuffer } s .
* < p > If the resource is a file , it is read into an
@ -311,6 +340,58 @@ public abstract class DataBufferUtils {
@@ -311,6 +340,58 @@ public abstract class DataBufferUtils {
} ) ;
}
/ * *
* Write the given stream of { @link DataBuffer DataBuffers } to the given
* file { @link Path } . The optional { @code options } parameter specifies
* how the created or opened ( defaults to
* { @link StandardOpenOption # CREATE CREATE } ,
* { @link StandardOpenOption # TRUNCATE_EXISTING TRUNCATE_EXISTING } , and
* { @link StandardOpenOption # WRITE WRITE } ) .
* @param source the stream of data buffers to be written
* @param destination the path to the file
* @param options options specifying how the file is opened
* @return a { @link Mono } that indicates completion or error
* @since 5 . 2
* /
public static Mono < Void > write ( Publisher < DataBuffer > source , Path destination , OpenOption . . . options ) {
Assert . notNull ( source , "Source must not be null" ) ;
Assert . notNull ( destination , "Destination must not be null" ) ;
Set < OpenOption > optionSet = checkWriteOptions ( options ) ;
return Mono . create ( sink - > {
try {
AsynchronousFileChannel channel = AsynchronousFileChannel . open ( destination , optionSet , null ) ;
sink . onDispose ( ( ) - > closeChannel ( channel ) ) ;
write ( source , channel ) . subscribe ( DataBufferUtils : : release ,
sink : : error ,
sink : : success ) ;
}
catch ( IOException ex ) {
sink . error ( ex ) ;
}
} ) ;
}
private static Set < OpenOption > checkWriteOptions ( OpenOption [ ] options ) {
int length = options . length ;
Set < OpenOption > result = new HashSet < > ( length + 3 ) ;
if ( length = = 0 ) {
result . add ( StandardOpenOption . CREATE ) ;
result . add ( StandardOpenOption . TRUNCATE_EXISTING ) ;
}
else {
for ( OpenOption opt : options ) {
if ( opt = = StandardOpenOption . READ ) {
throw new IllegalArgumentException ( "READ not allowed" ) ;
}
result . add ( opt ) ;
}
}
result . add ( StandardOpenOption . WRITE ) ;
return result ;
}
static void closeChannel ( @Nullable Channel channel ) {
if ( channel ! = null & & channel . isOpen ( ) ) {
try {
@ -586,7 +667,7 @@ public abstract class DataBufferUtils {
@@ -586,7 +667,7 @@ public abstract class DataBufferUtils {
* a delimiter , it is removed .
* @param dataBuffers the data buffers to join
* @param stripDelimiter whether to strip the delimiter
* @return
* @return the joined buffer
* /
private static DataBuffer joinAndStrip ( List < DataBuffer > dataBuffers ,
boolean stripDelimiter ) {
@ -880,6 +961,7 @@ public abstract class DataBufferUtils {
@@ -880,6 +961,7 @@ public abstract class DataBufferUtils {
this . sink . next ( dataBuffer ) ;
this . dataBuffer . set ( null ) ;
}
}
/ * *