Browse Source

Polish

pull/23382/head
Rossen Stoyanchev 5 years ago
parent
commit
c76370d7d8
  1. 10
      spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultMetadataExtractor.java
  2. 30
      spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequester.java
  3. 16
      spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java
  4. 4
      spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketStrategies.java
  5. 22
      spring-messaging/src/main/java/org/springframework/messaging/rsocket/MetadataExtractor.java
  6. 138
      spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java
  7. 9
      spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketStrategies.java
  8. 6
      spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/ConnectMapping.java
  9. 52
      spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/support/RSocketMessageHandler.java
  10. 55
      spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilderTests.java
  11. 16
      spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterTests.java

10
spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultMetadataExtractor.java

@ -38,13 +38,11 @@ import org.springframework.lang.Nullable; @@ -38,13 +38,11 @@ import org.springframework.lang.Nullable;
import org.springframework.util.MimeType;
/**
* Default {@link MetadataExtractor} implementation that relies on {@link Decoder}s
* to deserialize the content of metadata entries.
*
* Default {@link MetadataExtractor} implementation that relies on
* {@link Decoder}s to deserialize the content of metadata entries.
* <p>By default only {@code "message/x.rsocket.routing.v0""} is extracted and
* saved under {@link MetadataExtractor#ROUTE_KEY}. Use the
* {@code metadataToExtract} methods to specify other metadata mime types of
* interest to extract.
* saved under {@link MetadataExtractor#ROUTE_KEY}. Use {@code metadataToExtract}
* methods to specify other metadata mime types of interest to extract.
*
* @author Rossen Stoyanchev
* @since 5.2

30
spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequester.java

@ -49,18 +49,13 @@ import org.springframework.util.MimeType; @@ -49,18 +49,13 @@ import org.springframework.util.MimeType;
import org.springframework.util.ObjectUtils;
/**
* Default, package-private {@link RSocketRequester} implementation.
* Default implementation of {@link RSocketRequester}.
*
* @author Rossen Stoyanchev
* @since 5.2
*/
final class DefaultRSocketRequester implements RSocketRequester {
static final MimeType COMPOSITE_METADATA = new MimeType("message", "x.rsocket.composite-metadata.v0");
static final MimeType ROUTING = new MimeType("message", "x.rsocket.routing.v0");
/** For route variable replacement. */
private static final Pattern VARS_PATTERN = Pattern.compile("\\{([^/]+?)\\}");
@ -114,7 +109,7 @@ final class DefaultRSocketRequester implements RSocketRequester { @@ -114,7 +109,7 @@ final class DefaultRSocketRequester implements RSocketRequester {
public RequestSpec route(String route, Object... vars) {
Assert.notNull(route, "'route' is required");
route = expand(route, vars);
return new DefaultRequestSpec(route, metadataMimeType().equals(COMPOSITE_METADATA) ? ROUTING : null);
return new DefaultRequestSpec(route, isCompositeMetadata() ? MetadataExtractor.ROUTING : null);
}
private static String expand(String route, Object... vars) {
@ -134,6 +129,10 @@ final class DefaultRSocketRequester implements RSocketRequester { @@ -134,6 +129,10 @@ final class DefaultRSocketRequester implements RSocketRequester {
return sb.toString();
}
private boolean isCompositeMetadata() {
return metadataMimeType().equals(MetadataExtractor.COMPOSITE_METADATA);
}
@Override
public RequestSpec metadata(Object metadata, @Nullable MimeType mimeType) {
return new DefaultRequestSpec(metadata, mimeType);
@ -154,16 +153,12 @@ final class DefaultRSocketRequester implements RSocketRequester { @@ -154,16 +153,12 @@ final class DefaultRSocketRequester implements RSocketRequester {
private final Map<Object, MimeType> metadata = new LinkedHashMap<>(4);
public DefaultRequestSpec(Object metadata, @Nullable MimeType mimeType) {
DefaultRequestSpec(Object metadata, @Nullable MimeType mimeType) {
mimeType = (mimeType == null && !isCompositeMetadata() ? metadataMimeType() : mimeType);
Assert.notNull(mimeType, "MimeType is required for composite metadata");
metadata(metadata, mimeType);
}
private boolean isCompositeMetadata() {
return metadataMimeType().equals(COMPOSITE_METADATA);
}
@Override
public RequestSpec metadata(Object metadata, MimeType mimeType) {
Assert.notNull(metadata, "Metadata content is required");
@ -184,22 +179,27 @@ final class DefaultRSocketRequester implements RSocketRequester { @@ -184,22 +179,27 @@ final class DefaultRSocketRequester implements RSocketRequester {
public ResponseSpec data(Object producer, Class<?> elementClass) {
Assert.notNull(producer, "'producer' must not be null");
Assert.notNull(elementClass, "'elementClass' must not be null");
ReactiveAdapter adapter = strategies.reactiveAdapterRegistry().getAdapter(producer.getClass());
ReactiveAdapter adapter = getAdapter(producer.getClass());
Assert.notNull(adapter, "'producer' type is unknown to ReactiveAdapterRegistry");
return toResponseSpec(adapter.toPublisher(producer), ResolvableType.forClass(elementClass));
}
@Nullable
private ReactiveAdapter getAdapter(Class<?> aClass) {
return strategies.reactiveAdapterRegistry().getAdapter(aClass);
}
@Override
public ResponseSpec data(Object producer, ParameterizedTypeReference<?> elementTypeRef) {
Assert.notNull(producer, "'producer' must not be null");
Assert.notNull(elementTypeRef, "'elementTypeRef' must not be null");
ReactiveAdapter adapter = strategies.reactiveAdapterRegistry().getAdapter(producer.getClass());
ReactiveAdapter adapter = getAdapter(producer.getClass());
Assert.notNull(adapter, "'producer' type is unknown to ReactiveAdapterRegistry");
return toResponseSpec(adapter.toPublisher(producer), ResolvableType.forType(elementTypeRef));
}
private ResponseSpec toResponseSpec(Object input, ResolvableType elementType) {
ReactiveAdapter adapter = strategies.reactiveAdapterRegistry().getAdapter(input.getClass());
ReactiveAdapter adapter = getAdapter(input.getClass());
Publisher<?> publisher;
if (input instanceof Publisher) {
publisher = (Publisher<?>) input;

16
spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java

@ -48,7 +48,7 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder { @@ -48,7 +48,7 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder {
@Nullable
private MimeType dataMimeType;
private MimeType metadataMimeType = DefaultRSocketRequester.COMPOSITE_METADATA;
private MimeType metadataMimeType = MetadataExtractor.COMPOSITE_METADATA;
@Nullable
private RSocketStrategies strategies;
@ -109,18 +109,18 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder { @@ -109,18 +109,18 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder {
Assert.isTrue(!rsocketStrategies.encoders().isEmpty(), "No encoders");
Assert.isTrue(!rsocketStrategies.decoders().isEmpty(), "No decoders");
RSocketFactory.ClientRSocketFactory rsocketFactory = RSocketFactory.connect();
MimeType dataMimeType = getDataMimeType(rsocketStrategies);
rsocketFactory.dataMimeType(dataMimeType.toString());
rsocketFactory.metadataMimeType(this.metadataMimeType.toString());
RSocketFactory.ClientRSocketFactory factory = RSocketFactory.connect();
this.rsocketConfigurers.forEach(configurer -> configurer.configure(factory));
if (rsocketStrategies.dataBufferFactory() instanceof NettyDataBufferFactory) {
rsocketFactory.frameDecoder(PayloadDecoder.ZERO_COPY);
factory.frameDecoder(PayloadDecoder.ZERO_COPY);
}
this.rsocketConfigurers.forEach(configurer -> configurer.configure(rsocketFactory));
MimeType dataMimeType = getDataMimeType(rsocketStrategies);
factory.dataMimeType(dataMimeType.toString());
factory.metadataMimeType(this.metadataMimeType.toString());
return rsocketFactory.transport(transport)
return factory.transport(transport)
.start()
.map(rsocket -> new DefaultRSocketRequester(
rsocket, dataMimeType, this.metadataMimeType, rsocketStrategies));

4
spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketStrategies.java

@ -43,7 +43,7 @@ import org.springframework.util.RouteMatcher; @@ -43,7 +43,7 @@ import org.springframework.util.RouteMatcher;
import org.springframework.util.SimpleRouteMatcher;
/**
* Default, package-private {@link RSocketStrategies} implementation.
* Default implementation of {@link RSocketStrategies}.
*
* @author Rossen Stoyanchev
* @since 5.2
@ -108,7 +108,7 @@ final class DefaultRSocketStrategies implements RSocketStrategies { @@ -108,7 +108,7 @@ final class DefaultRSocketStrategies implements RSocketStrategies {
/**
* Default RSocketStrategies.Builder implementation.
* Default implementation of {@link RSocketStrategies.Builder}.
*/
static class DefaultRSocketStrategiesBuilder implements RSocketStrategies.Builder {

22
spring-messaging/src/main/java/org/springframework/messaging/rsocket/MetadataExtractor.java

@ -22,10 +22,10 @@ import io.rsocket.Payload; @@ -22,10 +22,10 @@ import io.rsocket.Payload;
import org.springframework.util.MimeType;
/**
* Strategy to extract a map of values from the metadata of a {@link Payload}.
* This includes decoding metadata entries based on their mime type and
* assigning a name to the decoded value. The resulting name-value pairs can
* be added to the headers of a
* Strategy to extract a map of value(s) from {@link Payload} metadata, which
* could be composite metadata with multiple entries. Each metadata entry
* is decoded based on its {@code MimeType} and a name is assigned to the decoded
* value. The resulting name-value pairs can be added to the headers of a
* {@link org.springframework.messaging.Message Message}.
*
* @author Rossen Stoyanchev
@ -39,26 +39,22 @@ public interface MetadataExtractor { @@ -39,26 +39,22 @@ public interface MetadataExtractor {
String ROUTE_KEY = "route";
/**
* Constant for mime type {@code message/x.rsocket.composite-metadata.v0}.
* Constant MimeType {@code "message/x.rsocket.composite-metadata.v0"}.
*/
MimeType COMPOSITE_METADATA = new MimeType("message", "x.rsocket.composite-metadata.v0");
/**
* Constant for mime type {@code message/x.rsocket.routing.v0}.
* Constant for MimeType {@code "message/x.rsocket.routing.v0"}.
*/
MimeType ROUTING = new MimeType("message", "x.rsocket.routing.v0");
/**
* Extract a map of values from the given {@link Payload} metadata.
* <p>Metadata may be composite and consist of multiple entries
* Implementations are free to extract any number of name-value pairs per
* metadata entry. The Payload "route" should be saved under the
* {@link #ROUTE_KEY}.
* The Payload "route", if present, should be saved under {@link #ROUTE_KEY}.
* @param payload the payload whose metadata should be read
* @param metadataMimeType the mime type of the metadata; this is what was
* specified by the client at the start of the RSocket connection.
* @return a map of 0 or more decoded metadata values with assigned names
* @param metadataMimeType the metadata MimeType for the connection.
* @return name values pairs extracted from the metadata
*/
Map<String, Object> extract(Payload payload, MimeType metadataMimeType);

138
spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java

@ -22,6 +22,8 @@ import java.util.function.Consumer; @@ -22,6 +22,8 @@ import java.util.function.Consumer;
import io.rsocket.ConnectionSetupPayload;
import io.rsocket.RSocket;
import io.rsocket.transport.ClientTransport;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.client.WebsocketClientTransport;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@ -36,7 +38,7 @@ import org.springframework.util.MimeType; @@ -36,7 +38,7 @@ import org.springframework.util.MimeType;
/**
* A thin wrapper around a sending {@link RSocket} with a fluent API accepting
* and returning higher level Objects for input and for output, along with
* methods specify routing and other metadata.
* methods to prepare routing and other metadata.
*
* @author Rossen Stoyanchev
* @author Brian Clozel
@ -45,7 +47,7 @@ import org.springframework.util.MimeType; @@ -45,7 +47,7 @@ import org.springframework.util.MimeType;
public interface RSocketRequester {
/**
* Return the underlying RSocket used to make requests.
* Return the underlying sending RSocket.
*/
RSocket rsocket();
@ -69,47 +71,40 @@ public interface RSocketRequester { @@ -69,47 +71,40 @@ public interface RSocketRequester {
* Begin to specify a new request with the given route to a remote handler.
* <p>The route can be a template with placeholders, e.g.
* {@code "flight.{code}"} in which case the supplied route variables are
* expanded into the template after being formatted via {@code toString()}.
* formatted via {@code toString()} and expanded into the template.
* If a formatted variable contains a "." it is replaced with the escape
* sequence "%2E" to avoid treating it as separator by the responder .
* <p>If the connection is set to use composite metadata, the route is
* encoded as {@code "message/x.rsocket.routing.v0"}. Otherwise the route
* is encoded according to the mime type for the connection.
* @param route the route to a handler
* @param route the route expressing a remote handler mapping
* @param routeVars variables to be expanded into the route template
* @return a spec for further defining and executing the request
*/
RequestSpec route(String route, Object... routeVars);
/**
* Begin to specify a new request with the given metadata.
* <p>If using composite metadata then the mime type argument is required.
* Otherwise the mime type should be {@code null}, or it must match the
* mime type for the connection.
* Begin to specify a new request with the given metadata value.
* @param metadata the metadata value to encode
* @param mimeType the mime type that describes the metadata;
* This is required for connection using composite metadata. Otherwise the
* value is encoded according to the mime type for the connection and this
* argument may be left as {@code null}.
*/
RequestSpec metadata(Object metadata, @Nullable MimeType mimeType);
/**
* Obtain a builder for an {@link RSocketRequester} by connecting to an
* RSocket server. The builder allows for customization of
* {@link io.rsocket.RSocketFactory.ClientRSocketFactory ClientRSocketFactory}
* settings, {@link RSocketStrategies}, and for selecting the transport to use.
* Obtain a builder to create a client {@link RSocketRequester} by connecting
* to an RSocket server.
*/
static RSocketRequester.Builder builder() {
return new DefaultRSocketRequesterBuilder();
}
/**
* Wrap an existing {@link RSocket}. This is typically used in a responder,
* client or server, to wrap the remote/sending {@code RSocket}.
* @param rsocket the RSocket to wrap
* @param dataMimeType the data MimeType from the {@code ConnectionSetupPayload}
* @param metadataMimeType the metadata MimeType from the {@code ConnectionSetupPayload}
* @param strategies the strategies to use
* @return the created RSocketRequester
* Wrap an existing {@link RSocket}. Typically used in client or server
* responders to wrap the {@code RSocket} for the remote side.
*/
static RSocketRequester wrap(
RSocket rsocket, MimeType dataMimeType, MimeType metadataMimeType,
@ -120,76 +115,76 @@ public interface RSocketRequester { @@ -120,76 +115,76 @@ public interface RSocketRequester {
/**
* Builder to prepare an {@link RSocketRequester} by connecting to an
* RSocket server and wrapping the resulting {@link RSocket}.
* Builder to create a requester by connecting to a server.
*/
interface Builder {
/**
* Configure the payload data MimeType to specify on the {@code SETUP}
* frame that applies to the whole connection.
* <p>If this is not set, it will be set to the MimeType of the first
* <p>If not set, this will be initialized to the MimeType of the first
* {@link RSocketStrategies.Builder#decoder(Decoder[]) non-default}
* {@code Decoder}, or otherwise fall back on the MimeType of the first
* (default) decoder.
* {@code Decoder}, or otherwise the MimeType of the first decoder.
*/
RSocketRequester.Builder dataMimeType(@Nullable MimeType mimeType);
/**
* Configure the payload metadata MimeType to specify on the {@code SETUP}
* frame and applies to the whole connection.
* frame that applies to the whole connection.
* <p>By default this is set to
* {@code "message/x.rsocket.composite-metadata.v0"} in which case the
* route, if provided, is encoded as a
* {@code "message/x.rsocket.routing.v0"} composite metadata entry.
* For any other MimeType, it is assumed to be the MimeType for the
* route, if provided.
* route, if provided, is encoded as a {@code "message/x.rsocket.routing.v0"}
* composite metadata entry. If this is set to any other MimeType, it is
* assumed that's the MimeType for the route, if provided.
*/
RSocketRequester.Builder metadataMimeType(MimeType mimeType);
/**
* Set the {@link RSocketStrategies} to use.
* <p>By default this is set to {@code RSocketStrategies.builder().build()}
* but may be further customized via {@link #rsocketStrategies(Consumer)}.
* Provide {@link RSocketStrategies} to use.
* <p>By default this is based on default settings of
* {@link RSocketStrategies.Builder} but may be further customized via
* {@link #rsocketStrategies(Consumer)}.
*/
RSocketRequester.Builder rsocketStrategies(@Nullable RSocketStrategies strategies);
/**
* Customize the {@link RSocketStrategies}.
* <p>By default this starts out with an empty builder, i.e.
* {@link RSocketStrategies#builder()}, but the strategies can also be
* set via {@link #rsocketStrategies(RSocketStrategies)}.
* <p>By default this starts out as {@link RSocketStrategies#builder()}.
* However if strategies were {@link #rsocketStrategies(RSocketStrategies) set}
* explicitly, then they are {@link RSocketStrategies#mutate() mutated}.
*/
RSocketRequester.Builder rsocketStrategies(Consumer<RSocketStrategies.Builder> configurer);
/**
* Callback to configure the {@code ClientRSocketFactory} directly.
* <p>Do not set {@link #dataMimeType(MimeType)} and
* {@link #metadataMimeType(MimeType)} directly on the
* {@code ClientRSocketFactory}. Use methods on this builder instead
* so the {@code RSocketRequester} will have access to them.
* <p>For configuring client side responding, see
* <p>Note that the data and metadata mime types cannot be set directly
* on the {@code ClientRSocketFactory}. Use shortcuts on this builder
* {@link #dataMimeType(MimeType)} and {@link #metadataMimeType(MimeType)}
* instead.
* <p>To configure client side responding, see
* {@link RSocketMessageHandler#clientResponder(RSocketStrategies, Object...)}.
*/
RSocketRequester.Builder rsocketFactory(ClientRSocketFactoryConfigurer configurer);
/**
* Connect to the RSocket server over TCP.
* Connect to the server over TCP.
* @param host the server host
* @param port the server port
* @return an {@code RSocketRequester} for the connection
* @see TcpClientTransport
*/
Mono<RSocketRequester> connectTcp(String host, int port);
/**
* Connect to the RSocket server over WebSocket.
* Connect to the server over WebSocket.
* @param uri the RSocket server endpoint URI
* @return an {@code RSocketRequester} for the connection
* @see WebsocketClientTransport
*/
Mono<RSocketRequester> connectWebSocket(URI uri);
/**
* Connect to the RSocket server with the given {@code ClientTransport}.
* Connect to the server with the given {@code ClientTransport}.
* @param transport the client transport to use
* @return an {@code RSocketRequester} for the connection
*/
@ -199,15 +194,15 @@ public interface RSocketRequester { @@ -199,15 +194,15 @@ public interface RSocketRequester {
/**
* Contract to provide input data for an RSocket request.
* Spec for providing input data for an RSocket request.
*/
interface RequestSpec {
/**
* Use this to append additional metadata entries if the RSocket
* connection is configured to use composite metadata. If not, an
* {@link IllegalArgumentException} will be raised.
* @param metadata an Object, to be encoded with a suitable
* Use this to append additional metadata entries when using composite
* metadata. An {@link IllegalArgumentException} is raised if this
* method is used when not using composite metadata.
* @param metadata an Object to be encoded with a suitable
* {@link org.springframework.core.codec.Encoder Encoder}, or a
* {@link org.springframework.core.io.buffer.DataBuffer DataBuffer}
* @param mimeType the mime type that describes the metadata
@ -215,61 +210,60 @@ public interface RSocketRequester { @@ -215,61 +210,60 @@ public interface RSocketRequester {
RequestSpec metadata(Object metadata, MimeType mimeType);
/**
* Provide payload data. The data can be one of the following:
* Provide payload data for the request. This can be one of:
* <ul>
* <li>Concrete value
* <li>{@link Publisher} of value(s)
* <li>Any other producer of value(s) that can be adapted to a
* {@link Publisher} via {@link ReactiveAdapterRegistry}
* </ul>
* @param data the Object to use for payload data
* @return spec for declaring the expected response
* @param data the Object value for the payload data
* @return spec to declare the expected response
*/
ResponseSpec data(Object data);
/**
* Alternative of {@link #data(Object)} that accepts not only a producer
* of value(s) but also a hint for the types of values that will be
* produced. The class hint is used to find a compatible {@code Encoder}
* once, up front, and used for all values.
* Variant of {@link #data(Object)} that also accepts a hint for the
* types of values that will be produced. The class hint is used to
* find a compatible {@code Encoder} once, up front vs per value.
* @param producer the source of payload data value(s). This must be a
* {@link Publisher} or another producer adaptable to a
* {@code Publisher} via {@link ReactiveAdapterRegistry}
* @param elementClass the type of values to be produced
* @return spec for declaring the expected response
* @return spec to declare the expected response
*/
ResponseSpec data(Object producer, Class<?> elementClass);
/**
* Alternative of {@link #data(Object, Class)} but with a
* {@link ParameterizedTypeReference} hint which can provide generic
* type information.
* Variant of {@link #data(Object, Class)} for when the type hint has
* to have a generic type. See {@link ParameterizedTypeReference}.
* @param producer the source of payload data value(s). This must be a
* {@link Publisher} or another producer adaptable to a
* {@code Publisher} via {@link ReactiveAdapterRegistry}
* @param elementTypeRef the type of values to be produced
* @return spec to declare the expected response
*/
ResponseSpec data(Object producer, ParameterizedTypeReference<?> elementTypeRef);
}
/**
* Contract to declare the expected RSocket response.
* Spect to declare the type of request and expected response.
*/
interface ResponseSpec {
/**
* Perform {@link RSocket#fireAndForget fireAndForget}.
* Perform a {@link RSocket#fireAndForget fireAndForget}.
*/
Mono<Void> send();
/**
* Perform {@link RSocket#requestResponse requestResponse}. If the
* expected data type is {@code Void.class}, the returned {@code Mono}
* will complete after all data is consumed.
* <p><strong>Note:</strong> Use of this method will raise an error if
* the request payload is a multi-valued {@link Publisher} as
* determined through the configured {@link ReactiveAdapterRegistry}.
* Perform a {@link RSocket#requestResponse requestResponse} exchange.
* <p>If the return type is {@code Mono<Void>}, the {@code Mono} will
* complete after all data is consumed.
* <p><strong>Note:</strong> This method will raise an error if
* the request payload is a multi-valued {@link Publisher} as there is
* no many-to-one RSocket interaction.
* @param dataType the expected data type for the response
* @param <T> parameter for the expected data type
* @return the decoded response
@ -283,11 +277,11 @@ public interface RSocketRequester { @@ -283,11 +277,11 @@ public interface RSocketRequester {
<T> Mono<T> retrieveMono(ParameterizedTypeReference<T> dataTypeRef);
/**
* Perform {@link RSocket#requestStream requestStream} or
* {@link RSocket#requestChannel requestChannel} depending on whether
* the request input consists of a single or multiple payloads.
* If the expected data type is {@code Void.class}, the returned
* {@code Flux} will complete after all data is consumed.
* Perform an {@link RSocket#requestStream requestStream} or a
* {@link RSocket#requestChannel requestChannel} exchange depending on
* whether the request input is single or multi-payload.
* <p>If the return type is {@code Flux<Void>}, the {@code Flux} will
* complete after all data is consumed.
* @param dataType the expected type for values in the response
* @param <T> parameterize the expected type of values
* @return the decoded response

9
spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketStrategies.java

@ -133,7 +133,7 @@ public interface RSocketStrategies { @@ -133,7 +133,7 @@ public interface RSocketStrategies {
}
/**
* Return a builder to build a new {@code RSocketStrategies} instance.
* Return a builder to prepare a new {@code RSocketStrategies} instance.
* The builder applies default settings, see individual builder methods for
* details.
*/
@ -179,7 +179,7 @@ public interface RSocketStrategies { @@ -179,7 +179,7 @@ public interface RSocketStrategies {
* client or server responders.
* <p>By default, {@link SimpleRouteMatcher} is used, backed by
* {@link AntPathMatcher} with "." as separator. For better
* efficiency consider using the {@code PathPatternRouteMatcher} from
* efficiency consider switching to {@code PathPatternRouteMatcher} from
* {@code spring-web} instead.
*/
Builder routeMatcher(@Nullable RouteMatcher routeMatcher);
@ -189,8 +189,7 @@ public interface RSocketStrategies { @@ -189,8 +189,7 @@ public interface RSocketStrategies {
* other metadata. This option is applicable to client or server
* responders.
* <p>By default this is {@link DefaultMetadataExtractor} extracting a
* route from {@code "message/x.rsocket.routing.v0"} or
* {@code "text/plain"} metadata entries.
* route from {@code "message/x.rsocket.routing.v0"} metadata.
*/
Builder metadataExtractor(@Nullable MetadataExtractor metadataExtractor);
@ -213,7 +212,7 @@ public interface RSocketStrategies { @@ -213,7 +212,7 @@ public interface RSocketStrategies {
* here, and sets the frame decoder in {@link ClientRSocketFactory
* ClientRSocketFactory} accordingly. For server setup, the
* {@link ServerRSocketFactory ServerRSocketFactory} must be configured
* accordingly too for zero copy.
* accordingly for zero copy too.
* <p>If using {@link DefaultDataBufferFactory} instead, there is no
* need for related config changes in RSocket.
*/

6
spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/ConnectMapping.java

@ -31,9 +31,9 @@ import io.rsocket.ConnectionSetupPayload; @@ -31,9 +31,9 @@ import io.rsocket.ConnectionSetupPayload;
* <p>This is a method-level annotation that can be combined with a type-level
* {@link org.springframework.messaging.handler.annotation.MessageMapping @MessageMapping}
* for a combined route pattern. It supports the same arguments as
* {@code @MessageMapping} but does not support any return values. On a server,
* handling can be asynchronous, e.g. with a {@code Mono<Void>} return value,
* in which case the connection is accepted if and when handling completes.
* {@code @MessageMapping} but the return value must be {@code void}. On a
* server, handling can be asynchronous (e.g. {@code Mono<Void>}), in which
* case the connection is accepted if and when the {@code Mono<Void>} completes.
* On the client side this method is only a callback and does not affect the
* establishment of the connection.
*

52
spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/support/RSocketMessageHandler.java

@ -53,13 +53,18 @@ import org.springframework.util.RouteMatcher; @@ -53,13 +53,18 @@ import org.springframework.util.RouteMatcher;
import org.springframework.util.StringUtils;
/**
* Extension of {@link MessageMappingMessageHandler} to use as an RSocket
* responder by handling incoming streams via {@code @MessageMapping} annotated
* methods.
* <p>Use {@link #clientResponder()} and {@link #serverResponder()} to obtain
* {@link io.rsocket.RSocketFactory.ClientRSocketFactory#acceptor(Function) client} or
* {@link io.rsocket.RSocketFactory.ServerRSocketFactory#acceptor(SocketAcceptor) server}
* side adapters.
* Extension of {@link MessageMappingMessageHandler} for use in RSocket as a
* responder that handles requests with {@link ConnectMapping @ConnectMapping}
* and {@link MessageMapping @MessageMapping} methods.
* <p>For RSocket servers use {@link #serverResponder()} to obtain a
* {@link SocketAcceptor} to register with
* {@link io.rsocket.RSocketFactory.ServerRSocketFactory ServerRSocketFactory}.
* <p>For RSocket clients use {@link #clientResponder()} to obtain an adapter
* to register with
* {@link io.rsocket.RSocketFactory.ClientRSocketFactory ClientRSocketFactory},
* or use the static shortcut
* {@link #clientResponder(RSocketStrategies, Object...)} to obtain a configurer
* for {@link RSocketRequester.Builder#rsocketFactory}.
*
* @author Rossen Stoyanchev
* @since 5.2
@ -86,8 +91,8 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler { @@ -86,8 +91,8 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler {
* {@inheritDoc}
* <p>When {@link #setRSocketStrategies(RSocketStrategies) rsocketStrategies}
* is set, this property is re-initialized with the decoders in it, and
* vice versa, setting this property mutates the {@code RSocketStrategies}
* to change its decoders.
* likewise when this property is set the {@code RSocketStrategies} are
* mutated to change the decoders in them.
* <p>By default this is set to the
* {@link RSocketStrategies.Builder#decoder(Decoder[]) defaults} from
* {@code RSocketStrategies}.
@ -107,8 +112,8 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler { @@ -107,8 +112,8 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler {
* Configure the encoders to use for encoding handler method return values.
* <p>When {@link #setRSocketStrategies(RSocketStrategies) rsocketStrategies}
* is set, this property is re-initialized with the encoders in it, and
* vice versa, setting this property mutates the {@code RSocketStrategies}
* to change its encoders.
* likewise when this property is set the {@code RSocketStrategies} are
* mutated to change the encoders in it.
* <p>By default this is set to the
* {@link RSocketStrategies.Builder#encoder(Encoder[]) defaults} from
* {@code RSocketStrategies}.
@ -134,9 +139,9 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler { @@ -134,9 +139,9 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler {
/**
* {@inheritDoc}
* <p>When {@link #setRSocketStrategies(RSocketStrategies) rsocketStrategies}
* is set, this property is re-initialized with the RouteMatcher in it, and
* vice versa, setting this property mutates the {@code RSocketStrategies}
* to change its route matcher.
* is set, this property is re-initialized with the route matcher in it, and
* likewise when this property is set the {@code RSocketStrategies} are
* mutated to change the matcher in it.
* <p>By default this is set to the
* {@link RSocketStrategies.Builder#routeMatcher(RouteMatcher) defaults}
* from {@code RSocketStrategies}.
@ -150,10 +155,9 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler { @@ -150,10 +155,9 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler {
/**
* Configure the registry for adapting various reactive types.
* <p>When {@link #setRSocketStrategies(RSocketStrategies) rsocketStrategies}
* is set, this property is re-initialized with the
* {@code ReactiveAdapterRegistry} in it, and vice versa, setting this
* property mutates the {@code RSocketStrategies} to change its adapter
* registry.
* is set, this property is re-initialized with the registry in it, and
* likewise when this property is set the {@code RSocketStrategies} are
* mutated to change the registry in it.
* <p>By default this is set to the
* {@link RSocketStrategies.Builder#reactiveAdapterStrategy(ReactiveAdapterRegistry) defaults}
* from {@code RSocketStrategies}.
@ -168,9 +172,9 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler { @@ -168,9 +172,9 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler {
* Configure a {@link MetadataExtractor} to extract the route along with
* other metadata.
* <p>When {@link #setRSocketStrategies(RSocketStrategies) rsocketStrategies}
* is set, this property is re-initialized with the {@code MetadataExtractor}
* in it, and vice versa, setting this property mutates the
* {@code RSocketStrategies} to change its {@code MetadataExtractor}.
* is set, this property is re-initialized with the extractor in it, and
* likewise when this property is set the {@code RSocketStrategies} are
* mutated to change the extractor in it.
* <p>By default this is set to the
* {@link RSocketStrategies.Builder#metadataExtractor(MetadataExtractor)} defaults}
* from {@code RSocketStrategies}.
@ -206,10 +210,6 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler { @@ -206,10 +210,6 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler {
*/
public void setRSocketStrategies(RSocketStrategies rsocketStrategies) {
this.strategies = rsocketStrategies;
updateStateFromRSocketStrategies();
}
private void updateStateFromRSocketStrategies() {
setDecoders(this.strategies.decoders());
setEncoders(this.strategies.encoders());
setRouteMatcher(this.strategies.routeMatcher());
@ -326,7 +326,7 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler { @@ -326,7 +326,7 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler {
* and return {@code Mono<Void>} with an error signal preventing the
* connection. Such a method can also start requests to the client but that
* must be done decoupled from handling and from the current thread.
* <p>Subsequent stream requests can be handled with
* <p>Subsequent requests on the connection can be handled with
* {@link MessageMapping MessageMapping} methods.
*/
public SocketAcceptor serverResponder() {

55
spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilderTests.java

@ -24,6 +24,7 @@ import java.util.function.Consumer; @@ -24,6 +24,7 @@ import java.util.function.Consumer;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.rsocket.ConnectionSetupPayload;
import io.rsocket.DuplexConnection;
import io.rsocket.RSocketFactory;
import io.rsocket.frame.decoder.PayloadDecoder;
@ -62,19 +63,21 @@ public class DefaultRSocketRequesterBuilderTests { @@ -62,19 +63,21 @@ public class DefaultRSocketRequesterBuilderTests {
private ClientTransport transport;
private final MockConnection connection = new MockConnection();
private final TestRSocketFactoryConfigurer rsocketFactoryConfigurer = new TestRSocketFactoryConfigurer();
@Before
public void setup() {
this.transport = mock(ClientTransport.class);
given(this.transport.connect(anyInt())).willReturn(Mono.just(new MockConnection()));
given(this.transport.connect(anyInt())).willReturn(Mono.just(this.connection));
}
@Test
@SuppressWarnings("unchecked")
public void shouldApplyCustomizationsAtSubscription() {
public void rsocketFactoryConfigurerAppliesAtSubscription() {
Consumer<RSocketStrategies.Builder> strategiesConfigurer = mock(Consumer.class);
RSocketRequester.builder()
.rsocketFactory(this.rsocketFactoryConfigurer)
@ -87,7 +90,7 @@ public class DefaultRSocketRequesterBuilderTests { @@ -87,7 +90,7 @@ public class DefaultRSocketRequesterBuilderTests {
@Test
@SuppressWarnings("unchecked")
public void shouldApplyCustomizations() {
public void rsocketFactoryConfigurer() {
Consumer<RSocketStrategies.Builder> rsocketStrategiesConfigurer = mock(Consumer.class);
RSocketRequester.builder()
.rsocketFactory(this.rsocketFactoryConfigurer)
@ -109,12 +112,12 @@ public class DefaultRSocketRequesterBuilderTests { @@ -109,12 +112,12 @@ public class DefaultRSocketRequesterBuilderTests {
.block();
assertThat(requester.dataMimeType())
.as("Default data MimeType, based on the first configured Decoder")
.as("Default data MimeType, based on the first Decoder")
.isEqualTo(MimeTypeUtils.TEXT_PLAIN);
}
@Test
public void defaultDataMimeTypeWithCustomDecoderRegitered() {
public void defaultDataMimeTypeWithCustomDecoderRegistered() {
RSocketStrategies strategies = RSocketStrategies.builder()
.decoder(new TestJsonDecoder(MimeTypeUtils.APPLICATION_JSON))
.build();
@ -130,15 +133,45 @@ public class DefaultRSocketRequesterBuilderTests { @@ -130,15 +133,45 @@ public class DefaultRSocketRequesterBuilderTests {
}
@Test
public void dataMimeTypeSet() {
public void dataMimeTypeExplicitlySet() {
RSocketRequester requester = RSocketRequester.builder()
.dataMimeType(MimeTypeUtils.APPLICATION_JSON)
.connect(this.transport)
.block();
ConnectionSetupPayload setupPayload = Mono.from(this.connection.sentFrames())
.map(ConnectionSetupPayload::create)
.block();
assertThat(setupPayload.dataMimeType()).isEqualTo("application/json");
assertThat(requester.dataMimeType()).isEqualTo(MimeTypeUtils.APPLICATION_JSON);
}
@Test
public void mimeTypesCannotBeChangedAtRSocketFactoryLevel() {
MimeType dataMimeType = MimeTypeUtils.APPLICATION_JSON;
MimeType metaMimeType = MetadataExtractor.ROUTING;
RSocketRequester requester = RSocketRequester.builder()
.metadataMimeType(metaMimeType)
.dataMimeType(dataMimeType)
.rsocketFactory(factory -> {
factory.metadataMimeType("text/plain");
factory.dataMimeType("application/xml");
})
.connect(this.transport)
.block();
ConnectionSetupPayload setupPayload = Mono.from(this.connection.sentFrames())
.map(ConnectionSetupPayload::create)
.block();
assertThat(setupPayload.dataMimeType()).isEqualTo(dataMimeType.toString());
assertThat(setupPayload.metadataMimeType()).isEqualTo(metaMimeType.toString());
assertThat(requester.dataMimeType()).isEqualTo(dataMimeType);
assertThat(requester.metadataMimeType()).isEqualTo(metaMimeType);
}
@Test
public void frameDecoderMatchesDataBufferFactory() throws Exception {
testFrameDecoder(new NettyDataBufferFactory(ByteBufAllocator.DEFAULT), PayloadDecoder.ZERO_COPY);
@ -170,8 +203,16 @@ public class DefaultRSocketRequesterBuilderTests { @@ -170,8 +203,16 @@ public class DefaultRSocketRequesterBuilderTests {
static class MockConnection implements DuplexConnection {
private Publisher<ByteBuf> sentFrames;
public Publisher<ByteBuf> sentFrames() {
return this.sentFrames;
}
@Override
public Mono<Void> send(Publisher<ByteBuf> frames) {
this.sentFrames = frames;
return Mono.empty();
}
@ -224,7 +265,7 @@ public class DefaultRSocketRequesterBuilderTests { @@ -224,7 +265,7 @@ public class DefaultRSocketRequesterBuilderTests {
@Override
public boolean canDecode(ResolvableType elementType, MimeType mimeType) {
return false;
throw new UnsupportedOperationException();
}
@Override

16
spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterTests.java

@ -48,8 +48,6 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -48,8 +48,6 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.springframework.messaging.rsocket.DefaultRSocketRequester.COMPOSITE_METADATA;
import static org.springframework.messaging.rsocket.DefaultRSocketRequester.ROUTING;
import static org.springframework.util.MimeTypeUtils.TEXT_PLAIN;
/**
@ -147,7 +145,7 @@ public class DefaultRSocketRequesterTests { @@ -147,7 +145,7 @@ public class DefaultRSocketRequesterTests {
public void metadataCompositeWithRoute() {
RSocketRequester requester = RSocketRequester.wrap(
this.rsocket, TEXT_PLAIN, COMPOSITE_METADATA, this.strategies);
this.rsocket, TEXT_PLAIN, MetadataExtractor.COMPOSITE_METADATA, this.strategies);
requester.route("toA").data("bodyA").send().block(Duration.ofSeconds(5));
@ -156,7 +154,7 @@ public class DefaultRSocketRequesterTests { @@ -156,7 +154,7 @@ public class DefaultRSocketRequesterTests {
assertThat(iterator.hasNext()).isTrue();
CompositeMetadata.Entry entry = iterator.next();
assertThat(entry.getMimeType()).isEqualTo(ROUTING.toString());
assertThat(entry.getMimeType()).isEqualTo(MetadataExtractor.ROUTING.toString());
assertThat(entry.getContent().toString(StandardCharsets.UTF_8)).isEqualTo("toA");
assertThat(iterator.hasNext()).isFalse();
@ -166,7 +164,7 @@ public class DefaultRSocketRequesterTests { @@ -166,7 +164,7 @@ public class DefaultRSocketRequesterTests {
public void metadataCompositeWithRouteAndTextEntry() {
RSocketRequester requester = RSocketRequester.wrap(
this.rsocket, TEXT_PLAIN, COMPOSITE_METADATA, this.strategies);
this.rsocket, TEXT_PLAIN, MetadataExtractor.COMPOSITE_METADATA, this.strategies);
requester.route("toA")
.metadata("My metadata", TEXT_PLAIN).data("bodyA")
@ -178,7 +176,7 @@ public class DefaultRSocketRequesterTests { @@ -178,7 +176,7 @@ public class DefaultRSocketRequesterTests {
assertThat(iterator.hasNext()).isTrue();
CompositeMetadata.Entry entry = iterator.next();
assertThat(entry.getMimeType()).isEqualTo(ROUTING.toString());
assertThat(entry.getMimeType()).isEqualTo(MetadataExtractor.ROUTING.toString());
assertThat(entry.getContent().toString(StandardCharsets.UTF_8)).isEqualTo("toA");
assertThat(iterator.hasNext()).isTrue();
@ -206,7 +204,7 @@ public class DefaultRSocketRequesterTests { @@ -206,7 +204,7 @@ public class DefaultRSocketRequesterTests {
@Test
public void metadataMimeTypeMismatch() {
RSocketRequester requester = RSocketRequester.wrap(this.rsocket, TEXT_PLAIN, TEXT_PLAIN, this.strategies);
assertThatThrownBy(() -> requester.metadata("toA", ROUTING).data("bodyA").send().block())
assertThatThrownBy(() -> requester.metadata("toA", MetadataExtractor.ROUTING).data("bodyA").send().block())
.hasMessageStartingWith("Connection configured for metadata mime type");
}
@ -219,8 +217,8 @@ public class DefaultRSocketRequesterTests { @@ -219,8 +217,8 @@ public class DefaultRSocketRequesterTests {
@Test
public void supportedMetadataMimeTypes() {
RSocketRequester.wrap(this.rsocket, TEXT_PLAIN, COMPOSITE_METADATA, this.strategies);
RSocketRequester.wrap(this.rsocket, TEXT_PLAIN, ROUTING, this.strategies);
RSocketRequester.wrap(this.rsocket, TEXT_PLAIN, MetadataExtractor.COMPOSITE_METADATA, this.strategies);
RSocketRequester.wrap(this.rsocket, TEXT_PLAIN, MetadataExtractor.ROUTING, this.strategies);
}
@Test

Loading…
Cancel
Save