@ -15,22 +15,26 @@
@@ -15,22 +15,26 @@
* /
package org.springframework.messaging.rsocket ;
import java.nio.charset.StandardCharsets ;
import java.util.ArrayList ;
import java.util.Collections ;
import java.util.HashMap ;
import java.util.List ;
import java.util.Map ;
import java.util.function.BiConsumer ;
import io.netty.buffer.ByteBuf ;
import io.netty.buffer.PooledByteBufAllocator ;
import io.rsocket.Payload ;
import io.rsocket.metadata.CompositeMetadata ;
import org.springframework.core.ParameterizedTypeReference ;
import org.springframework.core.ResolvableType ;
import org.springframework.core.codec.Decoder ;
import org.springframework.core.io.buffer.DataBuffer ;
import org.springframework.core.io.buffer.DataBufferFactory ;
import org.springframework.core.io.buffer.NettyDataBuffer ;
import org.springframework.core.io.buffer.NettyDataBufferFactory ;
import org.springframework.lang.Nullable ;
import org.springframework.util.Assert ;
import org.springframework.util.MimeType ;
/ * *
@ -47,15 +51,53 @@ import org.springframework.util.MimeType;
@@ -47,15 +51,53 @@ import org.springframework.util.MimeType;
* /
public class DefaultMetadataExtractor implements MetadataExtractor {
private final Map < String , EntryProcessor < ? > > entryProcessors = new HashMap < > ( ) ;
private final List < Decoder < ? > > decoders = new ArrayList < > ( ) ;
private final Map < String , MetadataProcessor < ? > > processors = new HashMap < > ( ) ;
/ * *
* Default constructor with { @link RSocketStrategies } .
* Configure the decoders to use for de - serializing metadata entries .
* < p > By default this is not set .
* < p > When this extractor is passed into { @link RSocketStrategies . Builder } or
* { @link org . springframework . messaging . rsocket . annotation . support . RSocketMessageHandler
* RSocketMessageHandler } , the decoders may be left not set , and they will
* be initialized from the decoders already configured there .
* /
public DefaultMetadataExtractor ( ) {
// TODO: remove when rsocket-core API available
metadataToExtract ( MetadataExtractor . ROUTING , String . class , ROUTE_KEY ) ;
public void setDecoders ( List < ? extends Decoder < ? > > decoders ) {
this . decoders . clear ( ) ;
if ( ! decoders . isEmpty ( ) ) {
this . decoders . addAll ( decoders ) ;
updateProcessors ( ) ;
}
}
@SuppressWarnings ( "unchecked" )
private < T > void updateProcessors ( ) {
for ( MetadataProcessor < ? > info : this . processors . values ( ) ) {
Decoder < T > decoder = decoderFor ( info . mimeType ( ) , info . targetType ( ) ) ;
Assert . isTrue ( decoder ! = null , "No decoder for " + info ) ;
info = ( ( MetadataProcessor < T > ) info ) . setDecoder ( decoder ) ;
this . processors . put ( info . mimeType ( ) . toString ( ) , info ) ;
}
}
@Nullable
@SuppressWarnings ( "unchecked" )
private < T > Decoder < T > decoderFor ( MimeType mimeType , ResolvableType type ) {
for ( Decoder < ? > decoder : this . decoders ) {
if ( decoder . canDecode ( type , mimeType ) ) {
return ( Decoder < T > ) decoder ;
}
}
return null ;
}
/ * *
* Return the { @link # setDecoders ( List ) configured } decoders .
* /
public List < ? extends Decoder < ? > > getDecoders ( ) {
return this . decoders ;
}
@ -97,11 +139,9 @@ public class DefaultMetadataExtractor implements MetadataExtractor {
@@ -97,11 +139,9 @@ public class DefaultMetadataExtractor implements MetadataExtractor {
* @param < T > the target value type
* /
public < T > void metadataToExtract (
MimeType mimeType , Class < T > targetType ,
BiConsumer < T , Map < String , Object > > mapper ) {
MimeType mimeType , Class < T > targetType , BiConsumer < T , Map < String , Object > > mapper ) {
EntryProcessor < T > spec = new EntryProcessor < > ( mimeType , targetType , mapper ) ;
this . entryProcessors . put ( mimeType . toString ( ) , spec ) ;
metadataToExtract ( mimeType , mapper , ResolvableType . forClass ( targetType ) ) ;
}
/ * *
@ -117,45 +157,52 @@ public class DefaultMetadataExtractor implements MetadataExtractor {
@@ -117,45 +157,52 @@ public class DefaultMetadataExtractor implements MetadataExtractor {
MimeType mimeType , ParameterizedTypeReference < T > targetType ,
BiConsumer < T , Map < String , Object > > mapper ) {
EntryProcessor < T > spec = new EntryProcessor < > ( mimeType , targetType , mapper ) ;
this . entryProcessors . put ( mimeType . toString ( ) , spec ) ;
metadataToExtract ( mimeType , mapper , ResolvableType . forType ( targetType ) ) ;
}
private < T > void metadataToExtract (
MimeType mimeType , BiConsumer < T , Map < String , Object > > mapper , ResolvableType elementType ) {
Decoder < T > decoder = decoderFor ( mimeType , elementType ) ;
Assert . isTrue ( this . decoders . isEmpty ( ) | | decoder ! = null , ( ) - > "No decoder for " + mimeType ) ;
MetadataProcessor < T > info = new MetadataProcessor < > ( mimeType , elementType , mapper , decoder ) ;
this . processors . put ( mimeType . toString ( ) , info ) ;
}
@Override
public Map < String , Object > extract ( Payload payload , MimeType metadataMimeType , RSocketStrategies strategies ) {
public Map < String , Object > extract ( Payload payload , MimeType metadataMimeType ) {
Map < String , Object > result = new HashMap < > ( ) ;
if ( metadataMimeType . equals ( COMPOSITE_METADATA ) ) {
for ( CompositeMetadata . Entry entry : new CompositeMetadata ( payload . metadata ( ) , false ) ) {
processEntry ( entry . getContent ( ) , entry . getMimeType ( ) , result , strategies ) ;
processEntry ( entry . getContent ( ) , entry . getMimeType ( ) , result ) ;
}
}
else {
processEntry ( payload . metadata ( ) , metadataMimeType . toString ( ) , result , strategies ) ;
processEntry ( payload . metadata ( ) , metadataMimeType . toString ( ) , result ) ;
}
return result ;
}
private void processEntry ( ByteBuf content ,
@Nullable String mimeType , Map < String , Object > result , RSocketStrategies strategies ) {
EntryProcessor < ? > entryProcessor = this . entryProcessors . get ( mimeType ) ;
if ( entryProcessor ! = null ) {
content . retain ( ) ;
entryProcessor . process ( content , result , strategies ) ;
@SuppressWarnings ( "unchecked" )
private < T > void processEntry ( ByteBuf content , @Nullable String mimeType , Map < String , Object > result ) {
MetadataProcessor < T > info = ( MetadataProcessor < T > ) this . processors . get ( mimeType ) ;
if ( info ! = null ) {
info . process ( content , result ) ;
return ;
}
if ( MetadataExtractor . ROUTING . toString ( ) . equals ( mimeType ) ) {
// TODO: use rsocket-core API when available
result . put ( MetadataExtractor . ROUTE_KEY , content . toString ( StandardCharsets . UTF_8 ) ) ;
}
}
/ * *
* Helps to decode a metadata entry and add the resulting value to the
* output map .
* /
private class EntryProcessor < T > {
private static class MetadataProcessor < T > {
private final static NettyDataBufferFactory bufferFactory =
new NettyDataBufferFactory ( PooledByteBufAllocator . DEFAULT ) ;
private final MimeType mimeType ;
@ -163,41 +210,54 @@ public class DefaultMetadataExtractor implements MetadataExtractor {
@@ -163,41 +210,54 @@ public class DefaultMetadataExtractor implements MetadataExtractor {
private final BiConsumer < T , Map < String , Object > > accumulator ;
@Nullable
private final Decoder < T > decoder ;
public EntryProcessor (
MimeType mimeType , Class < T > targetType ,
BiConsumer < T , Map < String , Object > > accumulator ) {
this ( mimeType , ResolvableType . forClass ( targetType ) , accumulator ) ;
}
MetadataProcessor ( MimeType mimeType , ResolvableType targetType ,
BiConsumer < T , Map < String , Object > > accumulator , @Nullable Decoder < T > decoder ) {
public EntryProcessor (
MimeType mimeType , ParameterizedTypeReference < T > targetType ,
BiConsumer < T , Map < String , Object > > accumulator ) {
this . mimeType = mimeType ;
this . targetType = targetType ;
this . accumulator = accumulator ;
this . decoder = decoder ;
}
this ( mimeType , ResolvableType . forType ( targetType ) , accumulator ) ;
MetadataProcessor ( MetadataProcessor < T > other , Decoder < T > decoder ) {
this . mimeType = other . mimeType ;
this . targetType = other . targetType ;
this . accumulator = other . accumulator ;
this . decoder = decoder ;
}
private EntryProcessor (
MimeType mimeType , ResolvableType targetType ,
BiConsumer < T , Map < String , Object > > accumulator ) {
this . mimeType = mimeType ;
this . targetType = targetType ;
this . accumulator = accumulator ;
public MimeType mimeType ( ) {
return this . mimeType ;
}
public ResolvableType targetType ( ) {
return this . targetType ;
}
public MetadataProcessor < T > setDecoder ( Decoder < T > decoder ) {
return this . decoder ! = decoder ? new MetadataProcessor < > ( this , decoder ) : this ;
}
public void process ( ByteBuf byteBuf , Map < String , Object > result , RSocketStrategies strategies ) {
DataBufferFactory factory = strategies . dataBufferFactory ( ) ;
DataBuffer buffer = factory instanceof NettyDataBufferFactory ?
( ( NettyDataBufferFactory ) factory ) . wrap ( byteBuf ) :
factory . wrap ( byteBuf . nioBuffer ( ) ) ;
Decoder < T > decoder = strategies . decoder ( this . targetType , this . mimeType ) ;
T value = decoder . decode ( buffer , this . targetType , this . mimeType , Collections . emptyMap ( ) ) ;
public void process ( ByteBuf content , Map < String , Object > result ) {
if ( this . decoder = = null ) {
throw new IllegalStateException ( "No decoder for " + this ) ;
}
NettyDataBuffer dataBuffer = bufferFactory . wrap ( content . retain ( ) ) ;
T value = this . decoder . decode ( dataBuffer , this . targetType , this . mimeType , Collections . emptyMap ( ) ) ;
this . accumulator . accept ( value , result ) ;
}
@Override
public String toString ( ) {
return "MetadataProcessor mimeType=" + this . mimeType + ", targetType=" + this . targetType ;
}
}
}