You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1145 lines
38 KiB
1145 lines
38 KiB
[[rsocket]] |
|
= RSocket |
|
|
|
This section describes Spring Framework's support for the RSocket protocol. |
|
|
|
|
|
[[rsocket-overview]] |
|
== Overview |
|
|
|
RSocket is an application protocol for multiplexed, duplex communication over TCP, |
|
WebSocket, and other byte stream transports, using one of the following interaction |
|
models: |
|
|
|
* `Request-Response` -- send one message and receive one back. |
|
* `Request-Stream` -- send one message and receive a stream of messages back. |
|
* `Channel` -- send streams of messages in both directions. |
|
* `Fire-and-Forget` -- send a one-way message. |
|
|
|
Once the initial connection is made, the "client" vs "server" distinction is lost as |
|
both sides become symmetrical and each side can initiate one of the above interactions. |
|
This is why in the protocol calls the participating sides "requester" and "responder" |
|
while the above interactions are called "request streams" or simply "requests". |
|
|
|
These are the key features and benefits of the RSocket protocol: |
|
|
|
* https://www.reactive-streams.org/[Reactive Streams] semantics across network boundary -- |
|
for streaming requests such as `Request-Stream` and `Channel`, back pressure signals |
|
travel between requester and responder, allowing a requester to slow down a responder at |
|
the source, hence reducing reliance on network layer congestion control, and the need |
|
for buffering at the network level or at any level. |
|
* Request throttling -- this feature is named "Leasing" after the `LEASE` frame that |
|
can be sent from each end to limit the total number of requests allowed by other end |
|
for a given time. Leases are renewed periodically. |
|
* Session resumption -- this is designed for loss of connectivity and requires some state |
|
to be maintained. The state management is transparent for applications, and works well |
|
in combination with back pressure which can stop a producer when possible and reduce |
|
the amount of state required. |
|
* Fragmentation and re-assembly of large messages. |
|
* Keepalive (heartbeats). |
|
|
|
RSocket has {gh-rsocket}[implementations] in multiple languages. The |
|
{gh-rsocket-java}[Java library] is built on https://projectreactor.io/[Project Reactor], |
|
and https://github.com/reactor/reactor-netty[Reactor Netty] for the transport. That means |
|
signals from Reactive Streams Publishers in your application propagate transparently |
|
through RSocket across the network. |
|
|
|
|
|
|
|
[[rsocket-protocol]] |
|
=== The Protocol |
|
|
|
One of the benefits of RSocket is that it has well defined behavior on the wire and an |
|
easy to read https://rsocket.io/about/protocol[specification] along with some protocol |
|
{gh-rsocket}/rsocket/tree/master/Extensions[extensions]. Therefore it is |
|
a good idea to read the spec, independent of language implementations and higher level |
|
framework APIs. This section provides a succinct overview to establish some context. |
|
|
|
**Connecting** |
|
|
|
Initially a client connects to a server via some low level streaming transport such |
|
as TCP or WebSocket and sends a `SETUP` frame to the server to set parameters for the |
|
connection. |
|
|
|
The server may reject the `SETUP` frame, but generally after it is sent (for the client) |
|
and received (for the server), both sides can begin to make requests, unless `SETUP` |
|
indicates use of leasing semantics to limit the number of requests, in which case |
|
both sides must wait for a `LEASE` frame from the other end to permit making requests. |
|
|
|
**Making Requests** |
|
|
|
Once a connection is established, both sides may initiate a request through one of the |
|
frames `REQUEST_RESPONSE`, `REQUEST_STREAM`, `REQUEST_CHANNEL`, or `REQUEST_FNF`. Each of |
|
those frames carries one message from the requester to the responder. |
|
|
|
The responder may then return `PAYLOAD` frames with response messages, and in the case |
|
of `REQUEST_CHANNEL` the requester may also send `PAYLOAD` frames with more request |
|
messages. |
|
|
|
When a request involves a stream of messages such as `Request-Stream` and `Channel`, |
|
the responder must respect demand signals from the requester. Demand is expressed as a |
|
number of messages. Initial demand is specified in `REQUEST_STREAM` and |
|
`REQUEST_CHANNEL` frames. Subsequent demand is signaled via `REQUEST_N` frames. |
|
|
|
Each side may also send metadata notifications, via the `METADATA_PUSH` frame, that do not |
|
pertain to any individual request but rather to the connection as a whole. |
|
|
|
**Message Format** |
|
|
|
RSocket messages contain data and metadata. Metadata can be used to send a route, a |
|
security token, etc. Data and metadata can be formatted differently. Mime types for each |
|
are declared in the `SETUP` frame and apply to all requests on a given connection. |
|
|
|
While all messages can have metadata, typically metadata such as a route are per-request |
|
and therefore only included in the first message on a request, i.e. with one of the frames |
|
`REQUEST_RESPONSE`, `REQUEST_STREAM`, `REQUEST_CHANNEL`, or `REQUEST_FNF`. |
|
|
|
Protocol extensions define common metadata formats for use in applications: |
|
|
|
* {gh-rsocket-extensions}/CompositeMetadata.md[Composite Metadata]-- multiple, |
|
independently formatted metadata entries. |
|
* {gh-rsocket-extensions}/Routing.md[Routing] -- the route for a request. |
|
|
|
|
|
|
|
[[rsocket-java]] |
|
=== Java Implementation |
|
|
|
The {gh-rsocket-java}[Java implementation] for RSocket is built on |
|
https://projectreactor.io/[Project Reactor]. The transports for TCP and WebSocket are |
|
built on https://github.com/reactor/reactor-netty[Reactor Netty]. As a Reactive Streams |
|
library, Reactor simplifies the job of implementing the protocol. For applications it is |
|
a natural fit to use `Flux` and `Mono` with declarative operators and transparent back |
|
pressure support. |
|
|
|
The API in RSocket Java is intentionally minimal and basic. It focuses on protocol |
|
features and leaves the application programming model (e.g. RPC codegen vs other) as a |
|
higher level, independent concern. |
|
|
|
The main contract |
|
{gh-rsocket-java}/blob/master/rsocket-core/src/main/java/io/rsocket/RSocket.java[io.rsocket.RSocket] |
|
models the four request interaction types with `Mono` representing a promise for a |
|
single message, `Flux` a stream of messages, and `io.rsocket.Payload` the actual |
|
message with access to data and metadata as byte buffers. The `RSocket` contract is used |
|
symmetrically. For requesting, the application is given an `RSocket` to perform |
|
requests with. For responding, the application implements `RSocket` to handle requests. |
|
|
|
This is not meant to be a thorough introduction. For the most part, Spring applications |
|
will not have to use its API directly. However it may be important to see or experiment |
|
with RSocket independent of Spring. The RSocket Java repository contains a number of |
|
{gh-rsocket-java}/tree/master/rsocket-examples[sample apps] that |
|
demonstrate its API and protocol features. |
|
|
|
|
|
|
|
[[rsocket-spring]] |
|
=== Spring Support |
|
|
|
The `spring-messaging` module contains the following: |
|
|
|
* xref:rsocket.adoc#rsocket-requester[RSocketRequester] -- fluent API to make requests |
|
through an `io.rsocket.RSocket` with data and metadata encoding/decoding. |
|
* xref:rsocket.adoc#rsocket-annot-responders[Annotated Responders] -- `@MessageMapping` |
|
and `@RSocketExchange` annotated handler methods for responding. |
|
* xref:rsocket.adoc#rsocket-interface[RSocket Interface] -- RSocket service declaration |
|
as Java interface with `@RSocketExchange` methods, for use as requester or responder. |
|
|
|
The `spring-web` module contains `Encoder` and `Decoder` implementations such as Jackson |
|
CBOR/JSON, and Protobuf that RSocket applications will likely need. It also contains the |
|
`PathPatternParser` that can be plugged in for efficient route matching. |
|
|
|
Spring Boot 2.2 supports standing up an RSocket server over TCP or WebSocket, including |
|
the option to expose RSocket over WebSocket in a WebFlux server. There is also client |
|
support and auto-configuration for an `RSocketRequester.Builder` and `RSocketStrategies`. |
|
See the |
|
https://docs.spring.io/spring-boot/docs/current/reference/htmlsingle/#boot-features-rsocket[RSocket section] |
|
in the Spring Boot reference for more details. |
|
|
|
Spring Security 5.2 provides RSocket support. |
|
|
|
Spring Integration 5.2 provides inbound and outbound gateways to interact with RSocket |
|
clients and servers. See the Spring Integration Reference Manual for more details. |
|
|
|
Spring Cloud Gateway supports RSocket connections. |
|
|
|
|
|
|
|
[[rsocket-requester]] |
|
== RSocketRequester |
|
|
|
`RSocketRequester` provides a fluent API to perform RSocket requests, accepting and |
|
returning objects for data and metadata instead of low level data buffers. It can be used |
|
symmetrically, to make requests from clients and to make requests from servers. |
|
|
|
|
|
[[rsocket-requester-client]] |
|
=== Client Requester |
|
|
|
To obtain an `RSocketRequester` on the client side is to connect to a server which involves |
|
sending an RSocket `SETUP` frame with connection settings. `RSocketRequester` provides a |
|
builder that helps to prepare an `io.rsocket.core.RSocketConnector` including connection |
|
settings for the `SETUP` frame. |
|
|
|
This is the most basic way to connect with default settings: |
|
|
|
[tabs] |
|
====== |
|
Java:: |
|
+ |
|
[source,java,indent=0,subs="verbatim,quotes",role="primary"] |
|
---- |
|
RSocketRequester requester = RSocketRequester.builder().tcp("localhost", 7000); |
|
|
|
URI url = URI.create("https://example.org:8080/rsocket"); |
|
RSocketRequester requester = RSocketRequester.builder().webSocket(url); |
|
---- |
|
|
|
Kotlin:: |
|
+ |
|
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"] |
|
---- |
|
val requester = RSocketRequester.builder().tcp("localhost", 7000) |
|
|
|
URI url = URI.create("https://example.org:8080/rsocket"); |
|
val requester = RSocketRequester.builder().webSocket(url) |
|
---- |
|
====== |
|
|
|
The above does not connect immediately. When requests are made, a shared connection is |
|
established transparently and used. |
|
|
|
|
|
[[rsocket-requester-client-setup]] |
|
==== Connection Setup |
|
|
|
`RSocketRequester.Builder` provides the following to customize the initial `SETUP` frame: |
|
|
|
* `dataMimeType(MimeType)` -- set the mime type for data on the connection. |
|
* `metadataMimeType(MimeType)` -- set the mime type for metadata on the connection. |
|
* `setupData(Object)` -- data to include in the `SETUP`. |
|
* `setupRoute(String, Object...)` -- route in the metadata to include in the `SETUP`. |
|
* `setupMetadata(Object, MimeType)` -- other metadata to include in the `SETUP`. |
|
|
|
For data, the default mime type is derived from the first configured `Decoder`. For |
|
metadata, the default mime type is |
|
{gh-rsocket-extensions}/CompositeMetadata.md[composite metadata] which allows multiple |
|
metadata value and mime type pairs per request. Typically both don't need to be changed. |
|
|
|
Data and metadata in the `SETUP` frame is optional. On the server side, |
|
xref:rsocket.adoc#rsocket-annot-connectmapping[@ConnectMapping] methods can be used to handle the start of a |
|
connection and the content of the `SETUP` frame. Metadata may be used for connection |
|
level security. |
|
|
|
|
|
[[rsocket-requester-client-strategies]] |
|
==== Strategies |
|
|
|
`RSocketRequester.Builder` accepts `RSocketStrategies` to configure the requester. |
|
You'll need to use this to provide encoders and decoders for (de)-serialization of data and |
|
metadata values. By default only the basic codecs from `spring-core` for `String`, |
|
`byte[]`, and `ByteBuffer` are registered. Adding `spring-web` provides access to more that |
|
can be registered as follows: |
|
|
|
[tabs] |
|
====== |
|
Java:: |
|
+ |
|
[source,java,indent=0,subs="verbatim,quotes",role="primary"] |
|
---- |
|
RSocketStrategies strategies = RSocketStrategies.builder() |
|
.encoders(encoders -> encoders.add(new Jackson2CborEncoder())) |
|
.decoders(decoders -> decoders.add(new Jackson2CborDecoder())) |
|
.build(); |
|
|
|
RSocketRequester requester = RSocketRequester.builder() |
|
.rsocketStrategies(strategies) |
|
.tcp("localhost", 7000); |
|
---- |
|
|
|
Kotlin:: |
|
+ |
|
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"] |
|
---- |
|
val strategies = RSocketStrategies.builder() |
|
.encoders { it.add(Jackson2CborEncoder()) } |
|
.decoders { it.add(Jackson2CborDecoder()) } |
|
.build() |
|
|
|
val requester = RSocketRequester.builder() |
|
.rsocketStrategies(strategies) |
|
.tcp("localhost", 7000) |
|
---- |
|
====== |
|
|
|
`RSocketStrategies` is designed for re-use. In some scenarios, e.g. client and server in |
|
the same application, it may be preferable to declare it in Spring configuration. |
|
|
|
|
|
[[rsocket-requester-client-responder]] |
|
==== Client Responders |
|
|
|
`RSocketRequester.Builder` can be used to configure responders to requests from the |
|
server. |
|
|
|
You can use annotated handlers for client-side responding based on the same |
|
infrastructure that's used on a server, but registered programmatically as follows: |
|
|
|
[tabs] |
|
====== |
|
Java:: |
|
+ |
|
[source,java,indent=0,subs="verbatim,quotes",role="primary"] |
|
---- |
|
RSocketStrategies strategies = RSocketStrategies.builder() |
|
.routeMatcher(new PathPatternRouteMatcher()) // <1> |
|
.build(); |
|
|
|
SocketAcceptor responder = |
|
RSocketMessageHandler.responder(strategies, new ClientHandler()); // <2> |
|
|
|
RSocketRequester requester = RSocketRequester.builder() |
|
.rsocketConnector(connector -> connector.acceptor(responder)) // <3> |
|
.tcp("localhost", 7000); |
|
---- |
|
<1> Use `PathPatternRouteMatcher`, if `spring-web` is present, for efficient |
|
route matching. |
|
<2> Create a responder from a class with `@MessageMapping` and/or `@ConnectMapping` methods. |
|
<3> Register the responder. |
|
|
|
Kotlin:: |
|
+ |
|
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"] |
|
---- |
|
val strategies = RSocketStrategies.builder() |
|
.routeMatcher(PathPatternRouteMatcher()) // <1> |
|
.build() |
|
|
|
val responder = |
|
RSocketMessageHandler.responder(strategies, new ClientHandler()); // <2> |
|
|
|
val requester = RSocketRequester.builder() |
|
.rsocketConnector { it.acceptor(responder) } // <3> |
|
.tcp("localhost", 7000) |
|
---- |
|
<1> Use `PathPatternRouteMatcher`, if `spring-web` is present, for efficient |
|
route matching. |
|
<2> Create a responder from a class with `@MessageMapping` and/or `@ConnectMapping` methods. |
|
<3> Register the responder. |
|
====== |
|
|
|
Note the above is only a shortcut designed for programmatic registration of client |
|
responders. For alternative scenarios, where client responders are in Spring configuration, |
|
you can still declare `RSocketMessageHandler` as a Spring bean and then apply as follows: |
|
|
|
[tabs] |
|
====== |
|
Java:: |
|
+ |
|
[source,java,indent=0,subs="verbatim,quotes",role="primary"] |
|
---- |
|
ApplicationContext context = ... ; |
|
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class); |
|
|
|
RSocketRequester requester = RSocketRequester.builder() |
|
.rsocketConnector(connector -> connector.acceptor(handler.responder())) |
|
.tcp("localhost", 7000); |
|
---- |
|
|
|
Kotlin:: |
|
+ |
|
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"] |
|
---- |
|
import org.springframework.beans.factory.getBean |
|
|
|
val context: ApplicationContext = ... |
|
val handler = context.getBean<RSocketMessageHandler>() |
|
|
|
val requester = RSocketRequester.builder() |
|
.rsocketConnector { it.acceptor(handler.responder()) } |
|
.tcp("localhost", 7000) |
|
---- |
|
====== |
|
|
|
For the above you may also need to use `setHandlerPredicate` in `RSocketMessageHandler` to |
|
switch to a different strategy for detecting client responders, e.g. based on a custom |
|
annotation such as `@RSocketClientResponder` vs the default `@Controller`. This |
|
is necessary in scenarios with client and server, or multiple clients in the same |
|
application. |
|
|
|
See also xref:rsocket.adoc#rsocket-annot-responders[Annotated Responders], for more on the programming model. |
|
|
|
|
|
[[rsocket-requester-client-advanced]] |
|
==== Advanced |
|
|
|
`RSocketRequesterBuilder` provides a callback to expose the underlying |
|
`io.rsocket.core.RSocketConnector` for further configuration options for keepalive |
|
intervals, session resumption, interceptors, and more. You can configure options |
|
at that level as follows: |
|
|
|
[tabs] |
|
====== |
|
Java:: |
|
+ |
|
[source,java,indent=0,subs="verbatim,quotes",role="primary"] |
|
---- |
|
RSocketRequester requester = RSocketRequester.builder() |
|
.rsocketConnector(connector -> { |
|
// ... |
|
}) |
|
.tcp("localhost", 7000); |
|
---- |
|
|
|
Kotlin:: |
|
+ |
|
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"] |
|
---- |
|
val requester = RSocketRequester.builder() |
|
.rsocketConnector { |
|
//... |
|
} |
|
.tcp("localhost", 7000) |
|
---- |
|
====== |
|
|
|
|
|
[[rsocket-requester-server]] |
|
=== Server Requester |
|
|
|
To make requests from a server to connected clients is a matter of obtaining the |
|
requester for the connected client from the server. |
|
|
|
In xref:rsocket.adoc#rsocket-annot-responders[Annotated Responders], `@ConnectMapping` and `@MessageMapping` methods support an |
|
`RSocketRequester` argument. Use it to access the requester for the connection. Keep in |
|
mind that `@ConnectMapping` methods are essentially handlers of the `SETUP` frame which |
|
must be handled before requests can begin. Therefore, requests at the very start must be |
|
decoupled from handling. For example: |
|
|
|
[tabs] |
|
====== |
|
Java:: |
|
+ |
|
[source,java,indent=0,subs="verbatim,quotes",role="primary"] |
|
---- |
|
@ConnectMapping |
|
Mono<Void> handle(RSocketRequester requester) { |
|
requester.route("status").data("5") |
|
.retrieveFlux(StatusReport.class) |
|
.subscribe(bar -> { // <1> |
|
// ... |
|
}); |
|
return ... // <2> |
|
} |
|
---- |
|
<1> Start the request asynchronously, independent from handling. |
|
<2> Perform handling and return completion `Mono<Void>`. |
|
|
|
Kotlin:: |
|
+ |
|
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"] |
|
---- |
|
@ConnectMapping |
|
suspend fun handle(requester: RSocketRequester) { |
|
GlobalScope.launch { |
|
requester.route("status").data("5").retrieveFlow<StatusReport>().collect { // <1> |
|
// ... |
|
} |
|
} |
|
/// ... <2> |
|
} |
|
---- |
|
<1> Start the request asynchronously, independent from handling. |
|
<2> Perform handling in the suspending function. |
|
====== |
|
|
|
|
|
|
|
[[rsocket-requester-requests]] |
|
=== Requests |
|
|
|
Once you have a xref:rsocket.adoc#rsocket-requester-client[client] or |
|
xref:rsocket.adoc#rsocket-requester-server[server] requester, you can make requests as follows: |
|
|
|
[tabs] |
|
====== |
|
Java:: |
|
+ |
|
[source,java,indent=0,subs="verbatim,quotes",role="primary"] |
|
---- |
|
ViewBox viewBox = ... ; |
|
|
|
Flux<AirportLocation> locations = requester.route("locate.radars.within") // <1> |
|
.data(viewBox) // <2> |
|
.retrieveFlux(AirportLocation.class); // <3> |
|
|
|
---- |
|
<1> Specify a route to include in the metadata of the request message. |
|
<2> Provide data for the request message. |
|
<3> Declare the expected response. |
|
|
|
Kotlin:: |
|
+ |
|
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"] |
|
---- |
|
val viewBox: ViewBox = ... |
|
|
|
val locations = requester.route("locate.radars.within") // <1> |
|
.data(viewBox) // <2> |
|
.retrieveFlow<AirportLocation>() // <3> |
|
---- |
|
<1> Specify a route to include in the metadata of the request message. |
|
<2> Provide data for the request message. |
|
<3> Declare the expected response. |
|
====== |
|
|
|
The interaction type is determined implicitly from the cardinality of the input and |
|
output. The above example is a `Request-Stream` because one value is sent and a stream |
|
of values is received. For the most part you don't need to think about this as long as the |
|
choice of input and output matches an RSocket interaction type and the types of input and |
|
output expected by the responder. The only example of an invalid combination is many-to-one. |
|
|
|
The `data(Object)` method also accepts any Reactive Streams `Publisher`, including |
|
`Flux` and `Mono`, as well as any other producer of value(s) that is registered in the |
|
`ReactiveAdapterRegistry`. For a multi-value `Publisher` such as `Flux` which produces the |
|
same types of values, consider using one of the overloaded `data` methods to avoid having |
|
type checks and `Encoder` lookup on every element: |
|
|
|
[source,java,indent=0,subs="verbatim,quotes"] |
|
---- |
|
data(Object producer, Class<?> elementClass); |
|
data(Object producer, ParameterizedTypeReference<?> elementTypeRef); |
|
---- |
|
|
|
The `data(Object)` step is optional. Skip it for requests that don't send data: |
|
|
|
[tabs] |
|
====== |
|
Java:: |
|
+ |
|
[source,java,indent=0,subs="verbatim,quotes",role="primary"] |
|
---- |
|
Mono<AirportLocation> location = requester.route("find.radar.EWR")) |
|
.retrieveMono(AirportLocation.class); |
|
---- |
|
|
|
Kotlin:: |
|
+ |
|
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"] |
|
---- |
|
import org.springframework.messaging.rsocket.retrieveAndAwait |
|
|
|
val location = requester.route("find.radar.EWR") |
|
.retrieveAndAwait<AirportLocation>() |
|
---- |
|
====== |
|
|
|
Extra metadata values can be added if using |
|
{gh-rsocket-extensions}/CompositeMetadata.md[composite metadata] (the default) and if the |
|
values are supported by a registered `Encoder`. For example: |
|
|
|
[tabs] |
|
====== |
|
Java:: |
|
+ |
|
[source,java,indent=0,subs="verbatim,quotes",role="primary"] |
|
---- |
|
String securityToken = ... ; |
|
ViewBox viewBox = ... ; |
|
MimeType mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0"); |
|
|
|
Flux<AirportLocation> locations = requester.route("locate.radars.within") |
|
.metadata(securityToken, mimeType) |
|
.data(viewBox) |
|
.retrieveFlux(AirportLocation.class); |
|
---- |
|
|
|
Kotlin:: |
|
+ |
|
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"] |
|
---- |
|
import org.springframework.messaging.rsocket.retrieveFlow |
|
|
|
val requester: RSocketRequester = ... |
|
|
|
val securityToken: String = ... |
|
val viewBox: ViewBox = ... |
|
val mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0") |
|
|
|
val locations = requester.route("locate.radars.within") |
|
.metadata(securityToken, mimeType) |
|
.data(viewBox) |
|
.retrieveFlow<AirportLocation>() |
|
---- |
|
====== |
|
|
|
For `Fire-and-Forget` use the `send()` method that returns `Mono<Void>`. Note that the `Mono` |
|
indicates only that the message was successfully sent, and not that it was handled. |
|
|
|
For `Metadata-Push` use the `sendMetadata()` method with a `Mono<Void>` return value. |
|
|
|
|
|
|
|
[[rsocket-annot-responders]] |
|
== Annotated Responders |
|
|
|
RSocket responders can be implemented as `@MessageMapping` and `@ConnectMapping` methods. |
|
`@MessageMapping` methods handle individual requests while `@ConnectMapping` methods handle |
|
connection-level events (setup and metadata push). Annotated responders are supported |
|
symmetrically, for responding from the server side and for responding from the client side. |
|
|
|
|
|
|
|
[[rsocket-annot-responders-server]] |
|
=== Server Responders |
|
|
|
To use annotated responders on the server side, add `RSocketMessageHandler` to your Spring |
|
configuration to detect `@Controller` beans with `@MessageMapping` and `@ConnectMapping` |
|
methods: |
|
|
|
[tabs] |
|
====== |
|
Java:: |
|
+ |
|
[source,java,indent=0,subs="verbatim,quotes",role="primary"] |
|
---- |
|
@Configuration |
|
static class ServerConfig { |
|
|
|
@Bean |
|
public RSocketMessageHandler rsocketMessageHandler() { |
|
RSocketMessageHandler handler = new RSocketMessageHandler(); |
|
handler.routeMatcher(new PathPatternRouteMatcher()); |
|
return handler; |
|
} |
|
} |
|
---- |
|
|
|
Kotlin:: |
|
+ |
|
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"] |
|
---- |
|
@Configuration |
|
class ServerConfig { |
|
|
|
@Bean |
|
fun rsocketMessageHandler() = RSocketMessageHandler().apply { |
|
routeMatcher = PathPatternRouteMatcher() |
|
} |
|
} |
|
---- |
|
====== |
|
|
|
Then start an RSocket server through the Java RSocket API and plug the |
|
`RSocketMessageHandler` for the responder as follows: |
|
|
|
[tabs] |
|
====== |
|
Java:: |
|
+ |
|
[source,java,indent=0,subs="verbatim,quotes",role="primary"] |
|
---- |
|
ApplicationContext context = ... ; |
|
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class); |
|
|
|
CloseableChannel server = |
|
RSocketServer.create(handler.responder()) |
|
.bind(TcpServerTransport.create("localhost", 7000)) |
|
.block(); |
|
---- |
|
|
|
Kotlin:: |
|
+ |
|
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"] |
|
---- |
|
import org.springframework.beans.factory.getBean |
|
|
|
val context: ApplicationContext = ... |
|
val handler = context.getBean<RSocketMessageHandler>() |
|
|
|
val server = RSocketServer.create(handler.responder()) |
|
.bind(TcpServerTransport.create("localhost", 7000)) |
|
.awaitSingle() |
|
---- |
|
====== |
|
|
|
`RSocketMessageHandler` supports |
|
{gh-rsocket-extensions}/CompositeMetadata.md[composite] and |
|
{gh-rsocket-extensions}/Routing.md[routing] metadata by default. You can set its |
|
xref:rsocket.adoc#rsocket-metadata-extractor[MetadataExtractor] if you need to switch to a |
|
different mime type or register additional metadata mime types. |
|
|
|
You'll need to set the `Encoder` and `Decoder` instances required for metadata and data |
|
formats to support. You'll likely need the `spring-web` module for codec implementations. |
|
|
|
By default `SimpleRouteMatcher` is used for matching routes via `AntPathMatcher`. |
|
We recommend plugging in the `PathPatternRouteMatcher` from `spring-web` for |
|
efficient route matching. RSocket routes can be hierarchical but are not URL paths. |
|
Both route matchers are configured to use "." as separator by default and there is no URL |
|
decoding as with HTTP URLs. |
|
|
|
`RSocketMessageHandler` can be configured via `RSocketStrategies` which may be useful if |
|
you need to share configuration between a client and a server in the same process: |
|
|
|
[tabs] |
|
====== |
|
Java:: |
|
+ |
|
[source,java,indent=0,subs="verbatim,quotes",role="primary"] |
|
---- |
|
@Configuration |
|
static class ServerConfig { |
|
|
|
@Bean |
|
public RSocketMessageHandler rsocketMessageHandler() { |
|
RSocketMessageHandler handler = new RSocketMessageHandler(); |
|
handler.setRSocketStrategies(rsocketStrategies()); |
|
return handler; |
|
} |
|
|
|
@Bean |
|
public RSocketStrategies rsocketStrategies() { |
|
return RSocketStrategies.builder() |
|
.encoders(encoders -> encoders.add(new Jackson2CborEncoder())) |
|
.decoders(decoders -> decoders.add(new Jackson2CborDecoder())) |
|
.routeMatcher(new PathPatternRouteMatcher()) |
|
.build(); |
|
} |
|
} |
|
---- |
|
|
|
Kotlin:: |
|
+ |
|
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"] |
|
---- |
|
@Configuration |
|
class ServerConfig { |
|
|
|
@Bean |
|
fun rsocketMessageHandler() = RSocketMessageHandler().apply { |
|
rSocketStrategies = rsocketStrategies() |
|
} |
|
|
|
@Bean |
|
fun rsocketStrategies() = RSocketStrategies.builder() |
|
.encoders { it.add(Jackson2CborEncoder()) } |
|
.decoders { it.add(Jackson2CborDecoder()) } |
|
.routeMatcher(PathPatternRouteMatcher()) |
|
.build() |
|
} |
|
---- |
|
====== |
|
|
|
|
|
|
|
[[rsocket-annot-responders-client]] |
|
=== Client Responders |
|
|
|
Annotated responders on the client side need to be configured in the |
|
`RSocketRequester.Builder`. For details, see |
|
xref:rsocket.adoc#rsocket-requester-client-responder[Client Responders]. |
|
|
|
|
|
|
|
[[rsocket-annot-messagemapping]] |
|
=== @MessageMapping |
|
|
|
Once xref:rsocket.adoc#rsocket-annot-responders-server[server] or |
|
xref:rsocket.adoc#rsocket-annot-responders-client[client] responder configuration is in place, |
|
`@MessageMapping` methods can be used as follows: |
|
|
|
[tabs] |
|
====== |
|
Java:: |
|
+ |
|
[source,java,indent=0,subs="verbatim,quotes",role="primary"] |
|
---- |
|
@Controller |
|
public class RadarsController { |
|
|
|
@MessageMapping("locate.radars.within") |
|
public Flux<AirportLocation> radars(MapRequest request) { |
|
// ... |
|
} |
|
} |
|
---- |
|
|
|
Kotlin:: |
|
+ |
|
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"] |
|
---- |
|
@Controller |
|
class RadarsController { |
|
|
|
@MessageMapping("locate.radars.within") |
|
fun radars(request: MapRequest): Flow<AirportLocation> { |
|
// ... |
|
} |
|
} |
|
---- |
|
====== |
|
|
|
The above `@MessageMapping` method responds to a Request-Stream interaction having the |
|
route "locate.radars.within". It supports a flexible method signature with the option to |
|
use the following method arguments: |
|
|
|
[cols="1,3",options="header"] |
|
|=== |
|
| Method Argument |
|
| Description |
|
|
|
| `@Payload` |
|
| The payload of the request. This can be a concrete value of asynchronous types like |
|
`Mono` or `Flux`. |
|
|
|
*Note:* Use of the annotation is optional. A method argument that is not a simple type |
|
and is not any of the other supported arguments, is assumed to be the expected payload. |
|
|
|
| `RSocketRequester` |
|
| Requester for making requests to the remote end. |
|
|
|
| `@DestinationVariable` |
|
| Value extracted from the route based on variables in the mapping pattern, e.g. |
|
pass:q[`@MessageMapping("find.radar.{id}")`]. |
|
|
|
| `@Header` |
|
| Metadata value registered for extraction as described in xref:rsocket.adoc#rsocket-metadata-extractor[MetadataExtractor]. |
|
|
|
| `@Headers Map<String, Object>` |
|
| All metadata values registered for extraction as described in xref:rsocket.adoc#rsocket-metadata-extractor[MetadataExtractor]. |
|
|
|
|=== |
|
|
|
The return value is expected to be one or more Objects to be serialized as response |
|
payloads. That can be asynchronous types like `Mono` or `Flux`, a concrete value, or |
|
either `void` or a no-value asynchronous type such as `Mono<Void>`. |
|
|
|
The RSocket interaction type that an `@MessageMapping` method supports is determined from |
|
the cardinality of the input (i.e. `@Payload` argument) and of the output, where |
|
cardinality means the following: |
|
|
|
[%autowidth] |
|
[cols=2*,options="header"] |
|
|=== |
|
| Cardinality |
|
| Description |
|
|
|
| 1 |
|
| Either an explicit value, or a single-value asynchronous type such as `Mono<T>`. |
|
|
|
| Many |
|
| A multi-value asynchronous type such as `Flux<T>`. |
|
|
|
| 0 |
|
| For input this means the method does not have an `@Payload` argument. |
|
|
|
For output this is `void` or a no-value asynchronous type such as `Mono<Void>`. |
|
|=== |
|
|
|
The table below shows all input and output cardinality combinations and the corresponding |
|
interaction type(s): |
|
|
|
[%autowidth] |
|
[cols=3*,options="header"] |
|
|=== |
|
| Input Cardinality |
|
| Output Cardinality |
|
| Interaction Types |
|
|
|
| 0, 1 |
|
| 0 |
|
| Fire-and-Forget, Request-Response |
|
|
|
| 0, 1 |
|
| 1 |
|
| Request-Response |
|
|
|
| 0, 1 |
|
| Many |
|
| Request-Stream |
|
|
|
| Many |
|
| 0, 1, Many |
|
| Request-Channel |
|
|
|
|=== |
|
|
|
|
|
|
|
[[rsocket-annot-rsocketexchange]] |
|
=== @RSocketExchange |
|
|
|
As an alternative to `@MessageMapping`, you can also handle requests with |
|
`@RSocketExchange` methods. Such methods are declared on an |
|
xref:rsocket-interface[RSocket Interface] and can be used as a requester via |
|
`RSocketServiceProxyFactory` or implemented by a responder. |
|
|
|
For example, to handle requests as a responder: |
|
|
|
[tabs] |
|
====== |
|
Java:: |
|
+ |
|
[source,java,indent=0,subs="verbatim,quotes",role="primary"] |
|
---- |
|
public interface RadarsService { |
|
|
|
@RSocketExchange("locate.radars.within") |
|
Flux<AirportLocation> radars(MapRequest request); |
|
} |
|
|
|
@Controller |
|
public class RadarsController implements RadarsService { |
|
|
|
public Flux<AirportLocation> radars(MapRequest request) { |
|
// ... |
|
} |
|
} |
|
---- |
|
|
|
Kotlin:: |
|
+ |
|
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"] |
|
---- |
|
interface RadarsService { |
|
|
|
@RSocketExchange("locate.radars.within") |
|
fun radars(request: MapRequest): Flow<AirportLocation> |
|
} |
|
|
|
@Controller |
|
class RadarsController : RadarsService { |
|
|
|
override fun radars(request: MapRequest): Flow<AirportLocation> { |
|
// ... |
|
} |
|
} |
|
---- |
|
====== |
|
|
|
There some differences between `@RSocketExhange` and `@MessageMapping` since the |
|
former needs to remain suitable for requester and responder use. For example, while |
|
`@MessageMapping` can be declared to handle any number of routes and each route can |
|
be a pattern, `@RSocketExchange` must be declared with a single, concrete route. There are |
|
also small differences in the supported method parameters related to metadata, see |
|
xref:rsocket-annot-messagemapping[@MessageMapping] and |
|
xref:rsocket-interface[RSocket Interface] for a list of supported parameters. |
|
|
|
`@RSocketExchange` can be used at the type level to specify a common prefix for all routes |
|
for a given RSocket service interface. |
|
|
|
|
|
[[rsocket-annot-connectmapping]] |
|
=== @ConnectMapping |
|
|
|
`@ConnectMapping` handles the `SETUP` frame at the start of an RSocket connection, and |
|
any subsequent metadata push notifications through the `METADATA_PUSH` frame, i.e. |
|
`metadataPush(Payload)` in `io.rsocket.RSocket`. |
|
|
|
`@ConnectMapping` methods support the same arguments as |
|
xref:rsocket.adoc#rsocket-annot-messagemapping[@MessageMapping] but based on metadata and data from the `SETUP` and |
|
`METADATA_PUSH` frames. `@ConnectMapping` can have a pattern to narrow handling to |
|
specific connections that have a route in the metadata, or if no patterns are declared |
|
then all connections match. |
|
|
|
`@ConnectMapping` methods cannot return data and must be declared with `void` or |
|
`Mono<Void>` as the return value. If handling returns an error for a new |
|
connection then the connection is rejected. Handling must not be held up to make |
|
requests to the `RSocketRequester` for the connection. See |
|
xref:rsocket.adoc#rsocket-requester-server[Server Requester] for details. |
|
|
|
|
|
|
|
|
|
[[rsocket-metadata-extractor]] |
|
== MetadataExtractor |
|
|
|
Responders must interpret metadata. |
|
{gh-rsocket-extensions}/CompositeMetadata.md[Composite metadata] allows independently |
|
formatted metadata values (e.g. for routing, security, tracing) each with its own mime |
|
type. Applications need a way to configure metadata mime types to support, and a way |
|
to access extracted values. |
|
|
|
`MetadataExtractor` is a contract to take serialized metadata and return decoded |
|
name-value pairs that can then be accessed like headers by name, for example via `@Header` |
|
in annotated handler methods. |
|
|
|
`DefaultMetadataExtractor` can be given `Decoder` instances to decode metadata. Out of |
|
the box it has built-in support for |
|
{gh-rsocket-extensions}/Routing.md["message/x.rsocket.routing.v0"] which it decodes to |
|
`String` and saves under the "route" key. For any other mime type you'll need to provide |
|
a `Decoder` and register the mime type as follows: |
|
|
|
[tabs] |
|
====== |
|
Java:: |
|
+ |
|
[source,java,indent=0,subs="verbatim,quotes",role="primary"] |
|
---- |
|
DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(metadataDecoders); |
|
extractor.metadataToExtract(fooMimeType, Foo.class, "foo"); |
|
---- |
|
|
|
Kotlin:: |
|
+ |
|
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"] |
|
---- |
|
import org.springframework.messaging.rsocket.metadataToExtract |
|
|
|
val extractor = DefaultMetadataExtractor(metadataDecoders) |
|
extractor.metadataToExtract<Foo>(fooMimeType, "foo") |
|
---- |
|
====== |
|
|
|
Composite metadata works well to combine independent metadata values. However the |
|
requester might not support composite metadata, or may choose not to use it. For this, |
|
`DefaultMetadataExtractor` may needs custom logic to map the decoded value to the output |
|
map. Here is an example where JSON is used for metadata: |
|
|
|
[tabs] |
|
====== |
|
Java:: |
|
+ |
|
[source,java,indent=0,subs="verbatim,quotes",role="primary"] |
|
---- |
|
DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(metadataDecoders); |
|
extractor.metadataToExtract( |
|
MimeType.valueOf("application/vnd.myapp.metadata+json"), |
|
new ParameterizedTypeReference<Map<String,String>>() {}, |
|
(jsonMap, outputMap) -> { |
|
outputMap.putAll(jsonMap); |
|
}); |
|
---- |
|
|
|
Kotlin:: |
|
+ |
|
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"] |
|
---- |
|
import org.springframework.messaging.rsocket.metadataToExtract |
|
|
|
val extractor = DefaultMetadataExtractor(metadataDecoders) |
|
extractor.metadataToExtract<Map<String, String>>(MimeType.valueOf("application/vnd.myapp.metadata+json")) { jsonMap, outputMap -> |
|
outputMap.putAll(jsonMap) |
|
} |
|
---- |
|
====== |
|
|
|
When configuring `MetadataExtractor` through `RSocketStrategies`, you can let |
|
`RSocketStrategies.Builder` create the extractor with the configured decoders, and |
|
simply use a callback to customize registrations as follows: |
|
|
|
[tabs] |
|
====== |
|
Java:: |
|
+ |
|
[source,java,indent=0,subs="verbatim,quotes",role="primary"] |
|
---- |
|
RSocketStrategies strategies = RSocketStrategies.builder() |
|
.metadataExtractorRegistry(registry -> { |
|
registry.metadataToExtract(fooMimeType, Foo.class, "foo"); |
|
// ... |
|
}) |
|
.build(); |
|
---- |
|
|
|
Kotlin:: |
|
+ |
|
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"] |
|
---- |
|
import org.springframework.messaging.rsocket.metadataToExtract |
|
|
|
val strategies = RSocketStrategies.builder() |
|
.metadataExtractorRegistry { registry: MetadataExtractorRegistry -> |
|
registry.metadataToExtract<Foo>(fooMimeType, "foo") |
|
// ... |
|
} |
|
.build() |
|
---- |
|
====== |
|
|
|
|
|
|
|
|
|
[[rsocket-interface]] |
|
== RSocket Interface |
|
|
|
The Spring Framework lets you define an RSocket service as a Java interface with |
|
`@RSocketExchange` methods. You can pass such an interface to `RSocketServiceProxyFactory` |
|
to create a proxy which performs requests through an |
|
xref:rsocket.adoc#rsocket-requester[RSocketRequester]. You can also implement the |
|
interface as a responder that handles requests. |
|
|
|
Start by creating the interface with `@RSocketExchange` methods: |
|
|
|
[source,java,indent=0,subs="verbatim,quotes"] |
|
---- |
|
interface RadarService { |
|
|
|
@RSocketExchange("radars") |
|
Flux<AirportLocation> getRadars(@Payload MapRequest request); |
|
|
|
// more RSocket exchange methods... |
|
|
|
} |
|
---- |
|
|
|
Now you can create a proxy that performs requests when methods are called: |
|
|
|
[source,java,indent=0,subs="verbatim,quotes"] |
|
---- |
|
RSocketRequester requester = ... ; |
|
RSocketServiceProxyFactory factory = RSocketServiceProxyFactory.builder(requester).build(); |
|
|
|
RadarService service = factory.createClient(RadarService.class); |
|
---- |
|
|
|
You can also implement the interface to handle requests as a responder. |
|
See xref:rsocket.adoc#rsocket-annot-rsocketexchange[Annotated Responders]. |
|
|
|
|
|
|
|
[[rsocket-interface-method-parameters]] |
|
=== Method Parameters |
|
|
|
Annotated, RSocket exchange methods support flexible method signatures with the following |
|
method parameters: |
|
|
|
[cols="1,2", options="header"] |
|
|=== |
|
| Method argument | Description |
|
|
|
| `@DestinationVariable` |
|
| Add a route variable to pass to `RSocketRequester` along with the route from the |
|
`@RSocketExchange` annotation in order to expand template placeholders in the route. |
|
This variable can be a String or any Object, which is then formatted via `toString()`. |
|
|
|
| `@Payload` |
|
| Set the input payload(s) for the request. This can be a concrete value, or any producer |
|
of values that can be adapted to a Reactive Streams `Publisher` via |
|
`ReactiveAdapterRegistry` |
|
|
|
| `Object`, if followed by `MimeType` |
|
| The value for a metadata entry in the input payload. This can be any `Object` as long |
|
as the next argument is the metadata entry `MimeType`. The value can be a concrete |
|
value or any producer of a single value that can be adapted to a Reactive Streams |
|
`Publisher` via `ReactiveAdapterRegistry`. |
|
|
|
| `MimeType` |
|
| The `MimeType` for a metadata entry. The preceding method argument is expected to be |
|
the metadata value. |
|
|
|
|=== |
|
|
|
|
|
[[rsocket-interface-return-values]] |
|
=== Return Values |
|
|
|
Annotated, RSocket exchange methods support return values that are concrete value(s), or |
|
any producer of value(s) that can be adapted to a Reactive Streams `Publisher` via |
|
`ReactiveAdapterRegistry`. |
|
|
|
By default, the behavior of RSocket service methods with synchronous (blocking) method |
|
signature depends on response timeout settings of the underlying RSocket `ClientTransport` |
|
as well as RSocket keep-alive settings. `RSocketServiceProxyFactory.Builder` does expose a |
|
`blockTimeout` option that also lets you configure the maximum time to block for a response, |
|
but we recommend configuring timeout values at the RSocket level for more control. |
|
|
|
|