|
|
|
@ -18,12 +18,12 @@ package org.springframework.core.io.buffer.support;
@@ -18,12 +18,12 @@ package org.springframework.core.io.buffer.support;
|
|
|
|
|
|
|
|
|
|
import java.io.IOException; |
|
|
|
|
import java.io.InputStream; |
|
|
|
|
import java.util.concurrent.BlockingQueue; |
|
|
|
|
import java.util.Iterator; |
|
|
|
|
import java.util.concurrent.atomic.AtomicBoolean; |
|
|
|
|
|
|
|
|
|
import org.reactivestreams.Publisher; |
|
|
|
|
import org.reactivestreams.Subscription; |
|
|
|
|
import reactor.rx.Stream; |
|
|
|
|
import reactor.core.publisher.Flux; |
|
|
|
|
|
|
|
|
|
import org.springframework.core.io.buffer.DataBuffer; |
|
|
|
|
import org.springframework.util.Assert; |
|
|
|
@ -35,7 +35,7 @@ class DataBufferPublisherInputStream extends InputStream {
@@ -35,7 +35,7 @@ class DataBufferPublisherInputStream extends InputStream {
|
|
|
|
|
|
|
|
|
|
private final AtomicBoolean completed = new AtomicBoolean(); |
|
|
|
|
|
|
|
|
|
private final BlockingQueue<DataBuffer> queue; |
|
|
|
|
private final Iterator<DataBuffer> queue; |
|
|
|
|
|
|
|
|
|
private InputStream currentStream; |
|
|
|
|
|
|
|
|
@ -57,8 +57,7 @@ class DataBufferPublisherInputStream extends InputStream {
@@ -57,8 +57,7 @@ class DataBufferPublisherInputStream extends InputStream {
|
|
|
|
|
int requestSize) { |
|
|
|
|
Assert.notNull(publisher, "'publisher' must not be null"); |
|
|
|
|
|
|
|
|
|
// TODO Avoid using Reactor Stream, it should not be a mandatory dependency of Spring Reactive
|
|
|
|
|
this.queue = Stream.from(publisher).toBlockingQueue(requestSize); |
|
|
|
|
this.queue = Flux.from(publisher).toIterable(requestSize).iterator(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
@ -126,25 +125,22 @@ class DataBufferPublisherInputStream extends InputStream {
@@ -126,25 +125,22 @@ class DataBufferPublisherInputStream extends InputStream {
|
|
|
|
|
return this.currentStream; |
|
|
|
|
} |
|
|
|
|
else { |
|
|
|
|
// take() blocks until next or complete() then return null,
|
|
|
|
|
// but that's OK since this is a *blocking* InputStream
|
|
|
|
|
DataBuffer signal = this.queue.take(); |
|
|
|
|
if (signal == null) { |
|
|
|
|
// if upstream Publisher has completed, then complete() and return null,
|
|
|
|
|
if (!this.queue.hasNext()) { |
|
|
|
|
this.completed.set(true); |
|
|
|
|
return null; |
|
|
|
|
} |
|
|
|
|
// next() blocks until next
|
|
|
|
|
// but that's OK since this is a *blocking* InputStream
|
|
|
|
|
DataBuffer signal = this.queue.next(); |
|
|
|
|
this.currentStream = signal.asInputStream(); |
|
|
|
|
return this.currentStream; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
catch (InterruptedException ex) { |
|
|
|
|
Thread.currentThread().interrupt(); |
|
|
|
|
} |
|
|
|
|
catch (Throwable error) { |
|
|
|
|
this.completed.set(true); |
|
|
|
|
throw new IOException(error); |
|
|
|
|
} |
|
|
|
|
throw new IOException(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|