|
|
|
@ -57,12 +57,6 @@ public abstract class DataBufferUtils {
@@ -57,12 +57,6 @@ public abstract class DataBufferUtils {
|
|
|
|
|
|
|
|
|
|
private static final Consumer<DataBuffer> RELEASE_CONSUMER = DataBufferUtils::release; |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Workaround to disable use of pooled buffers: |
|
|
|
|
* https://github.com/reactor/reactor-core/issues/1634.
|
|
|
|
|
*/ |
|
|
|
|
private static final DataBufferFactory defaultDataBufferFactory = new DefaultDataBufferFactory(); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
//---------------------------------------------------------------------
|
|
|
|
|
// Reading
|
|
|
|
@ -141,14 +135,12 @@ public abstract class DataBufferUtils {
@@ -141,14 +135,12 @@ public abstract class DataBufferUtils {
|
|
|
|
|
Assert.isTrue(position >= 0, "'position' must be >= 0"); |
|
|
|
|
Assert.isTrue(bufferSize > 0, "'bufferSize' must be > 0"); |
|
|
|
|
|
|
|
|
|
DataBufferFactory bufferFactoryToUse = defaultDataBufferFactory; |
|
|
|
|
|
|
|
|
|
Flux<DataBuffer> flux = Flux.using(channelSupplier, |
|
|
|
|
channel -> Flux.create(sink -> { |
|
|
|
|
ReadCompletionHandler handler = |
|
|
|
|
new ReadCompletionHandler(channel, sink, position, bufferFactoryToUse, bufferSize); |
|
|
|
|
new ReadCompletionHandler(channel, sink, position, bufferFactory, bufferSize); |
|
|
|
|
sink.onDispose(handler::dispose); |
|
|
|
|
DataBuffer dataBuffer = bufferFactoryToUse.allocateBuffer(bufferSize); |
|
|
|
|
DataBuffer dataBuffer = bufferFactory.allocateBuffer(bufferSize); |
|
|
|
|
ByteBuffer byteBuffer = dataBuffer.asByteBuffer(0, bufferSize); |
|
|
|
|
channel.read(byteBuffer, position, dataBuffer, handler); |
|
|
|
|
}), |
|
|
|
|