@ -1,5 +1,5 @@
@@ -1,5 +1,5 @@
/ *
* Copyright 2002 - 2018 the original author or authors .
* Copyright 2002 - 2019 the original author or authors .
*
* Licensed under the Apache License , Version 2 . 0 ( the "License" ) ;
* you may not use this file except in compliance with the License .
@ -47,9 +47,11 @@ import reactor.core.publisher.Mono;
@@ -47,9 +47,11 @@ import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType ;
import org.springframework.core.ResolvableType ;
import org.springframework.core.codec.DecodingException ;
import org.springframework.core.codec.Hints ;
import org.springframework.core.io.buffer.DataBuffer ;
import org.springframework.core.io.buffer.DataBufferFactory ;
import org.springframework.core.io.buffer.DataBufferLimitException ;
import org.springframework.core.io.buffer.DataBufferUtils ;
import org.springframework.core.io.buffer.DefaultDataBufferFactory ;
import org.springframework.core.log.LogFormatUtils ;
@ -78,73 +80,77 @@ import org.springframework.util.Assert;
@@ -78,73 +80,77 @@ import org.springframework.util.Assert;
* /
public class SynchronossPartHttpMessageReader extends LoggingCodecSupport implements HttpMessageReader < Part > {
private final DataBufferFactory bufferFactory ;
// Static DataBufferFactory to copy from FileInputStream or wrap bytes[].
private static final DataBufferFactory bufferFactory = new DefaultDataBufferFactory ( ) ;
private final PartBodyStreamStorageFactory streamStorageFactory = new DefaultPartBodyStreamStorageFactory ( ) ;
private long maxPartCount = - 1 ;
private int maxInMemorySize = 256 * 1024 ;
private long maxFilePartSize = - 1 ;
private long maxDiskUsagePerPart = - 1 ;
private long maxPartSize = - 1 ;
private long maxParts = - 1 ;
public SynchronossPartHttpMessageReader ( ) {
this . bufferFactory = new DefaultDataBufferFactory ( ) ;
}
SynchronossPartHttpMessageReader ( DataBufferFactory bufferFactory ) {
this . bufferFactory = bufferFactory ;
}
/ * *
* Get the maximum number of parts allowed in a single multipart request .
* Configure the maximum amount of memory that is allowed to use per part .
* When the limit is exceeded :
* < ul >
* < li > file parts are written to a temporary file .
* < li > non - file parts are rejected with { @link DataBufferLimitException } .
* < / ul >
* < p > By default this is set to 256K .
* @param byteCount the in - memory limit in bytes ; if set to - 1 this limit is
* not enforced , and all parts may be written to disk and are limited only
* by the { @link # setMaxDiskUsagePerPart ( long ) maxDiskUsagePerPart } property .
* @since 5 . 1 . 11
* /
public long getMaxPartCount ( ) {
return maxPartCount ;
public void setMaxInMemorySize ( int byteCount ) {
this . maxInMemorySize = byte Count;
}
/ * *
* Configure the maximum number of parts allowed in a single multipart request .
* Get the { @link # setMaxInMemorySize configured } maximum in - memory size .
* @since 5 . 1 . 11
* /
public void setMaxPartCount ( long maxPartCount ) {
this . maxPartCount = maxPartCount ;
public int getMaxInMemorySize ( ) {
return this . maxInMemorySize ;
}
/ * *
* Get the maximum size of a file part .
* Configure the maximum amount of disk space allowed for file parts .
* < p > By default this is set to - 1 .
* @param maxDiskUsagePerPart the disk limit in bytes , or - 1 for unlimited
* @since 5 . 1 . 11
* /
public long getMaxFilePartSize ( ) {
return this . maxFilePartSize ;
public void setMaxDiskUsagePerPart ( long maxDiskUsagePerPart ) {
this . maxDiskUsagePerPart = maxDiskUsagePerPart ;
}
/ * *
* Configure the the maximum size of a file part .
* Get the { @link # setMaxDiskUsagePerPart configured } maximum disk usage .
* @since 5 . 1 . 11
* /
public void setMaxFilePartSize ( long maxFilePartSize ) {
this . maxFilePartSize = maxFilePartSize ;
public long getMaxDiskUsagePerPart ( ) {
return this . maxDiskUsagePerPart ;
}
/ * *
* Get the maximum size of a part .
* Specify the maximum number of p arts allowed in a given multi part reques t.
* @since 5 . 1 . 11
* /
public long getMaxPartSize ( ) {
return this . maxPartSize ;
public void setMaxParts ( long maxParts ) {
this . maxParts = maxParts ;
}
/ * *
* Configure the maximum size of a part .
* For limits on file parts , use the dedicated { @link # setMaxFilePartSize ( long ) } .
* Return the { @link # setMaxParts configured } limit on the number of parts .
* @since 5 . 1 . 11
* /
public void setMaxPartSize ( long maxPartSize ) {
this . maxPartSize = maxPartSize ;
public long getMaxParts ( ) {
return this . maxParts ;
}
@Override
public List < MediaType > getReadableMediaTypes ( ) {
return Collections . singletonList ( MediaType . MULTIPART_FORM_DATA ) ;
@ -159,8 +165,7 @@ public class SynchronossPartHttpMessageReader extends LoggingCodecSupport implem
@@ -159,8 +165,7 @@ public class SynchronossPartHttpMessageReader extends LoggingCodecSupport implem
@Override
public Flux < Part > read ( ResolvableType elementType , ReactiveHttpInputMessage message , Map < String , Object > hints ) {
return Flux . create ( new SynchronossPartGenerator ( message , this . bufferFactory , this . streamStorageFactory ,
new MultipartSizeLimiter ( getMaxPartCount ( ) , getMaxFilePartSize ( ) , getMaxPartSize ( ) ) ) )
return Flux . create ( new SynchronossPartGenerator ( message ) )
. doOnNext ( part - > {
if ( ! Hints . isLoggingSuppressed ( hints ) ) {
LogFormatUtils . traceDebug ( logger , traceOn - > Hints . getLogPrefix ( hints ) + "Parsed " +
@ -173,89 +178,36 @@ public class SynchronossPartHttpMessageReader extends LoggingCodecSupport implem
@@ -173,89 +178,36 @@ public class SynchronossPartHttpMessageReader extends LoggingCodecSupport implem
@Override
public Mono < Part > readMono ( ResolvableType elementType , ReactiveHttpInputMessage message , Map < String , Object > hints ) {
return Mono . error ( new UnsupportedOperationException ( "Cannot read multipart request body into single Part" ) ) ;
}
private static class MultipartSizeLimiter {
private final long maxPartCount ;
private final long maxFilePartSize ;
private final long maxPartSize ;
private boolean currentIsFilePart ;
private long currentPartCount ;
private long currentPartSize ;
public MultipartSizeLimiter ( long maxPartCount , long maxFilePartSize , long maxPartSize ) {
this . maxPartCount = maxPartCount ;
this . maxFilePartSize = maxFilePartSize ;
this . maxPartSize = maxPartSize ;
}
public void startPart ( boolean isFilePart ) {
this . currentPartCount + + ;
this . currentIsFilePart = isFilePart ;
if ( this . maxPartCount ! = - 1 & & this . currentPartCount > this . maxPartCount ) {
throw new IllegalStateException ( "Exceeded limit on maximum number of multipart parts" ) ;
}
}
public void endPart ( ) {
this . currentPartSize = 0L ;
this . currentIsFilePart = false ;
}
public void checkCurrentPartSize ( long addedBytes ) {
this . currentPartSize + = addedBytes ;
if ( this . currentIsFilePart & & this . maxFilePartSize ! = - 1 & & this . currentPartSize > this . maxFilePartSize ) {
throw new IllegalStateException ( "Exceeded limit on max size of multipart file : " + this . maxFilePartSize ) ;
}
else if ( ! this . currentIsFilePart & & this . maxPartSize ! = - 1 & & this . currentPartSize > this . maxPartSize ) {
throw new IllegalStateException ( "Exceeded limit on max size of multipart part : " + this . maxPartSize ) ;
}
}
public Mono < Part > readMono (
ResolvableType elementType , ReactiveHttpInputMessage message , Map < String , Object > hints ) {
return Mono . error ( new UnsupportedOperationException (
"Cannot read multipart request body into single Part" ) ) ;
}
/ * *
* Consume { @code DataBuffer } as a { @code BaseSubscriber } of the request body
* and feed it as input to the Synchronoss parser . Also listen for parser
* output events and adapt them to { @code Flux < Sink < Part > > } to emit parts
* for subscribers .
* Subscribe to the input stream and feed the Synchronoss parser . Then listen
* for parser output , creating parts , and pushing them into the FluxSink .
* /
private static class SynchronossPartGenerator extends BaseSubscriber < DataBuffer >
implements Consumer < FluxSink < Part > > {
private class SynchronossPartGenerator extends BaseSubscriber < DataBuffer > implements Consumer < FluxSink < Part > > {
private final ReactiveHttpInputMessage inputMessage ;
private final DataBufferFactory bufferFactory ;
private final PartBodyStreamStorageFactory streamStorageFactory ;
private final MultipartSizeLimiter limiter ;
private final LimitedPartBodyStreamStorageFactory storageFactory = new LimitedPartBodyStreamStorageFactory ( ) ;
private NioMultipartParserListener listener ;
private NioMultipartParser parser ;
public SynchronossPartGenerator ( ReactiveHttpInputMessage inputMessage , DataBufferFactory bufferFactory ,
PartBodyStreamStorageFactory streamStorageFactory , MultipartSizeLimiter limiter ) {
public SynchronossPartGenerator ( ReactiveHttpInputMessage inputMessage ) {
this . inputMessage = inputMessage ;
this . bufferFactory = bufferFactory ;
this . streamStorageFactory = new PartBodyStreamStorageFactoryDecorator ( streamStorageFactory , limiter ) ;
this . limiter = limiter ;
}
@Override
public void accept ( FluxSink < Part > emitter ) {
public void accept ( FluxSink < Part > sink ) {
HttpHeaders headers = this . inputMessage . getHeaders ( ) ;
MediaType mediaType = headers . getContentType ( ) ;
Assert . state ( mediaType ! = null , "No content type set" ) ;
@ -264,28 +216,29 @@ public class SynchronossPartHttpMessageReader extends LoggingCodecSupport implem
@@ -264,28 +216,29 @@ public class SynchronossPartHttpMessageReader extends LoggingCodecSupport implem
Charset charset = Optional . ofNullable ( mediaType . getCharset ( ) ) . orElse ( StandardCharsets . UTF_8 ) ;
MultipartContext context = new MultipartContext ( mediaType . toString ( ) , length , charset . name ( ) ) ;
this . listener = new FluxSinkAdapterListener ( emitter , this . bufferFactory , context , this . limiter ) ;
this . listener = new FluxSinkAdapterListener ( sink , context , this . storageFactory ) ;
this . parser = Multipart
. multipart ( context )
. usePartBodyStreamStorageFactory ( this . streamStorageFactory )
// long to int downcast vs. keeping the default 16Kb value
//.withHeadersSizeLimit(this.limiter.maxPartSize)
. usePartBodyStreamStorageFactory ( this . storageFactory )
. forNIO ( this . listener ) ;
this . inputMessage . getBody ( ) . subscribe ( this ) ;
}
@Override
protected void hookOnNext ( DataBuffer buffer ) {
int readableByteCount = buffer . readableByteCount ( ) ;
this . limiter . checkCurrentPartSize ( readableByteCount ) ;
byte [ ] resultBytes = new byte [ readableByteCount ] ;
int size = buffer . readableByteCount ( ) ;
this . storageFactory . increaseByteCount ( size ) ;
byte [ ] resultBytes = new byte [ size ] ;
buffer . read ( resultBytes ) ;
try {
parser . write ( resultBytes ) ;
this . parser . write ( resultBytes ) ;
}
catch ( IOException ex ) {
this . cancel ( ) ;
listener . onError ( "Exception thrown while providing input to the parser" , ex ) ;
cancel ( ) ;
int index = this . storageFactory . getCurrentPartIndex ( ) ;
this . listener . onError ( "Parser error for part [" + index + "]" , ex ) ;
}
finally {
DataBufferUtils . release ( buffer ) ;
@ -293,23 +246,26 @@ public class SynchronossPartHttpMessageReader extends LoggingCodecSupport implem
@@ -293,23 +246,26 @@ public class SynchronossPartHttpMessageReader extends LoggingCodecSupport implem
}
@Override
protected void hookOnError ( Throwable throwable ) {
this . cancel ( ) ;
listener . onError ( "Could not parse multipart request" , throwable ) ;
}
@Override
protected void hookOnCancel ( ) {
this . cancel ( ) ;
protected void hookOnError ( Throwable ex ) {
try {
this . parser . close ( ) ;
}
catch ( IOException ex2 ) {
// ignore
}
finally {
int index = this . storageFactory . getCurrentPartIndex ( ) ;
this . listener . onError ( "Failure while parsing part[" + index + "]" , ex ) ;
}
}
@Override
protected void hookFinally ( SignalType type ) {
try {
parser . close ( ) ;
this . parser . close ( ) ;
}
catch ( IOException ex ) {
listener . onError ( "Exception thrown while closing the parser" , ex ) ;
this . listener . onError ( "Error while closing parser" , ex ) ;
}
}
@ -320,25 +276,51 @@ public class SynchronossPartHttpMessageReader extends LoggingCodecSupport implem
@@ -320,25 +276,51 @@ public class SynchronossPartHttpMessageReader extends LoggingCodecSupport implem
}
}
private static class PartBodyStreamStorageFactoryDecorator implements PartBodyStreamStorageFactory {
private final PartBodyStreamStorageFactory streamStorageFactory ;
private class LimitedPartBodyStreamStorageFactory implements PartBodyStreamStorageFactory {
private final PartBodyStreamStorageFactory storageFactory = maxInMemorySize > 0 ?
new DefaultPartBodyStreamStorageFactory ( maxInMemorySize ) :
new DefaultPartBodyStreamStorageFactory ( ) ;
private int index = 1 ;
private boolean isFilePart ;
private final MultipartSizeLimiter limiter ;
private long partSize ;
public PartBodyStreamStorageFactoryDecorator ( PartBodyStreamStorageFactory streamStorageFactory ,
MultipartSizeLimiter limiter ) {
this . streamStorageFactory = streamStorageFactory ;
this . limiter = limiter ;
public int getCurrentPartIndex ( ) {
return this . index ;
}
@Override
public StreamStorage newStreamStorageForPartBody ( Map < String , List < String > > partHeaders , int partIndex ) {
HttpHeaders httpHeaders = new HttpHeaders ( ) ;
httpHeaders . putAll ( partHeaders ) ;
String filename = MultipartUtils . getFileName ( httpHeaders ) ;
this . limiter . startPart ( filename ! = null ) ;
return streamStorageFactory . newStreamStorageForPartBody ( partHeaders , partIndex ) ;
public StreamStorage newStreamStorageForPartBody ( Map < String , List < String > > headers , int index ) {
this . index = index ;
this . isFilePart = ( MultipartUtils . getFileName ( headers ) ! = null ) ;
this . partSize = 0 ;
if ( maxParts > 0 & & index > maxParts ) {
throw new DecodingException ( "Too many parts (" + index + " allowed)" ) ;
}
return this . storageFactory . newStreamStorageForPartBody ( headers , index ) ;
}
public void increaseByteCount ( long byteCount ) {
this . partSize + = byteCount ;
if ( maxInMemorySize > 0 & & ! this . isFilePart & & this . partSize > = maxInMemorySize ) {
throw new DataBufferLimitException ( "Part[" + this . index + "] " +
"exceeded the in-memory limit of " + maxInMemorySize + " bytes" ) ;
}
if ( maxDiskUsagePerPart > 0 & & this . isFilePart & & this . partSize > maxDiskUsagePerPart ) {
throw new DecodingException ( "Part[" + this . index + "] " +
"exceeded the disk usage limit of " + maxDiskUsagePerPart + " bytes" ) ;
}
}
public void partFinished ( ) {
this . index + + ;
this . isFilePart = false ;
this . partSize = 0 ;
}
}
@ -350,48 +332,48 @@ public class SynchronossPartHttpMessageReader extends LoggingCodecSupport implem
@@ -350,48 +332,48 @@ public class SynchronossPartHttpMessageReader extends LoggingCodecSupport implem
private final FluxSink < Part > sink ;
private final DataBufferFactory bufferFactory ;
private final MultipartContext context ;
private final MultipartSizeLimiter limiter ;
private final LimitedPartBodyStreamStorageFactory storageFactory ;
private final AtomicInteger terminated = new AtomicInteger ( 0 ) ;
FluxSinkAdapterListener ( FluxSink < Part > sink , DataBufferFactory factory ,
MultipartContext context , MultipartSizeLimiter limiter ) {
FluxSinkAdapterListener (
FluxSink < Part > sink , MultipartContext context , LimitedPartBodyStreamStorageFactory factory ) {
this . sink = sink ;
this . bufferFactory = factory ;
this . context = context ;
this . limiter = limiter ;
this . storageFactory = factory ;
}
@Override
public void onPartFinished ( StreamStorage storage , Map < String , List < String > > headers ) {
HttpHeaders httpHeaders = new HttpHeaders ( ) ;
httpHeaders . putAll ( headers ) ;
this . storageFactory . partFinished ( ) ;
this . sink . next ( createPart ( storage , httpHeaders ) ) ;
this . limiter . endPart ( ) ;
}
private Part createPart ( StreamStorage storage , HttpHeaders httpHeaders ) {
String filename = MultipartUtils . getFileName ( httpHeaders ) ;
if ( filename ! = null ) {
return new SynchronossFilePart ( httpHeaders , filename , storage , this . bufferFactory ) ;
return new SynchronossFilePart ( httpHeaders , filename , storage ) ;
}
else if ( MultipartUtils . isFormField ( httpHeaders , this . context ) ) {
String value = MultipartUtils . readFormParameterValue ( storage , httpHeaders ) ;
return new SynchronossFormFieldPart ( httpHeaders , this . bufferFactory , value ) ;
return new SynchronossFormFieldPart ( httpHeaders , value ) ;
}
else {
return new SynchronossPart ( httpHeaders , storage , this . bufferFactory ) ;
return new SynchronossPart ( httpHeaders , storage ) ;
}
}
@Override
public void onError ( String message , Throwable cause ) {
if ( this . terminated . getAndIncrement ( ) = = 0 ) {
this . sink . error ( new Multipart Exception( message , cause ) ) ;
this . sink . error ( new Decoding Exception( message , cause ) ) ;
}
}
@ -418,14 +400,10 @@ public class SynchronossPartHttpMessageReader extends LoggingCodecSupport implem
@@ -418,14 +400,10 @@ public class SynchronossPartHttpMessageReader extends LoggingCodecSupport implem
private final HttpHeaders headers ;
private final DataBufferFactory bufferFactory ;
AbstractSynchronossPart ( HttpHeaders headers , DataBufferFactory bufferFactory ) {
AbstractSynchronossPart ( HttpHeaders headers ) {
Assert . notNull ( headers , "HttpHeaders is required" ) ;
Assert . notNull ( bufferFactory , "DataBufferFactory is required" ) ;
this . name = MultipartUtils . getFieldName ( headers ) ;
this . headers = headers ;
this . bufferFactory = bufferFactory ;
}
@Override
@ -438,10 +416,6 @@ public class SynchronossPartHttpMessageReader extends LoggingCodecSupport implem
@@ -438,10 +416,6 @@ public class SynchronossPartHttpMessageReader extends LoggingCodecSupport implem
return this . headers ;
}
DataBufferFactory getBufferFactory ( ) {
return this . bufferFactory ;
}
@Override
public String toString ( ) {
return "Part '" + this . name + "', headers=" + this . headers ;
@ -453,15 +427,15 @@ public class SynchronossPartHttpMessageReader extends LoggingCodecSupport implem
@@ -453,15 +427,15 @@ public class SynchronossPartHttpMessageReader extends LoggingCodecSupport implem
private final StreamStorage storage ;
SynchronossPart ( HttpHeaders headers , StreamStorage storage , DataBufferFactory factory ) {
super ( headers , factory ) ;
SynchronossPart ( HttpHeaders headers , StreamStorage storage ) {
super ( headers ) ;
Assert . notNull ( storage , "StreamStorage is required" ) ;
this . storage = storage ;
}
@Override
public Flux < DataBuffer > content ( ) {
return DataBufferUtils . readInputStream ( getStorage ( ) : : getInputStream , getBufferFactory ( ) , 4096 ) ;
return DataBufferUtils . readInputStream ( getStorage ( ) : : getInputStream , bufferFactory , 4096 ) ;
}
protected StreamStorage getStorage ( ) {
@ -477,8 +451,8 @@ public class SynchronossPartHttpMessageReader extends LoggingCodecSupport implem
@@ -477,8 +451,8 @@ public class SynchronossPartHttpMessageReader extends LoggingCodecSupport implem
private final String filename ;
SynchronossFilePart ( HttpHeaders headers , String filename , StreamStorage storage , DataBufferFactory factory ) {
super ( headers , storage , factory ) ;
SynchronossFilePart ( HttpHeaders headers , String filename , StreamStorage storage ) {
super ( headers , storage ) ;
this . filename = filename ;
}
@ -537,8 +511,8 @@ public class SynchronossPartHttpMessageReader extends LoggingCodecSupport implem
@@ -537,8 +511,8 @@ public class SynchronossPartHttpMessageReader extends LoggingCodecSupport implem
private final String content ;
SynchronossFormFieldPart ( HttpHeaders headers , DataBufferFactory bufferFactory , String content ) {
super ( headers , bufferFactory ) ;
SynchronossFormFieldPart ( HttpHeaders headers , String content ) {
super ( headers ) ;
this . content = content ;
}
@ -550,9 +524,7 @@ public class SynchronossPartHttpMessageReader extends LoggingCodecSupport implem
@@ -550,9 +524,7 @@ public class SynchronossPartHttpMessageReader extends LoggingCodecSupport implem
@Override
public Flux < DataBuffer > content ( ) {
byte [ ] bytes = this . content . getBytes ( getCharset ( ) ) ;
DataBuffer buffer = getBufferFactory ( ) . allocateBuffer ( bytes . length ) ;
buffer . write ( bytes ) ;
return Flux . just ( buffer ) ;
return Flux . just ( bufferFactory . wrap ( bytes ) ) ;
}
private Charset getCharset ( ) {