@ -32,10 +32,6 @@ import reactor.core.publisher.Mono;
@@ -32,10 +32,6 @@ import reactor.core.publisher.Mono;
import org.springframework.beans.BeanUtils ;
import org.springframework.core.ReactiveAdapterRegistry ;
import org.springframework.core.annotation.AnnotatedElementUtils ;
import org.springframework.core.codec.ByteArrayEncoder ;
import org.springframework.core.codec.ByteBufferEncoder ;
import org.springframework.core.codec.CharSequenceEncoder ;
import org.springframework.core.codec.DataBufferEncoder ;
import org.springframework.core.codec.Decoder ;
import org.springframework.core.codec.Encoder ;
import org.springframework.lang.Nullable ;
@ -47,7 +43,6 @@ import org.springframework.messaging.handler.annotation.MessageMapping;
@@ -47,7 +43,6 @@ import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.reactive.MessageMappingMessageHandler ;
import org.springframework.messaging.handler.invocation.reactive.HandlerMethodReturnValueHandler ;
import org.springframework.messaging.rsocket.ClientRSocketFactoryConfigurer ;
import org.springframework.messaging.rsocket.DefaultMetadataExtractor ;
import org.springframework.messaging.rsocket.MetadataExtractor ;
import org.springframework.messaging.rsocket.RSocketRequester ;
import org.springframework.messaging.rsocket.RSocketStrategies ;
@ -74,12 +69,10 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler {
@@ -74,12 +69,10 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler {
private final List < Encoder < ? > > encoders = new ArrayList < > ( ) ;
@Nullable
private RSocketStrategies rsocketStrategies ;
@Nullable
private MetadataExtractor metadataExtractor ;
private RSocketStrategies strategies ;
@Nullable
private MimeType defaultDataMimeType ;
@ -87,37 +80,49 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler {
@@ -87,37 +80,49 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler {
public RSocketMessageHandler ( ) {
this . encoders . add ( CharSequenceEncoder . allMimeTypes ( ) ) ;
this . encoders . add ( new ByteBufferEncoder ( ) ) ;
this . encoders . add ( new ByteArrayEncoder ( ) ) ;
this . encoders . add ( new DataBufferEncoder ( ) ) ;
setRSocketStrategies ( RSocketStrategies . create ( ) ) ;
}
/ * *
* { @inheritDoc }
* < p > If { @link # setRSocketStrategies ( RSocketStrategies ) rsocketStrategies }
* is also set , this property is re - initialized with the decoders in it .
* Or vice versa , if { @link # setRSocketStrategies ( RSocketStrategies )
* rsocketStrategies } is not set , it will be initialized from this and
* other properties .
* < p > When { @link # setRSocketStrategies ( RSocketStrategies ) rsocketStrategies }
* is set , this property is re - initialized with the decoders in it , and
* vice versa , setting this property mutates the { @code RSocketStrategies }
* to change its decoders .
* < p > By default this is set to the
* { @link RSocketStrategies . Builder # decoder ( Decoder [ ] ) defaults } from
* { @code RSocketStrategies } .
* /
@Override
public void setDecoders ( List < ? extends Decoder < ? > > decoders ) {
super . setDecoders ( decoders ) ;
this . strategies = this . strategies . mutate ( )
. decoders ( list - > {
list . clear ( ) ;
list . addAll ( decoders ) ;
} )
. build ( ) ;
}
/ * *
* Configure the encoders to use for encoding handler method return values .
* < p > If { @link # setRSocketStrategies ( RSocketStrategies ) rsocketStrategies }
* is also set , this property is re - initialized with the encoders in it .
* Or vice versa , if { @link # setRSocketStrategies ( RSocketStrategies )
* rsocketStrategies } is not set , it will be initialized from this and
* other properties .
* < p > When { @link # setRSocketStrategies ( RSocketStrategies ) rsocketStrategies }
* is set , this property is re - initialized with the encoders in it , and
* vice versa , setting this property mutates the { @code RSocketStrategies }
* to change its encoders .
* < p > By default this is set to the
* { @link RSocketStrategies . Builder # encoder ( Encoder [ ] ) defaults } from
* { @code RSocketStrategies } .
* /
public void setEncoders ( List < ? extends Encoder < ? > > encoders ) {
this . encoders . clear ( ) ;
this . encoders . addAll ( encoders ) ;
this . strategies = this . strategies . mutate ( )
. encoders ( list - > {
list . clear ( ) ;
list . addAll ( encoders ) ;
} )
. build ( ) ;
}
/ * *
@ -128,59 +133,53 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler {
@@ -128,59 +133,53 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler {
}
/ * *
* Provide configuration in the form of { @link RSocketStrategies } instance
* which can also be re - used to initialize a client - side
* { @link RSocketRequester } .
* < p > When this is set , in turn it sets the following :
* < ul >
* < li > { @link # setDecoders ( List ) }
* < li > { @link # setEncoders ( List ) }
* < li > { @link # setRouteMatcher ( RouteMatcher ) }
* < li > { @link # setMetadataExtractor ( MetadataExtractor ) }
* < li > { @link # setReactiveAdapterRegistry ( ReactiveAdapterRegistry ) }
* < / ul >
* < p > By default if this is not set , it is initialized from the above .
* { @inheritDoc }
* < p > When { @link # setRSocketStrategies ( RSocketStrategies ) rsocketStrategies }
* is set , this property is re - initialized with the RouteMatcher in it , and
* vice versa , setting this property mutates the { @code RSocketStrategies }
* to change its route matcher .
* < p > By default this is set to the
* { @link RSocketStrategies . Builder # routeMatcher ( RouteMatcher ) defaults }
* from { @code RSocketStrategies } .
* /
public void setRSocketStrategies ( RSocketStrategies rsocketStrategies ) {
setDecoders ( rsocketStrategies . decoders ( ) ) ;
setEncoders ( rsocketStrategies . encoders ( ) ) ;
setRouteMatcher ( rsocketStrategies . routeMatcher ( ) ) ;
setMetadataExtractor ( rsocketStrategies . metadataExtractor ( ) ) ;
setReactiveAdapterRegistry ( rsocketStrategies . reactiveAdapterRegistry ( ) ) ;
@Override
public void setRouteMatcher ( RouteMatcher routeMatcher ) {
super . setRouteMatcher ( routeMatcher ) ;
this . strategies = this . strategies . mutate ( ) . routeMatcher ( routeMatcher ) . build ( ) ;
}
/ * *
* Return an { @link RSocketStrategies } instance initialized from the
* corresponding properties listed under { @link # setRSocketStrategies } .
* Configure the registry for adapting various reactive types .
* < p > When { @link # setRSocketStrategies ( RSocketStrategies ) rsocketStrategies }
* is set , this property is re - initialized with the
* { @code ReactiveAdapterRegistry } in it , and vice versa , setting this
* property mutates the { @code RSocketStrategies } to change its adapter
* registry .
* < p > By default this is set to the
* { @link RSocketStrategies . Builder # reactiveAdapterStrategy ( ReactiveAdapterRegistry ) defaults }
* from { @code RSocketStrategies } .
* /
public RSocketStrategies getRSocketStrategies ( ) {
return this . rsocketStrategies ! = null ? this . rsocketStrategies : initRSocketStrategies ( ) ;
}
private RSocketStrategies initRSocketStrategies ( ) {
return RSocketStrategies . builder ( )
. decoders ( List : : clear )
. encoders ( List : : clear )
. decoders ( decoders - > decoders . addAll ( getDecoders ( ) ) )
. encoders ( encoders - > encoders . addAll ( getEncoders ( ) ) )
. routeMatcher ( getRouteMatcher ( ) )
. metadataExtractor ( getMetadataExtractor ( ) )
. reactiveAdapterStrategy ( getReactiveAdapterRegistry ( ) )
. build ( ) ;
@Override
public void setReactiveAdapterRegistry ( ReactiveAdapterRegistry registry ) {
super . setReactiveAdapterRegistry ( registry ) ;
this . strategies = this . strategies . mutate ( ) . reactiveAdapterStrategy ( registry ) . build ( ) ;
}
/ * *
* Configure a { @link MetadataExtractor } to extract the route along with
* other metadata .
* < p > By default this is { @link DefaultMetadataExtractor } extracting a
* route from { @code "message/x.rsocket.routing.v0" } or { @code "text/plain" } .
* < p > If the extractor is a { @code DefaultMetadataExtractor } , its
* { @code decoders } property will be set , if not already set , to the
* { @link # setDecoders ( List ) } configured here .
* < p > When { @link # setRSocketStrategies ( RSocketStrategies ) rsocketStrategies }
* is set , this property is re - initialized with the { @code MetadataExtractor }
* in it , and vice versa , setting this property mutates the
* { @code RSocketStrategies } to change its { @code MetadataExtractor } .
* < p > By default this is set to the
* { @link RSocketStrategies . Builder # metadataExtractor ( MetadataExtractor ) } defaults }
* from { @code RSocketStrategies } .
* @param extractor the extractor to use
* /
public void setMetadataExtractor ( MetadataExtractor extractor ) {
this . metadataExtractor = extractor ;
this . strategies = this . strategies . mutate ( ) . metadataExtractor ( this . metadataExtractor ) . build ( ) ;
}
/ * *
@ -192,6 +191,40 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler {
@@ -192,6 +191,40 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler {
return this . metadataExtractor ;
}
/ * *
* Configure this handler through an { @link RSocketStrategies } instance which
* can be re - used to initialize a client - side { @link RSocketRequester } .
* < p > When this property is set , in turn it sets the following :
* < ul >
* < li > { @link # setDecoders ( List ) }
* < li > { @link # setEncoders ( List ) }
* < li > { @link # setRouteMatcher ( RouteMatcher ) }
* < li > { @link # setMetadataExtractor ( MetadataExtractor ) }
* < li > { @link # setReactiveAdapterRegistry ( ReactiveAdapterRegistry ) }
* < / ul >
* < p > By default this is set to { @link RSocketStrategies # create ( ) } which in
* turn sets default settings for all related properties .
* /
public void setRSocketStrategies ( RSocketStrategies rsocketStrategies ) {
this . strategies = rsocketStrategies ;
updateStateFromRSocketStrategies ( ) ;
}
private void updateStateFromRSocketStrategies ( ) {
setDecoders ( this . strategies . decoders ( ) ) ;
setEncoders ( this . strategies . encoders ( ) ) ;
setRouteMatcher ( this . strategies . routeMatcher ( ) ) ;
setMetadataExtractor ( this . strategies . metadataExtractor ( ) ) ;
setReactiveAdapterRegistry ( this . strategies . reactiveAdapterRegistry ( ) ) ;
}
/ * *
* Return the { @link # setRSocketStrategies configured } { @code RSocketStrategies } .
* /
public RSocketStrategies getRSocketStrategies ( ) {
return this . strategies ;
}
/ * *
* Configure the default content type to use for data payloads if the
* { @code SETUP } frame did not specify one .
@ -238,15 +271,6 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler {
@@ -238,15 +271,6 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler {
getArgumentResolverConfigurer ( ) . addCustomResolver ( new RSocketRequesterMethodArgumentResolver ( ) ) ;
super . afterPropertiesSet ( ) ;
if ( getMetadataExtractor ( ) = = null ) {
DefaultMetadataExtractor extractor = new DefaultMetadataExtractor ( ) ;
extractor . setDecoders ( getDecoders ( ) ) ;
extractor . metadataToExtract ( MimeTypeUtils . TEXT_PLAIN , String . class , MetadataExtractor . ROUTE_KEY ) ;
setMetadataExtractor ( extractor ) ;
}
this . rsocketStrategies = initRSocketStrategies ( ) ;
}
@Override
@ -351,8 +375,7 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler {
@@ -351,8 +375,7 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler {
MimeType metaMimeType = StringUtils . hasText ( s ) ? MimeTypeUtils . parseMimeType ( s ) : this . defaultMetadataMimeType ;
Assert . notNull ( metaMimeType , "No `metadataMimeType` in ConnectionSetupPayload and no default value" ) ;
RSocketStrategies strategies = this . rsocketStrategies ;
Assert . notNull ( strategies , "No RSocketStrategies. Was afterPropertiesSet not called?" ) ;
RSocketStrategies strategies = getRSocketStrategies ( ) ;
RSocketRequester requester = RSocketRequester . wrap ( rsocket , dataMimeType , metaMimeType , strategies ) ;
Assert . state ( this . metadataExtractor ! = null ,