diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultMetadataExtractor.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultMetadataExtractor.java index 6e320bd5d0..90b76a3787 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultMetadataExtractor.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultMetadataExtractor.java @@ -38,13 +38,11 @@ import org.springframework.lang.Nullable; import org.springframework.util.MimeType; /** - * Default {@link MetadataExtractor} implementation that relies on {@link Decoder}s - * to deserialize the content of metadata entries. - * + * Default {@link MetadataExtractor} implementation that relies on + * {@link Decoder}s to deserialize the content of metadata entries. *

By default only {@code "message/x.rsocket.routing.v0""} is extracted and - * saved under {@link MetadataExtractor#ROUTE_KEY}. Use the - * {@code metadataToExtract} methods to specify other metadata mime types of - * interest to extract. + * saved under {@link MetadataExtractor#ROUTE_KEY}. Use {@code metadataToExtract} + * methods to specify other metadata mime types of interest to extract. * * @author Rossen Stoyanchev * @since 5.2 diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequester.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequester.java index 8fac1b1593..8ce78b4cf3 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequester.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequester.java @@ -49,18 +49,13 @@ import org.springframework.util.MimeType; import org.springframework.util.ObjectUtils; /** - * Default, package-private {@link RSocketRequester} implementation. + * Default implementation of {@link RSocketRequester}. * * @author Rossen Stoyanchev * @since 5.2 */ final class DefaultRSocketRequester implements RSocketRequester { - static final MimeType COMPOSITE_METADATA = new MimeType("message", "x.rsocket.composite-metadata.v0"); - - static final MimeType ROUTING = new MimeType("message", "x.rsocket.routing.v0"); - - /** For route variable replacement. */ private static final Pattern VARS_PATTERN = Pattern.compile("\\{([^/]+?)\\}"); @@ -114,7 +109,7 @@ final class DefaultRSocketRequester implements RSocketRequester { public RequestSpec route(String route, Object... vars) { Assert.notNull(route, "'route' is required"); route = expand(route, vars); - return new DefaultRequestSpec(route, metadataMimeType().equals(COMPOSITE_METADATA) ? ROUTING : null); + return new DefaultRequestSpec(route, isCompositeMetadata() ? MetadataExtractor.ROUTING : null); } private static String expand(String route, Object... vars) { @@ -134,6 +129,10 @@ final class DefaultRSocketRequester implements RSocketRequester { return sb.toString(); } + private boolean isCompositeMetadata() { + return metadataMimeType().equals(MetadataExtractor.COMPOSITE_METADATA); + } + @Override public RequestSpec metadata(Object metadata, @Nullable MimeType mimeType) { return new DefaultRequestSpec(metadata, mimeType); @@ -154,16 +153,12 @@ final class DefaultRSocketRequester implements RSocketRequester { private final Map metadata = new LinkedHashMap<>(4); - public DefaultRequestSpec(Object metadata, @Nullable MimeType mimeType) { + DefaultRequestSpec(Object metadata, @Nullable MimeType mimeType) { mimeType = (mimeType == null && !isCompositeMetadata() ? metadataMimeType() : mimeType); Assert.notNull(mimeType, "MimeType is required for composite metadata"); metadata(metadata, mimeType); } - private boolean isCompositeMetadata() { - return metadataMimeType().equals(COMPOSITE_METADATA); - } - @Override public RequestSpec metadata(Object metadata, MimeType mimeType) { Assert.notNull(metadata, "Metadata content is required"); @@ -184,22 +179,27 @@ final class DefaultRSocketRequester implements RSocketRequester { public ResponseSpec data(Object producer, Class elementClass) { Assert.notNull(producer, "'producer' must not be null"); Assert.notNull(elementClass, "'elementClass' must not be null"); - ReactiveAdapter adapter = strategies.reactiveAdapterRegistry().getAdapter(producer.getClass()); + ReactiveAdapter adapter = getAdapter(producer.getClass()); Assert.notNull(adapter, "'producer' type is unknown to ReactiveAdapterRegistry"); return toResponseSpec(adapter.toPublisher(producer), ResolvableType.forClass(elementClass)); } + @Nullable + private ReactiveAdapter getAdapter(Class aClass) { + return strategies.reactiveAdapterRegistry().getAdapter(aClass); + } + @Override public ResponseSpec data(Object producer, ParameterizedTypeReference elementTypeRef) { Assert.notNull(producer, "'producer' must not be null"); Assert.notNull(elementTypeRef, "'elementTypeRef' must not be null"); - ReactiveAdapter adapter = strategies.reactiveAdapterRegistry().getAdapter(producer.getClass()); + ReactiveAdapter adapter = getAdapter(producer.getClass()); Assert.notNull(adapter, "'producer' type is unknown to ReactiveAdapterRegistry"); return toResponseSpec(adapter.toPublisher(producer), ResolvableType.forType(elementTypeRef)); } private ResponseSpec toResponseSpec(Object input, ResolvableType elementType) { - ReactiveAdapter adapter = strategies.reactiveAdapterRegistry().getAdapter(input.getClass()); + ReactiveAdapter adapter = getAdapter(input.getClass()); Publisher publisher; if (input instanceof Publisher) { publisher = (Publisher) input; diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java index c5316ebc3e..10c05cf0ef 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java @@ -48,7 +48,7 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder { @Nullable private MimeType dataMimeType; - private MimeType metadataMimeType = DefaultRSocketRequester.COMPOSITE_METADATA; + private MimeType metadataMimeType = MetadataExtractor.COMPOSITE_METADATA; @Nullable private RSocketStrategies strategies; @@ -109,18 +109,18 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder { Assert.isTrue(!rsocketStrategies.encoders().isEmpty(), "No encoders"); Assert.isTrue(!rsocketStrategies.decoders().isEmpty(), "No decoders"); - RSocketFactory.ClientRSocketFactory rsocketFactory = RSocketFactory.connect(); - MimeType dataMimeType = getDataMimeType(rsocketStrategies); - rsocketFactory.dataMimeType(dataMimeType.toString()); - rsocketFactory.metadataMimeType(this.metadataMimeType.toString()); + RSocketFactory.ClientRSocketFactory factory = RSocketFactory.connect(); + this.rsocketConfigurers.forEach(configurer -> configurer.configure(factory)); if (rsocketStrategies.dataBufferFactory() instanceof NettyDataBufferFactory) { - rsocketFactory.frameDecoder(PayloadDecoder.ZERO_COPY); + factory.frameDecoder(PayloadDecoder.ZERO_COPY); } - this.rsocketConfigurers.forEach(configurer -> configurer.configure(rsocketFactory)); + MimeType dataMimeType = getDataMimeType(rsocketStrategies); + factory.dataMimeType(dataMimeType.toString()); + factory.metadataMimeType(this.metadataMimeType.toString()); - return rsocketFactory.transport(transport) + return factory.transport(transport) .start() .map(rsocket -> new DefaultRSocketRequester( rsocket, dataMimeType, this.metadataMimeType, rsocketStrategies)); diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketStrategies.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketStrategies.java index 21435e0544..204daffd85 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketStrategies.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketStrategies.java @@ -43,7 +43,7 @@ import org.springframework.util.RouteMatcher; import org.springframework.util.SimpleRouteMatcher; /** - * Default, package-private {@link RSocketStrategies} implementation. + * Default implementation of {@link RSocketStrategies}. * * @author Rossen Stoyanchev * @since 5.2 @@ -108,7 +108,7 @@ final class DefaultRSocketStrategies implements RSocketStrategies { /** - * Default RSocketStrategies.Builder implementation. + * Default implementation of {@link RSocketStrategies.Builder}. */ static class DefaultRSocketStrategiesBuilder implements RSocketStrategies.Builder { diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/MetadataExtractor.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/MetadataExtractor.java index 580b702da5..ca617c088e 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/MetadataExtractor.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/MetadataExtractor.java @@ -22,10 +22,10 @@ import io.rsocket.Payload; import org.springframework.util.MimeType; /** - * Strategy to extract a map of values from the metadata of a {@link Payload}. - * This includes decoding metadata entries based on their mime type and - * assigning a name to the decoded value. The resulting name-value pairs can - * be added to the headers of a + * Strategy to extract a map of value(s) from {@link Payload} metadata, which + * could be composite metadata with multiple entries. Each metadata entry + * is decoded based on its {@code MimeType} and a name is assigned to the decoded + * value. The resulting name-value pairs can be added to the headers of a * {@link org.springframework.messaging.Message Message}. * * @author Rossen Stoyanchev @@ -39,26 +39,22 @@ public interface MetadataExtractor { String ROUTE_KEY = "route"; /** - * Constant for mime type {@code message/x.rsocket.composite-metadata.v0}. + * Constant MimeType {@code "message/x.rsocket.composite-metadata.v0"}. */ MimeType COMPOSITE_METADATA = new MimeType("message", "x.rsocket.composite-metadata.v0"); /** - * Constant for mime type {@code message/x.rsocket.routing.v0}. + * Constant for MimeType {@code "message/x.rsocket.routing.v0"}. */ MimeType ROUTING = new MimeType("message", "x.rsocket.routing.v0"); /** * Extract a map of values from the given {@link Payload} metadata. - *

Metadata may be composite and consist of multiple entries - * Implementations are free to extract any number of name-value pairs per - * metadata entry. The Payload "route" should be saved under the - * {@link #ROUTE_KEY}. + * The Payload "route", if present, should be saved under {@link #ROUTE_KEY}. * @param payload the payload whose metadata should be read - * @param metadataMimeType the mime type of the metadata; this is what was - * specified by the client at the start of the RSocket connection. - * @return a map of 0 or more decoded metadata values with assigned names + * @param metadataMimeType the metadata MimeType for the connection. + * @return name values pairs extracted from the metadata */ Map extract(Payload payload, MimeType metadataMimeType); diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java index 569f67e429..5a8d123494 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java @@ -22,6 +22,8 @@ import java.util.function.Consumer; import io.rsocket.ConnectionSetupPayload; import io.rsocket.RSocket; import io.rsocket.transport.ClientTransport; +import io.rsocket.transport.netty.client.TcpClientTransport; +import io.rsocket.transport.netty.client.WebsocketClientTransport; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -36,7 +38,7 @@ import org.springframework.util.MimeType; /** * A thin wrapper around a sending {@link RSocket} with a fluent API accepting * and returning higher level Objects for input and for output, along with - * methods specify routing and other metadata. + * methods to prepare routing and other metadata. * * @author Rossen Stoyanchev * @author Brian Clozel @@ -45,7 +47,7 @@ import org.springframework.util.MimeType; public interface RSocketRequester { /** - * Return the underlying RSocket used to make requests. + * Return the underlying sending RSocket. */ RSocket rsocket(); @@ -69,47 +71,40 @@ public interface RSocketRequester { * Begin to specify a new request with the given route to a remote handler. *

The route can be a template with placeholders, e.g. * {@code "flight.{code}"} in which case the supplied route variables are - * expanded into the template after being formatted via {@code toString()}. + * formatted via {@code toString()} and expanded into the template. * If a formatted variable contains a "." it is replaced with the escape * sequence "%2E" to avoid treating it as separator by the responder . *

If the connection is set to use composite metadata, the route is * encoded as {@code "message/x.rsocket.routing.v0"}. Otherwise the route * is encoded according to the mime type for the connection. - * @param route the route to a handler + * @param route the route expressing a remote handler mapping * @param routeVars variables to be expanded into the route template * @return a spec for further defining and executing the request */ RequestSpec route(String route, Object... routeVars); /** - * Begin to specify a new request with the given metadata. - *

If using composite metadata then the mime type argument is required. - * Otherwise the mime type should be {@code null}, or it must match the - * mime type for the connection. + * Begin to specify a new request with the given metadata value. * @param metadata the metadata value to encode * @param mimeType the mime type that describes the metadata; + * This is required for connection using composite metadata. Otherwise the + * value is encoded according to the mime type for the connection and this + * argument may be left as {@code null}. */ RequestSpec metadata(Object metadata, @Nullable MimeType mimeType); /** - * Obtain a builder for an {@link RSocketRequester} by connecting to an - * RSocket server. The builder allows for customization of - * {@link io.rsocket.RSocketFactory.ClientRSocketFactory ClientRSocketFactory} - * settings, {@link RSocketStrategies}, and for selecting the transport to use. + * Obtain a builder to create a client {@link RSocketRequester} by connecting + * to an RSocket server. */ static RSocketRequester.Builder builder() { return new DefaultRSocketRequesterBuilder(); } /** - * Wrap an existing {@link RSocket}. This is typically used in a responder, - * client or server, to wrap the remote/sending {@code RSocket}. - * @param rsocket the RSocket to wrap - * @param dataMimeType the data MimeType from the {@code ConnectionSetupPayload} - * @param metadataMimeType the metadata MimeType from the {@code ConnectionSetupPayload} - * @param strategies the strategies to use - * @return the created RSocketRequester + * Wrap an existing {@link RSocket}. Typically used in client or server + * responders to wrap the {@code RSocket} for the remote side. */ static RSocketRequester wrap( RSocket rsocket, MimeType dataMimeType, MimeType metadataMimeType, @@ -120,76 +115,76 @@ public interface RSocketRequester { /** - * Builder to prepare an {@link RSocketRequester} by connecting to an - * RSocket server and wrapping the resulting {@link RSocket}. + * Builder to create a requester by connecting to a server. */ interface Builder { /** * Configure the payload data MimeType to specify on the {@code SETUP} * frame that applies to the whole connection. - *

If this is not set, it will be set to the MimeType of the first + *

If not set, this will be initialized to the MimeType of the first * {@link RSocketStrategies.Builder#decoder(Decoder[]) non-default} - * {@code Decoder}, or otherwise fall back on the MimeType of the first - * (default) decoder. + * {@code Decoder}, or otherwise the MimeType of the first decoder. */ RSocketRequester.Builder dataMimeType(@Nullable MimeType mimeType); /** * Configure the payload metadata MimeType to specify on the {@code SETUP} - * frame and applies to the whole connection. + * frame that applies to the whole connection. *

By default this is set to * {@code "message/x.rsocket.composite-metadata.v0"} in which case the - * route, if provided, is encoded as a - * {@code "message/x.rsocket.routing.v0"} composite metadata entry. - * For any other MimeType, it is assumed to be the MimeType for the - * route, if provided. + * route, if provided, is encoded as a {@code "message/x.rsocket.routing.v0"} + * composite metadata entry. If this is set to any other MimeType, it is + * assumed that's the MimeType for the route, if provided. */ RSocketRequester.Builder metadataMimeType(MimeType mimeType); /** - * Set the {@link RSocketStrategies} to use. - *

By default this is set to {@code RSocketStrategies.builder().build()} - * but may be further customized via {@link #rsocketStrategies(Consumer)}. + * Provide {@link RSocketStrategies} to use. + *

By default this is based on default settings of + * {@link RSocketStrategies.Builder} but may be further customized via + * {@link #rsocketStrategies(Consumer)}. */ RSocketRequester.Builder rsocketStrategies(@Nullable RSocketStrategies strategies); /** * Customize the {@link RSocketStrategies}. - *

By default this starts out with an empty builder, i.e. - * {@link RSocketStrategies#builder()}, but the strategies can also be - * set via {@link #rsocketStrategies(RSocketStrategies)}. + *

By default this starts out as {@link RSocketStrategies#builder()}. + * However if strategies were {@link #rsocketStrategies(RSocketStrategies) set} + * explicitly, then they are {@link RSocketStrategies#mutate() mutated}. */ RSocketRequester.Builder rsocketStrategies(Consumer configurer); /** * Callback to configure the {@code ClientRSocketFactory} directly. - *

Do not set {@link #dataMimeType(MimeType)} and - * {@link #metadataMimeType(MimeType)} directly on the - * {@code ClientRSocketFactory}. Use methods on this builder instead - * so the {@code RSocketRequester} will have access to them. - *

For configuring client side responding, see + *

Note that the data and metadata mime types cannot be set directly + * on the {@code ClientRSocketFactory}. Use shortcuts on this builder + * {@link #dataMimeType(MimeType)} and {@link #metadataMimeType(MimeType)} + * instead. + *

To configure client side responding, see * {@link RSocketMessageHandler#clientResponder(RSocketStrategies, Object...)}. */ RSocketRequester.Builder rsocketFactory(ClientRSocketFactoryConfigurer configurer); /** - * Connect to the RSocket server over TCP. + * Connect to the server over TCP. * @param host the server host * @param port the server port * @return an {@code RSocketRequester} for the connection + * @see TcpClientTransport */ Mono connectTcp(String host, int port); /** - * Connect to the RSocket server over WebSocket. + * Connect to the server over WebSocket. * @param uri the RSocket server endpoint URI * @return an {@code RSocketRequester} for the connection + * @see WebsocketClientTransport */ Mono connectWebSocket(URI uri); /** - * Connect to the RSocket server with the given {@code ClientTransport}. + * Connect to the server with the given {@code ClientTransport}. * @param transport the client transport to use * @return an {@code RSocketRequester} for the connection */ @@ -199,15 +194,15 @@ public interface RSocketRequester { /** - * Contract to provide input data for an RSocket request. + * Spec for providing input data for an RSocket request. */ interface RequestSpec { /** - * Use this to append additional metadata entries if the RSocket - * connection is configured to use composite metadata. If not, an - * {@link IllegalArgumentException} will be raised. - * @param metadata an Object, to be encoded with a suitable + * Use this to append additional metadata entries when using composite + * metadata. An {@link IllegalArgumentException} is raised if this + * method is used when not using composite metadata. + * @param metadata an Object to be encoded with a suitable * {@link org.springframework.core.codec.Encoder Encoder}, or a * {@link org.springframework.core.io.buffer.DataBuffer DataBuffer} * @param mimeType the mime type that describes the metadata @@ -215,61 +210,60 @@ public interface RSocketRequester { RequestSpec metadata(Object metadata, MimeType mimeType); /** - * Provide payload data. The data can be one of the following: + * Provide payload data for the request. This can be one of: *

- * @param data the Object to use for payload data - * @return spec for declaring the expected response + * @param data the Object value for the payload data + * @return spec to declare the expected response */ ResponseSpec data(Object data); /** - * Alternative of {@link #data(Object)} that accepts not only a producer - * of value(s) but also a hint for the types of values that will be - * produced. The class hint is used to find a compatible {@code Encoder} - * once, up front, and used for all values. + * Variant of {@link #data(Object)} that also accepts a hint for the + * types of values that will be produced. The class hint is used to + * find a compatible {@code Encoder} once, up front vs per value. * @param producer the source of payload data value(s). This must be a * {@link Publisher} or another producer adaptable to a * {@code Publisher} via {@link ReactiveAdapterRegistry} * @param elementClass the type of values to be produced - * @return spec for declaring the expected response + * @return spec to declare the expected response */ ResponseSpec data(Object producer, Class elementClass); /** - * Alternative of {@link #data(Object, Class)} but with a - * {@link ParameterizedTypeReference} hint which can provide generic - * type information. + * Variant of {@link #data(Object, Class)} for when the type hint has + * to have a generic type. See {@link ParameterizedTypeReference}. * @param producer the source of payload data value(s). This must be a * {@link Publisher} or another producer adaptable to a * {@code Publisher} via {@link ReactiveAdapterRegistry} * @param elementTypeRef the type of values to be produced + * @return spec to declare the expected response */ ResponseSpec data(Object producer, ParameterizedTypeReference elementTypeRef); } /** - * Contract to declare the expected RSocket response. + * Spect to declare the type of request and expected response. */ interface ResponseSpec { /** - * Perform {@link RSocket#fireAndForget fireAndForget}. + * Perform a {@link RSocket#fireAndForget fireAndForget}. */ Mono send(); /** - * Perform {@link RSocket#requestResponse requestResponse}. If the - * expected data type is {@code Void.class}, the returned {@code Mono} - * will complete after all data is consumed. - *

Note: Use of this method will raise an error if - * the request payload is a multi-valued {@link Publisher} as - * determined through the configured {@link ReactiveAdapterRegistry}. + * Perform a {@link RSocket#requestResponse requestResponse} exchange. + *

If the return type is {@code Mono}, the {@code Mono} will + * complete after all data is consumed. + *

Note: This method will raise an error if + * the request payload is a multi-valued {@link Publisher} as there is + * no many-to-one RSocket interaction. * @param dataType the expected data type for the response * @param parameter for the expected data type * @return the decoded response @@ -283,11 +277,11 @@ public interface RSocketRequester { Mono retrieveMono(ParameterizedTypeReference dataTypeRef); /** - * Perform {@link RSocket#requestStream requestStream} or - * {@link RSocket#requestChannel requestChannel} depending on whether - * the request input consists of a single or multiple payloads. - * If the expected data type is {@code Void.class}, the returned - * {@code Flux} will complete after all data is consumed. + * Perform an {@link RSocket#requestStream requestStream} or a + * {@link RSocket#requestChannel requestChannel} exchange depending on + * whether the request input is single or multi-payload. + *

If the return type is {@code Flux}, the {@code Flux} will + * complete after all data is consumed. * @param dataType the expected type for values in the response * @param parameterize the expected type of values * @return the decoded response diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketStrategies.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketStrategies.java index 4c4aaea3b8..0449cf4c80 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketStrategies.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketStrategies.java @@ -133,7 +133,7 @@ public interface RSocketStrategies { } /** - * Return a builder to build a new {@code RSocketStrategies} instance. + * Return a builder to prepare a new {@code RSocketStrategies} instance. * The builder applies default settings, see individual builder methods for * details. */ @@ -179,7 +179,7 @@ public interface RSocketStrategies { * client or server responders. *

By default, {@link SimpleRouteMatcher} is used, backed by * {@link AntPathMatcher} with "." as separator. For better - * efficiency consider using the {@code PathPatternRouteMatcher} from + * efficiency consider switching to {@code PathPatternRouteMatcher} from * {@code spring-web} instead. */ Builder routeMatcher(@Nullable RouteMatcher routeMatcher); @@ -189,8 +189,7 @@ public interface RSocketStrategies { * other metadata. This option is applicable to client or server * responders. *

By default this is {@link DefaultMetadataExtractor} extracting a - * route from {@code "message/x.rsocket.routing.v0"} or - * {@code "text/plain"} metadata entries. + * route from {@code "message/x.rsocket.routing.v0"} metadata. */ Builder metadataExtractor(@Nullable MetadataExtractor metadataExtractor); @@ -213,7 +212,7 @@ public interface RSocketStrategies { * here, and sets the frame decoder in {@link ClientRSocketFactory * ClientRSocketFactory} accordingly. For server setup, the * {@link ServerRSocketFactory ServerRSocketFactory} must be configured - * accordingly too for zero copy. + * accordingly for zero copy too. *

If using {@link DefaultDataBufferFactory} instead, there is no * need for related config changes in RSocket. */ diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/ConnectMapping.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/ConnectMapping.java index 7188bc6b69..2c7eb7ab9f 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/ConnectMapping.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/ConnectMapping.java @@ -31,9 +31,9 @@ import io.rsocket.ConnectionSetupPayload; *

This is a method-level annotation that can be combined with a type-level * {@link org.springframework.messaging.handler.annotation.MessageMapping @MessageMapping} * for a combined route pattern. It supports the same arguments as - * {@code @MessageMapping} but does not support any return values. On a server, - * handling can be asynchronous, e.g. with a {@code Mono} return value, - * in which case the connection is accepted if and when handling completes. + * {@code @MessageMapping} but the return value must be {@code void}. On a + * server, handling can be asynchronous (e.g. {@code Mono}), in which + * case the connection is accepted if and when the {@code Mono} completes. * On the client side this method is only a callback and does not affect the * establishment of the connection. * diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/support/RSocketMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/support/RSocketMessageHandler.java index 6784614983..7376f33d0d 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/support/RSocketMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/annotation/support/RSocketMessageHandler.java @@ -53,13 +53,18 @@ import org.springframework.util.RouteMatcher; import org.springframework.util.StringUtils; /** - * Extension of {@link MessageMappingMessageHandler} to use as an RSocket - * responder by handling incoming streams via {@code @MessageMapping} annotated - * methods. - *

Use {@link #clientResponder()} and {@link #serverResponder()} to obtain - * {@link io.rsocket.RSocketFactory.ClientRSocketFactory#acceptor(Function) client} or - * {@link io.rsocket.RSocketFactory.ServerRSocketFactory#acceptor(SocketAcceptor) server} - * side adapters. + * Extension of {@link MessageMappingMessageHandler} for use in RSocket as a + * responder that handles requests with {@link ConnectMapping @ConnectMapping} + * and {@link MessageMapping @MessageMapping} methods. + *

For RSocket servers use {@link #serverResponder()} to obtain a + * {@link SocketAcceptor} to register with + * {@link io.rsocket.RSocketFactory.ServerRSocketFactory ServerRSocketFactory}. + *

For RSocket clients use {@link #clientResponder()} to obtain an adapter + * to register with + * {@link io.rsocket.RSocketFactory.ClientRSocketFactory ClientRSocketFactory}, + * or use the static shortcut + * {@link #clientResponder(RSocketStrategies, Object...)} to obtain a configurer + * for {@link RSocketRequester.Builder#rsocketFactory}. * * @author Rossen Stoyanchev * @since 5.2 @@ -86,8 +91,8 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler { * {@inheritDoc} *

When {@link #setRSocketStrategies(RSocketStrategies) rsocketStrategies} * is set, this property is re-initialized with the decoders in it, and - * vice versa, setting this property mutates the {@code RSocketStrategies} - * to change its decoders. + * likewise when this property is set the {@code RSocketStrategies} are + * mutated to change the decoders in them. *

By default this is set to the * {@link RSocketStrategies.Builder#decoder(Decoder[]) defaults} from * {@code RSocketStrategies}. @@ -107,8 +112,8 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler { * Configure the encoders to use for encoding handler method return values. *

When {@link #setRSocketStrategies(RSocketStrategies) rsocketStrategies} * is set, this property is re-initialized with the encoders in it, and - * vice versa, setting this property mutates the {@code RSocketStrategies} - * to change its encoders. + * likewise when this property is set the {@code RSocketStrategies} are + * mutated to change the encoders in it. *

By default this is set to the * {@link RSocketStrategies.Builder#encoder(Encoder[]) defaults} from * {@code RSocketStrategies}. @@ -134,9 +139,9 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler { /** * {@inheritDoc} *

When {@link #setRSocketStrategies(RSocketStrategies) rsocketStrategies} - * is set, this property is re-initialized with the RouteMatcher in it, and - * vice versa, setting this property mutates the {@code RSocketStrategies} - * to change its route matcher. + * is set, this property is re-initialized with the route matcher in it, and + * likewise when this property is set the {@code RSocketStrategies} are + * mutated to change the matcher in it. *

By default this is set to the * {@link RSocketStrategies.Builder#routeMatcher(RouteMatcher) defaults} * from {@code RSocketStrategies}. @@ -150,10 +155,9 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler { /** * Configure the registry for adapting various reactive types. *

When {@link #setRSocketStrategies(RSocketStrategies) rsocketStrategies} - * is set, this property is re-initialized with the - * {@code ReactiveAdapterRegistry} in it, and vice versa, setting this - * property mutates the {@code RSocketStrategies} to change its adapter - * registry. + * is set, this property is re-initialized with the registry in it, and + * likewise when this property is set the {@code RSocketStrategies} are + * mutated to change the registry in it. *

By default this is set to the * {@link RSocketStrategies.Builder#reactiveAdapterStrategy(ReactiveAdapterRegistry) defaults} * from {@code RSocketStrategies}. @@ -168,9 +172,9 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler { * Configure a {@link MetadataExtractor} to extract the route along with * other metadata. *

When {@link #setRSocketStrategies(RSocketStrategies) rsocketStrategies} - * is set, this property is re-initialized with the {@code MetadataExtractor} - * in it, and vice versa, setting this property mutates the - * {@code RSocketStrategies} to change its {@code MetadataExtractor}. + * is set, this property is re-initialized with the extractor in it, and + * likewise when this property is set the {@code RSocketStrategies} are + * mutated to change the extractor in it. *

By default this is set to the * {@link RSocketStrategies.Builder#metadataExtractor(MetadataExtractor)} defaults} * from {@code RSocketStrategies}. @@ -206,10 +210,6 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler { */ public void setRSocketStrategies(RSocketStrategies rsocketStrategies) { this.strategies = rsocketStrategies; - updateStateFromRSocketStrategies(); - } - - private void updateStateFromRSocketStrategies() { setDecoders(this.strategies.decoders()); setEncoders(this.strategies.encoders()); setRouteMatcher(this.strategies.routeMatcher()); @@ -326,7 +326,7 @@ public class RSocketMessageHandler extends MessageMappingMessageHandler { * and return {@code Mono} with an error signal preventing the * connection. Such a method can also start requests to the client but that * must be done decoupled from handling and from the current thread. - *

Subsequent stream requests can be handled with + *

Subsequent requests on the connection can be handled with * {@link MessageMapping MessageMapping} methods. */ public SocketAcceptor serverResponder() { diff --git a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilderTests.java b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilderTests.java index f1d2ee4525..106871ec64 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilderTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilderTests.java @@ -24,6 +24,7 @@ import java.util.function.Consumer; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; +import io.rsocket.ConnectionSetupPayload; import io.rsocket.DuplexConnection; import io.rsocket.RSocketFactory; import io.rsocket.frame.decoder.PayloadDecoder; @@ -62,19 +63,21 @@ public class DefaultRSocketRequesterBuilderTests { private ClientTransport transport; + private final MockConnection connection = new MockConnection(); + private final TestRSocketFactoryConfigurer rsocketFactoryConfigurer = new TestRSocketFactoryConfigurer(); @Before public void setup() { this.transport = mock(ClientTransport.class); - given(this.transport.connect(anyInt())).willReturn(Mono.just(new MockConnection())); + given(this.transport.connect(anyInt())).willReturn(Mono.just(this.connection)); } @Test @SuppressWarnings("unchecked") - public void shouldApplyCustomizationsAtSubscription() { + public void rsocketFactoryConfigurerAppliesAtSubscription() { Consumer strategiesConfigurer = mock(Consumer.class); RSocketRequester.builder() .rsocketFactory(this.rsocketFactoryConfigurer) @@ -87,7 +90,7 @@ public class DefaultRSocketRequesterBuilderTests { @Test @SuppressWarnings("unchecked") - public void shouldApplyCustomizations() { + public void rsocketFactoryConfigurer() { Consumer rsocketStrategiesConfigurer = mock(Consumer.class); RSocketRequester.builder() .rsocketFactory(this.rsocketFactoryConfigurer) @@ -109,12 +112,12 @@ public class DefaultRSocketRequesterBuilderTests { .block(); assertThat(requester.dataMimeType()) - .as("Default data MimeType, based on the first configured Decoder") + .as("Default data MimeType, based on the first Decoder") .isEqualTo(MimeTypeUtils.TEXT_PLAIN); } @Test - public void defaultDataMimeTypeWithCustomDecoderRegitered() { + public void defaultDataMimeTypeWithCustomDecoderRegistered() { RSocketStrategies strategies = RSocketStrategies.builder() .decoder(new TestJsonDecoder(MimeTypeUtils.APPLICATION_JSON)) .build(); @@ -130,15 +133,45 @@ public class DefaultRSocketRequesterBuilderTests { } @Test - public void dataMimeTypeSet() { + public void dataMimeTypeExplicitlySet() { RSocketRequester requester = RSocketRequester.builder() .dataMimeType(MimeTypeUtils.APPLICATION_JSON) .connect(this.transport) .block(); + ConnectionSetupPayload setupPayload = Mono.from(this.connection.sentFrames()) + .map(ConnectionSetupPayload::create) + .block(); + + assertThat(setupPayload.dataMimeType()).isEqualTo("application/json"); assertThat(requester.dataMimeType()).isEqualTo(MimeTypeUtils.APPLICATION_JSON); } + @Test + public void mimeTypesCannotBeChangedAtRSocketFactoryLevel() { + MimeType dataMimeType = MimeTypeUtils.APPLICATION_JSON; + MimeType metaMimeType = MetadataExtractor.ROUTING; + + RSocketRequester requester = RSocketRequester.builder() + .metadataMimeType(metaMimeType) + .dataMimeType(dataMimeType) + .rsocketFactory(factory -> { + factory.metadataMimeType("text/plain"); + factory.dataMimeType("application/xml"); + }) + .connect(this.transport) + .block(); + + ConnectionSetupPayload setupPayload = Mono.from(this.connection.sentFrames()) + .map(ConnectionSetupPayload::create) + .block(); + + assertThat(setupPayload.dataMimeType()).isEqualTo(dataMimeType.toString()); + assertThat(setupPayload.metadataMimeType()).isEqualTo(metaMimeType.toString()); + assertThat(requester.dataMimeType()).isEqualTo(dataMimeType); + assertThat(requester.metadataMimeType()).isEqualTo(metaMimeType); + } + @Test public void frameDecoderMatchesDataBufferFactory() throws Exception { testFrameDecoder(new NettyDataBufferFactory(ByteBufAllocator.DEFAULT), PayloadDecoder.ZERO_COPY); @@ -170,8 +203,16 @@ public class DefaultRSocketRequesterBuilderTests { static class MockConnection implements DuplexConnection { + private Publisher sentFrames; + + + public Publisher sentFrames() { + return this.sentFrames; + } + @Override public Mono send(Publisher frames) { + this.sentFrames = frames; return Mono.empty(); } @@ -224,7 +265,7 @@ public class DefaultRSocketRequesterBuilderTests { @Override public boolean canDecode(ResolvableType elementType, MimeType mimeType) { - return false; + throw new UnsupportedOperationException(); } @Override diff --git a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterTests.java b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterTests.java index 15db1059c8..2bb92f1612 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterTests.java @@ -48,8 +48,6 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.springframework.messaging.rsocket.DefaultRSocketRequester.COMPOSITE_METADATA; -import static org.springframework.messaging.rsocket.DefaultRSocketRequester.ROUTING; import static org.springframework.util.MimeTypeUtils.TEXT_PLAIN; /** @@ -147,7 +145,7 @@ public class DefaultRSocketRequesterTests { public void metadataCompositeWithRoute() { RSocketRequester requester = RSocketRequester.wrap( - this.rsocket, TEXT_PLAIN, COMPOSITE_METADATA, this.strategies); + this.rsocket, TEXT_PLAIN, MetadataExtractor.COMPOSITE_METADATA, this.strategies); requester.route("toA").data("bodyA").send().block(Duration.ofSeconds(5)); @@ -156,7 +154,7 @@ public class DefaultRSocketRequesterTests { assertThat(iterator.hasNext()).isTrue(); CompositeMetadata.Entry entry = iterator.next(); - assertThat(entry.getMimeType()).isEqualTo(ROUTING.toString()); + assertThat(entry.getMimeType()).isEqualTo(MetadataExtractor.ROUTING.toString()); assertThat(entry.getContent().toString(StandardCharsets.UTF_8)).isEqualTo("toA"); assertThat(iterator.hasNext()).isFalse(); @@ -166,7 +164,7 @@ public class DefaultRSocketRequesterTests { public void metadataCompositeWithRouteAndTextEntry() { RSocketRequester requester = RSocketRequester.wrap( - this.rsocket, TEXT_PLAIN, COMPOSITE_METADATA, this.strategies); + this.rsocket, TEXT_PLAIN, MetadataExtractor.COMPOSITE_METADATA, this.strategies); requester.route("toA") .metadata("My metadata", TEXT_PLAIN).data("bodyA") @@ -178,7 +176,7 @@ public class DefaultRSocketRequesterTests { assertThat(iterator.hasNext()).isTrue(); CompositeMetadata.Entry entry = iterator.next(); - assertThat(entry.getMimeType()).isEqualTo(ROUTING.toString()); + assertThat(entry.getMimeType()).isEqualTo(MetadataExtractor.ROUTING.toString()); assertThat(entry.getContent().toString(StandardCharsets.UTF_8)).isEqualTo("toA"); assertThat(iterator.hasNext()).isTrue(); @@ -206,7 +204,7 @@ public class DefaultRSocketRequesterTests { @Test public void metadataMimeTypeMismatch() { RSocketRequester requester = RSocketRequester.wrap(this.rsocket, TEXT_PLAIN, TEXT_PLAIN, this.strategies); - assertThatThrownBy(() -> requester.metadata("toA", ROUTING).data("bodyA").send().block()) + assertThatThrownBy(() -> requester.metadata("toA", MetadataExtractor.ROUTING).data("bodyA").send().block()) .hasMessageStartingWith("Connection configured for metadata mime type"); } @@ -219,8 +217,8 @@ public class DefaultRSocketRequesterTests { @Test public void supportedMetadataMimeTypes() { - RSocketRequester.wrap(this.rsocket, TEXT_PLAIN, COMPOSITE_METADATA, this.strategies); - RSocketRequester.wrap(this.rsocket, TEXT_PLAIN, ROUTING, this.strategies); + RSocketRequester.wrap(this.rsocket, TEXT_PLAIN, MetadataExtractor.COMPOSITE_METADATA, this.strategies); + RSocketRequester.wrap(this.rsocket, TEXT_PLAIN, MetadataExtractor.ROUTING, this.strategies); } @Test