|
|
@ -162,7 +162,6 @@ public class SynchronossPartHttpMessageReader extends LoggingCodecSupport implem |
|
|
|
(mediaType == null || MediaType.MULTIPART_FORM_DATA.isCompatibleWith(mediaType)); |
|
|
|
(mediaType == null || MediaType.MULTIPART_FORM_DATA.isCompatibleWith(mediaType)); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
@Override |
|
|
|
public Flux<Part> read(ResolvableType elementType, ReactiveHttpInputMessage message, Map<String, Object> hints) { |
|
|
|
public Flux<Part> read(ResolvableType elementType, ReactiveHttpInputMessage message, Map<String, Object> hints) { |
|
|
|
return Flux.create(new SynchronossPartGenerator(message)) |
|
|
|
return Flux.create(new SynchronossPartGenerator(message)) |
|
|
@ -176,13 +175,9 @@ public class SynchronossPartHttpMessageReader extends LoggingCodecSupport implem |
|
|
|
}); |
|
|
|
}); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
@Override |
|
|
|
public Mono<Part> readMono( |
|
|
|
public Mono<Part> readMono(ResolvableType elementType, ReactiveHttpInputMessage message, Map<String, Object> hints) { |
|
|
|
ResolvableType elementType, ReactiveHttpInputMessage message, Map<String, Object> hints) { |
|
|
|
return Mono.error(new UnsupportedOperationException("Cannot read multipart request body into single Part")); |
|
|
|
|
|
|
|
|
|
|
|
return Mono.error(new UnsupportedOperationException( |
|
|
|
|
|
|
|
"Cannot read multipart request body into single Part")); |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -196,16 +191,16 @@ public class SynchronossPartHttpMessageReader extends LoggingCodecSupport implem |
|
|
|
|
|
|
|
|
|
|
|
private final LimitedPartBodyStreamStorageFactory storageFactory = new LimitedPartBodyStreamStorageFactory(); |
|
|
|
private final LimitedPartBodyStreamStorageFactory storageFactory = new LimitedPartBodyStreamStorageFactory(); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Nullable |
|
|
|
private NioMultipartParserListener listener; |
|
|
|
private NioMultipartParserListener listener; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Nullable |
|
|
|
private NioMultipartParser parser; |
|
|
|
private NioMultipartParser parser; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public SynchronossPartGenerator(ReactiveHttpInputMessage inputMessage) { |
|
|
|
public SynchronossPartGenerator(ReactiveHttpInputMessage inputMessage) { |
|
|
|
this.inputMessage = inputMessage; |
|
|
|
this.inputMessage = inputMessage; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
@Override |
|
|
|
public void accept(FluxSink<Part> sink) { |
|
|
|
public void accept(FluxSink<Part> sink) { |
|
|
|
HttpHeaders headers = this.inputMessage.getHeaders(); |
|
|
|
HttpHeaders headers = this.inputMessage.getHeaders(); |
|
|
@ -228,10 +223,13 @@ public class SynchronossPartHttpMessageReader extends LoggingCodecSupport implem |
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
@Override |
|
|
|
protected void hookOnNext(DataBuffer buffer) { |
|
|
|
protected void hookOnNext(DataBuffer buffer) { |
|
|
|
|
|
|
|
Assert.state(this.parser != null && this.listener != null, "Not initialized yet"); |
|
|
|
|
|
|
|
|
|
|
|
int size = buffer.readableByteCount(); |
|
|
|
int size = buffer.readableByteCount(); |
|
|
|
this.storageFactory.increaseByteCount(size); |
|
|
|
this.storageFactory.increaseByteCount(size); |
|
|
|
byte[] resultBytes = new byte[size]; |
|
|
|
byte[] resultBytes = new byte[size]; |
|
|
|
buffer.read(resultBytes); |
|
|
|
buffer.read(resultBytes); |
|
|
|
|
|
|
|
|
|
|
|
try { |
|
|
|
try { |
|
|
|
this.parser.write(resultBytes); |
|
|
|
this.parser.write(resultBytes); |
|
|
|
} |
|
|
|
} |
|
|
@ -248,24 +246,32 @@ public class SynchronossPartHttpMessageReader extends LoggingCodecSupport implem |
|
|
|
@Override |
|
|
|
@Override |
|
|
|
protected void hookOnError(Throwable ex) { |
|
|
|
protected void hookOnError(Throwable ex) { |
|
|
|
try { |
|
|
|
try { |
|
|
|
this.parser.close(); |
|
|
|
if (this.parser != null) { |
|
|
|
|
|
|
|
this.parser.close(); |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
catch (IOException ex2) { |
|
|
|
catch (IOException ex2) { |
|
|
|
// ignore
|
|
|
|
// ignore
|
|
|
|
} |
|
|
|
} |
|
|
|
finally { |
|
|
|
finally { |
|
|
|
int index = this.storageFactory.getCurrentPartIndex(); |
|
|
|
if (this.listener != null) { |
|
|
|
this.listener.onError("Failure while parsing part[" + index + "]", ex); |
|
|
|
int index = this.storageFactory.getCurrentPartIndex(); |
|
|
|
|
|
|
|
this.listener.onError("Failure while parsing part[" + index + "]", ex); |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
@Override |
|
|
|
protected void hookFinally(SignalType type) { |
|
|
|
protected void hookFinally(SignalType type) { |
|
|
|
try { |
|
|
|
try { |
|
|
|
this.parser.close(); |
|
|
|
if (this.parser != null) { |
|
|
|
|
|
|
|
this.parser.close(); |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
catch (IOException ex) { |
|
|
|
catch (IOException ex) { |
|
|
|
this.listener.onError("Error while closing parser", ex); |
|
|
|
if (this.listener != null) { |
|
|
|
|
|
|
|
this.listener.onError("Error while closing parser", ex); |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
@ -279,9 +285,9 @@ public class SynchronossPartHttpMessageReader extends LoggingCodecSupport implem |
|
|
|
|
|
|
|
|
|
|
|
private class LimitedPartBodyStreamStorageFactory implements PartBodyStreamStorageFactory { |
|
|
|
private class LimitedPartBodyStreamStorageFactory implements PartBodyStreamStorageFactory { |
|
|
|
|
|
|
|
|
|
|
|
private final PartBodyStreamStorageFactory storageFactory = maxInMemorySize > 0 ? |
|
|
|
private final PartBodyStreamStorageFactory storageFactory = (maxInMemorySize > 0 ? |
|
|
|
new DefaultPartBodyStreamStorageFactory(maxInMemorySize) : |
|
|
|
new DefaultPartBodyStreamStorageFactory(maxInMemorySize) : |
|
|
|
new DefaultPartBodyStreamStorageFactory(); |
|
|
|
new DefaultPartBodyStreamStorageFactory()); |
|
|
|
|
|
|
|
|
|
|
|
private int index = 1; |
|
|
|
private int index = 1; |
|
|
|
|
|
|
|
|
|
|
@ -289,7 +295,6 @@ public class SynchronossPartHttpMessageReader extends LoggingCodecSupport implem |
|
|
|
|
|
|
|
|
|
|
|
private long partSize; |
|
|
|
private long partSize; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public int getCurrentPartIndex() { |
|
|
|
public int getCurrentPartIndex() { |
|
|
|
return this.index; |
|
|
|
return this.index; |
|
|
|
} |
|
|
|
} |
|
|
@ -338,7 +343,6 @@ public class SynchronossPartHttpMessageReader extends LoggingCodecSupport implem |
|
|
|
|
|
|
|
|
|
|
|
private final AtomicInteger terminated = new AtomicInteger(0); |
|
|
|
private final AtomicInteger terminated = new AtomicInteger(0); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
FluxSinkAdapterListener( |
|
|
|
FluxSinkAdapterListener( |
|
|
|
FluxSink<Part> sink, MultipartContext context, LimitedPartBodyStreamStorageFactory factory) { |
|
|
|
FluxSink<Part> sink, MultipartContext context, LimitedPartBodyStreamStorageFactory factory) { |
|
|
|
|
|
|
|
|
|
|
@ -347,7 +351,6 @@ public class SynchronossPartHttpMessageReader extends LoggingCodecSupport implem |
|
|
|
this.storageFactory = factory; |
|
|
|
this.storageFactory = factory; |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
@Override |
|
|
|
public void onPartFinished(StreamStorage storage, Map<String, List<String>> headers) { |
|
|
|
public void onPartFinished(StreamStorage storage, Map<String, List<String>> headers) { |
|
|
|
HttpHeaders httpHeaders = new HttpHeaders(); |
|
|
|
HttpHeaders httpHeaders = new HttpHeaders(); |
|
|
|