|
|
@ -32,11 +32,13 @@ import java.util.Optional; |
|
|
|
import java.util.concurrent.atomic.AtomicInteger; |
|
|
|
import java.util.concurrent.atomic.AtomicInteger; |
|
|
|
import java.util.function.Consumer; |
|
|
|
import java.util.function.Consumer; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import org.synchronoss.cloud.nio.multipart.DefaultPartBodyStreamStorageFactory; |
|
|
|
import org.synchronoss.cloud.nio.multipart.Multipart; |
|
|
|
import org.synchronoss.cloud.nio.multipart.Multipart; |
|
|
|
import org.synchronoss.cloud.nio.multipart.MultipartContext; |
|
|
|
import org.synchronoss.cloud.nio.multipart.MultipartContext; |
|
|
|
import org.synchronoss.cloud.nio.multipart.MultipartUtils; |
|
|
|
import org.synchronoss.cloud.nio.multipart.MultipartUtils; |
|
|
|
import org.synchronoss.cloud.nio.multipart.NioMultipartParser; |
|
|
|
import org.synchronoss.cloud.nio.multipart.NioMultipartParser; |
|
|
|
import org.synchronoss.cloud.nio.multipart.NioMultipartParserListener; |
|
|
|
import org.synchronoss.cloud.nio.multipart.NioMultipartParserListener; |
|
|
|
|
|
|
|
import org.synchronoss.cloud.nio.multipart.PartBodyStreamStorageFactory; |
|
|
|
import org.synchronoss.cloud.nio.stream.storage.StreamStorage; |
|
|
|
import org.synchronoss.cloud.nio.stream.storage.StreamStorage; |
|
|
|
import reactor.core.publisher.Flux; |
|
|
|
import reactor.core.publisher.Flux; |
|
|
|
import reactor.core.publisher.FluxSink; |
|
|
|
import reactor.core.publisher.FluxSink; |
|
|
@ -72,6 +74,8 @@ public class SynchronossPartHttpMessageReader implements HttpMessageReader<Part> |
|
|
|
|
|
|
|
|
|
|
|
private final DataBufferFactory bufferFactory = new DefaultDataBufferFactory(); |
|
|
|
private final DataBufferFactory bufferFactory = new DefaultDataBufferFactory(); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private final PartBodyStreamStorageFactory streamStorageFactory = new DefaultPartBodyStreamStorageFactory(); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
@Override |
|
|
|
public List<MediaType> getReadableMediaTypes() { |
|
|
|
public List<MediaType> getReadableMediaTypes() { |
|
|
@ -89,7 +93,7 @@ public class SynchronossPartHttpMessageReader implements HttpMessageReader<Part> |
|
|
|
public Flux<Part> read(ResolvableType elementType, ReactiveHttpInputMessage message, |
|
|
|
public Flux<Part> read(ResolvableType elementType, ReactiveHttpInputMessage message, |
|
|
|
Map<String, Object> hints) { |
|
|
|
Map<String, Object> hints) { |
|
|
|
|
|
|
|
|
|
|
|
return Flux.create(new SynchronossPartGenerator(message, this.bufferFactory)); |
|
|
|
return Flux.create(new SynchronossPartGenerator(message, this.bufferFactory, this.streamStorageFactory)); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -112,10 +116,14 @@ public class SynchronossPartHttpMessageReader implements HttpMessageReader<Part> |
|
|
|
|
|
|
|
|
|
|
|
private final DataBufferFactory bufferFactory; |
|
|
|
private final DataBufferFactory bufferFactory; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private final PartBodyStreamStorageFactory streamStorageFactory; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
SynchronossPartGenerator(ReactiveHttpInputMessage inputMessage, DataBufferFactory factory) { |
|
|
|
SynchronossPartGenerator(ReactiveHttpInputMessage inputMessage, DataBufferFactory bufferFactory, |
|
|
|
|
|
|
|
PartBodyStreamStorageFactory streamStorageFactory) { |
|
|
|
this.inputMessage = inputMessage; |
|
|
|
this.inputMessage = inputMessage; |
|
|
|
this.bufferFactory = factory; |
|
|
|
this.bufferFactory = bufferFactory; |
|
|
|
|
|
|
|
this.streamStorageFactory = streamStorageFactory; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -130,7 +138,10 @@ public class SynchronossPartHttpMessageReader implements HttpMessageReader<Part> |
|
|
|
MultipartContext context = new MultipartContext(mediaType.toString(), length, charset.name()); |
|
|
|
MultipartContext context = new MultipartContext(mediaType.toString(), length, charset.name()); |
|
|
|
|
|
|
|
|
|
|
|
NioMultipartParserListener listener = new FluxSinkAdapterListener(emitter, this.bufferFactory, context); |
|
|
|
NioMultipartParserListener listener = new FluxSinkAdapterListener(emitter, this.bufferFactory, context); |
|
|
|
NioMultipartParser parser = Multipart.multipart(context).forNIO(listener); |
|
|
|
NioMultipartParser parser = Multipart |
|
|
|
|
|
|
|
.multipart(context) |
|
|
|
|
|
|
|
.usePartBodyStreamStorageFactory(streamStorageFactory) |
|
|
|
|
|
|
|
.forNIO(listener); |
|
|
|
|
|
|
|
|
|
|
|
this.inputMessage.getBody().subscribe(buffer -> { |
|
|
|
this.inputMessage.getBody().subscribe(buffer -> { |
|
|
|
byte[] resultBytes = new byte[buffer.readableByteCount()]; |
|
|
|
byte[] resultBytes = new byte[buffer.readableByteCount()]; |
|
|
|