@ -20,7 +20,7 @@ import java.io.IOException;
@@ -20,7 +20,7 @@ import java.io.IOException;
import java.lang.reflect.Method ;
import java.util.List ;
import java.util.Map ;
import java.util.concurrent.ConcurrentHash Map ;
import java.util.concurrent.ConcurrentMap ;
import java.util.function.Function ;
import com.google.protobuf.CodedInputStream ;
@ -35,7 +35,9 @@ import org.springframework.core.codec.Decoder;
@@ -35,7 +35,9 @@ import org.springframework.core.codec.Decoder;
import org.springframework.core.codec.DecodingException ;
import org.springframework.core.io.buffer.DataBuffer ;
import org.springframework.core.io.buffer.DataBufferUtils ;
import org.springframework.lang.Nullable ;
import org.springframework.util.Assert ;
import org.springframework.util.ConcurrentReferenceHashMap ;
import org.springframework.util.MimeType ;
/ * *
@ -66,7 +68,7 @@ public class ProtobufDecoder extends ProtobufCodecSupport implements Decoder<Mes
@@ -66,7 +68,7 @@ public class ProtobufDecoder extends ProtobufCodecSupport implements Decoder<Mes
* /
protected static final int DEFAULT_MESSAGE_MAX_SIZE = 64 * 1024 ;
private static final ConcurrentHash Map < Class < ? > , Method > methodCache = new ConcurrentHashMap < > ( ) ;
private static final ConcurrentMap < Class < ? > , Method > methodCache = new ConcurrentReference HashMap < > ( ) ;
private final ExtensionRegistry extensionRegistry ;
@ -90,18 +92,20 @@ public class ProtobufDecoder extends ProtobufCodecSupport implements Decoder<Mes
@@ -90,18 +92,20 @@ public class ProtobufDecoder extends ProtobufCodecSupport implements Decoder<Mes
this . extensionRegistry = extensionRegistry ;
}
public void setMaxMessageSize ( int maxMessageSize ) {
this . maxMessageSize = maxMessageSize ;
}
@Override
public boolean canDecode ( ResolvableType elementType , MimeType mimeType ) {
return Message . class . isAssignableFrom ( elementType . getRaw Class( ) ) & & supportsMimeType ( mimeType ) ;
public boolean canDecode ( ResolvableType elementType , @Nullable MimeType mimeType ) {
return Message . class . isAssignableFrom ( elementType . to Class( ) ) & & supportsMimeType ( mimeType ) ;
}
@Override
public Flux < Message > decode ( Publisher < DataBuffer > inputStream , ResolvableType elementType ,
MimeType mimeType , Map < String , Object > hints ) {
@Nullable MimeType mimeType , @Nullable Map < String , Object > hints ) {
return Flux . from ( inputStream )
. concatMap ( new MessageDecoderFunction ( elementType , this . maxMessageSize ) ) ;
@ -109,10 +113,11 @@ public class ProtobufDecoder extends ProtobufCodecSupport implements Decoder<Mes
@@ -109,10 +113,11 @@ public class ProtobufDecoder extends ProtobufCodecSupport implements Decoder<Mes
@Override
public Mono < Message > decodeToMono ( Publisher < DataBuffer > inputStream , ResolvableType elementType ,
MimeType mimeType , Map < String , Object > hints ) {
@Nullable MimeType mimeType , @Nullable Map < String , Object > hints ) {
return DataBufferUtils . join ( inputStream ) . map ( dataBuffer - > {
try {
Message . Builder builder = getMessageBuilder ( elementType . getRaw Class( ) ) ;
Message . Builder builder = getMessageBuilder ( elementType . to Class( ) ) ;
builder . mergeFrom ( CodedInputStream . newInstance ( dataBuffer . asByteBuffer ( ) ) , this . extensionRegistry ) ;
Message message = builder . build ( ) ;
DataBufferUtils . release ( dataBuffer ) ;
@ -153,6 +158,7 @@ public class ProtobufDecoder extends ProtobufCodecSupport implements Decoder<Mes
@@ -153,6 +158,7 @@ public class ProtobufDecoder extends ProtobufCodecSupport implements Decoder<Mes
private final int maxMessageSize ;
@Nullable
private DataBuffer output ;
private int messageBytesToRead ;
@ -162,10 +168,10 @@ public class ProtobufDecoder extends ProtobufCodecSupport implements Decoder<Mes
@@ -162,10 +168,10 @@ public class ProtobufDecoder extends ProtobufCodecSupport implements Decoder<Mes
this . maxMessageSize = maxMessageSize ;
}
// TODO Instead of the recursive call, loop over the current DataBuffer, produce a list of as many messages as are contained, and save any remaining bytes with flatMapIterable
// TODO Instead of the recursive call, loop over the current DataBuffer,
// produce a list of as many messages as are contained, and save any remaining bytes with flatMapIterable
@Override
public Publisher < ? extends Message > apply ( DataBuffer input ) {
try {
if ( this . output = = null ) {
int firstByte = input . read ( ) ;
@ -176,7 +182,7 @@ public class ProtobufDecoder extends ProtobufCodecSupport implements Decoder<Mes
@@ -176,7 +182,7 @@ public class ProtobufDecoder extends ProtobufCodecSupport implements Decoder<Mes
if ( this . messageBytesToRead > this . maxMessageSize ) {
return Flux . error ( new DecodingException (
"The number of bytes to read parsed in the incoming stream (" +
this . messageBytesToRead + ") exceeds the configured limit (" + this . maxMessageSize + ")" ) ) ;
this . messageBytesToRead + ") exceeds the configured limit (" + this . maxMessageSize + ")" ) ) ;
}
this . output = input . factory ( ) . allocateBuffer ( this . messageBytesToRead ) ;
}
@ -187,7 +193,7 @@ public class ProtobufDecoder extends ProtobufCodecSupport implements Decoder<Mes
@@ -187,7 +193,7 @@ public class ProtobufDecoder extends ProtobufCodecSupport implements Decoder<Mes
this . messageBytesToRead - = chunkBytesToRead ;
Message message = null ;
if ( this . messageBytesToRead = = 0 ) {
Message . Builder builder = getMessageBuilder ( this . elementType . getRaw Class( ) ) ;
Message . Builder builder = getMessageBuilder ( this . elementType . to Class( ) ) ;
builder . mergeFrom ( CodedInputStream . newInstance ( this . output . asByteBuffer ( ) ) , extensionRegistry ) ;
message = builder . build ( ) ;
DataBufferUtils . release ( this . output ) ;
@ -209,4 +215,5 @@ public class ProtobufDecoder extends ProtobufCodecSupport implements Decoder<Mes
@@ -209,4 +215,5 @@ public class ProtobufDecoder extends ProtobufCodecSupport implements Decoder<Mes
}
}
}
}