diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java index 8eb5dfb210..0ba1fc2066 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java @@ -28,10 +28,14 @@ import java.nio.channels.CompletionHandler; import java.nio.channels.ReadableByteChannel; import java.nio.channels.WritableByteChannel; import java.nio.charset.Charset; +import java.nio.file.OpenOption; +import java.nio.file.Path; import java.nio.file.StandardOpenOption; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -157,6 +161,31 @@ public abstract class DataBufferUtils { return flux.doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release); } + /** + * Read bytes from the given file {@code Path} into a {@code Flux} of {@code DataBuffer}s. + * The method ensures that the file is closed when the flux is terminated. + * @param path the path to read bytes from + * @param bufferFactory the factory to create data buffers with + * @param bufferSize the maximum size of the data buffers + * @return a Flux of data buffers read from the given channel + * @since 5.2 + */ + public static Flux read(Path path, DataBufferFactory bufferFactory, int bufferSize, + OpenOption... options) { + Assert.notNull(path, "Path must not be null"); + Assert.notNull(bufferFactory, "BufferFactory must not be null"); + Assert.isTrue(bufferSize > 0, "'bufferSize' must be > 0"); + if (options.length > 0) { + for (OpenOption option : options) { + Assert.isTrue(!(option == StandardOpenOption.APPEND || option == StandardOpenOption.WRITE), + "'" + option + "' not allowed"); + } + } + + return readAsynchronousFileChannel(() -> AsynchronousFileChannel.open(path, options), + bufferFactory, bufferSize); + } + /** * Read the given {@code Resource} into a {@code Flux} of {@code DataBuffer}s. *

If the resource is a file, it is read into an @@ -311,6 +340,58 @@ public abstract class DataBufferUtils { }); } + /** + * Write the given stream of {@link DataBuffer DataBuffers} to the given + * file {@link Path}. The optional {@code options} parameter specifies + * how the created or opened (defaults to + * {@link StandardOpenOption#CREATE CREATE}, + * {@link StandardOpenOption#TRUNCATE_EXISTING TRUNCATE_EXISTING}, and + * {@link StandardOpenOption#WRITE WRITE}). + * @param source the stream of data buffers to be written + * @param destination the path to the file + * @param options options specifying how the file is opened + * @return a {@link Mono} that indicates completion or error + * @since 5.2 + */ + public static Mono write(Publisher source, Path destination, OpenOption... options) { + Assert.notNull(source, "Source must not be null"); + Assert.notNull(destination, "Destination must not be null"); + + Set optionSet = checkWriteOptions(options); + + return Mono.create(sink -> { + try { + AsynchronousFileChannel channel = AsynchronousFileChannel.open(destination, optionSet, null); + sink.onDispose(() -> closeChannel(channel)); + write(source, channel).subscribe(DataBufferUtils::release, + sink::error, + sink::success); + } + catch (IOException ex) { + sink.error(ex); + } + }); + } + + private static Set checkWriteOptions(OpenOption[] options) { + int length = options.length; + Set result = new HashSet<>(length + 3); + if (length == 0) { + result.add(StandardOpenOption.CREATE); + result.add(StandardOpenOption.TRUNCATE_EXISTING); + } + else { + for (OpenOption opt : options) { + if (opt == StandardOpenOption.READ) { + throw new IllegalArgumentException("READ not allowed"); + } + result.add(opt); + } + } + result.add(StandardOpenOption.WRITE); + return result; + } + static void closeChannel(@Nullable Channel channel) { if (channel != null && channel.isOpen()) { try { @@ -586,7 +667,7 @@ public abstract class DataBufferUtils { * a delimiter, it is removed. * @param dataBuffers the data buffers to join * @param stripDelimiter whether to strip the delimiter - * @return + * @return the joined buffer */ private static DataBuffer joinAndStrip(List dataBuffers, boolean stripDelimiter) { @@ -880,6 +961,7 @@ public abstract class DataBufferUtils { this.sink.next(dataBuffer); this.dataBuffer.set(null); } + } /** diff --git a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java index f1e0162c43..0e6db5903f 100644 --- a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java +++ b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java @@ -31,6 +31,7 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.StandardOpenOption; import java.time.Duration; +import java.util.List; import java.util.concurrent.CountDownLatch; import io.netty.buffer.ByteBuf; @@ -202,6 +203,13 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase { subscriber.cancel(); } + @Test + public void readPath() throws IOException { + Flux flux = DataBufferUtils.read(this.resource.getFile().toPath(), this.bufferFactory, 3); + + verifyReadData(flux); + } + @Test public void readResource() throws Exception { Flux flux = DataBufferUtils.read(this.resource, this.bufferFactory, 3); @@ -474,6 +482,21 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase { flux.subscribe(DataBufferUtils::release); } + @Test + public void writePath() throws IOException { + DataBuffer foo = stringBuffer("foo"); + DataBuffer bar = stringBuffer("bar"); + Flux flux = Flux.just(foo, bar); + + Mono result = DataBufferUtils.write(flux, tempFile); + + StepVerifier.create(result) + .verifyComplete(); + + List written = Files.readAllLines(tempFile); + assertThat(written).contains("foobar"); + } + @Test public void readAndWriteByteChannel() throws Exception { Path source = Paths.get(