Browse Source

RSocket reference documentation

Closes: gh-23147
pull/23681/head
Rossen Stoyanchev 5 years ago
parent
commit
88bb2aabbf
  1. 628
      src/docs/asciidoc/rsocket.adoc
  2. 8
      src/docs/asciidoc/web-reactive.adoc

628
src/docs/asciidoc/rsocket.adoc

@ -0,0 +1,628 @@ @@ -0,0 +1,628 @@
[[rsocket]]
= RSocket
[[rsocket-overview]]
== Overview
RSocket is an application protocol for multiplexed, duplex communication over TCP,
WebSocket, and other byte stream transports, using one of the following interaction
models:
* `Request-Response` -- send one message and receive one back.
* `Request-Stream` -- send one message and receive a stream of messages back.
* `Channel` -- send streams of messages in both directions.
* `Fire-and-Forget` -- send a one-way message.
Once the initial connection is made, the "client" vs "server" distinction is lost as
both sides become symmetrical and each side can initiate one of the above interactions.
This is why in the protocol calls the participating sides "requester" and "responder"
while the above interactions are called "requests" or "streams".
Below are key features built into the protocol:
* https://www.reactive-streams.org/[Reactive Streams] semantics across network boundary --
for streaming requests such as `Request-Stream` and `Channel`, back pressure signals
travel between requester and responder, allowing a requester to slow down a responder at
the source, hence reducing reliance on network layer congestion control, and the need
for buffering at the network or any level.
* Request throttling -- this feature is named "leasing" after the `LEASE` frame that
can be sent from each end to limit the total number of requests allowed by other end
for a given time. Leases are renewed periodically.
* Session resumption -- this is designed for loss of connectivity and requires some state
to be maintained. The state management is transparent for applications, and works well
in combination with back pressure which can stop a producer when possible and reduce
the amount of state required.
* Fragmentation and re-assembly of large messages.
* Keepalive (heartbeats).
RSocket has https://github.com/rsocket[implementations] in multiple languages. The
https://github.com/rsocket/rsocket-java[Java library] is built on
https://projectreactor.io/[Project Reactor], and Reactor Netty for the transport.
That means signals from Reactive Streams Publishers in your application propagate
transparently through RSocket across the network.
[[rsocket-protocol]]
=== The Protocol
One of the benefits of RSocket is that it has a well defined behavior on the wire and an
easy to read http://rsocket.io/docs/Protocol[specification] along with some protocol
https://github.com/rsocket/rsocket/tree/master/Extensions[extensions]. Therefore it is
a good idea to read the spec, independent of language implementations and higher level
framework APIs. This section provides a succinct overview to establish some context.
**Connecting**
Initially a client connects to a server via some low level streaming transport such
as TCP or WebSocket and sends a `SETUP` frame to the server to define parameters for the
connection.
The server may reject the `SETUP` frame, but generally after it is sent (for the client)
and received (for the server), both sides can begin to make requests, unless `SETUP`
indicates use of leasing semantics to limit the number of requests, in which case
both sides must wait for a `LEASE` frame from the other end to permit making requests.
**Making Requests**
Once a connection is established, both sides may initiate a request through one of the
frames `REQUEST_RESPONSE`, `REQUEST_STREAM`, `REQUEST_CHANNEL`, or `REQUEST_FNF`. Each of
those frames carries one message from the requester to the responder.
The responder may then return `PAYLOAD` frames with response messages, and in the case
of `REQUEST_CHANNEL` the requester may also send `PAYLOAD` frames with more request
messages.
When a request involves a stream of messages such as as `Request-Stream` and `Channel`,
the responder must respect demand signals from the requester. Demand is expressed as a
number of messages. Initial demand is specified in `REQUEST_STREAM` and
`REQUEST_CHANNEL` frames. Subsequent demand is signaled via `REQUEST_N` frames.
Each side may also send metadata notifications, via the `METADATA_PUSH` frame, that do not
pertain to any individual request but rather to the connection as a whole.
**Message Format**
RSocket messages contain data and metadata. Metadata can be used to send a route, a
security token, etc. Data and metadata can be formatted differently. Mime types for each
are declared in the `SETUP` frame and apply to all requests on a given connection.
While all messages can have metadata, typically metadata such as a route are per-request
and therefore only included in the first message on a request, i.e. with one of the frames
`REQUEST_RESPONSE`, `REQUEST_STREAM`, `REQUEST_CHANNEL`, or `REQUEST_FNF`.
Protocol extensions define common metadata formats for use in applications:
* https://github.com/rsocket/rsocket/blob/master/Extensions/CompositeMetadata.md[Composite Metadata]
-- multiple, independently formatted metadata entries.
* https://github.com/rsocket/rsocket/blob/master/Routing.md[Routing] -- the route for a
request.
[[rsocket-java]]
=== Java Implementation
The https://github.com/rsocket/rsocket-java[Java implementation] for RSocket is built on
https://projectreactor.io/[Project Reactor]. The transports for TCP and WebSocket are
built on https://github.com/reactor/reactor-netty[Reactor Netty]. As a Reactive Streams
library, Reactor simplifies the job of implementing the protocol. For applications it is
a natural fit to use `Flux` and `Mono` with declarative operators and transparent back
pressure support.
The API in RSocket Java is intentionally minimal and basic. It focuses on protocol
features and leaves the application programming model (e.g. RPC codegen vs other) as a
higher level, independent concern.
The main contract
https://github.com/rsocket/rsocket-java/blob/master/rsocket-core/src/main/java/io/rsocket/RSocket.java[io.rsocket.RSocket]
models the four request interaction types with `Mono` representing a promise for a
single message, `Flux` a stream of messages, and `io.rsocket.Payload` the actual
message with access to data and metadata as byte buffers. The `RSocket` contract is used
symmetrically. For requesting, the application is given an `RSocket` to perform
requests with. For responding, the application implements `RSocket` to handle requests.
This is not meant to be a thorough introduction. For the most part, Spring applications
will not have to use its API directly. However it may be important to see or experiment
with RSocket independent of Spring. The RSocket Java repository contains a number of
https://github.com/rsocket/rsocket-java/tree/develop/rsocket-examples[sample apps] that
demonstrate its API and protocol features.
[[rsocket-spring]]
=== Spring Support
The `spring-messaging` module contains the following:
* <<rsocket-requester>> -- fluent API to make requests through an `io.rsocket.RSocket`
with data and metadata encoding/decoding.
* <<rsocket-annot-responders>> -- `@MessageMapping` annotated handler methods for responding.
The `spring-web` module contains `Encoder` and `Decoder` implementations such as Jackson
CBOR/JSON, and Protobuf that RSocket applications will likely need. It also contains the
`PathPatternParser` that can be plugged in for efficient route matching.
Spring Security...
Spring Boot...
[[rsocket-requester]]
== RSocketRequester
`RSocketRequester` provides a fluent API to perform RSocket requests, accepting and
returning objects for data and metadata instead of low level data buffers. It can be used
symmetrically, to make requests from clients and to make requests from servers.
[[rsocket-requester-client]]
=== Client Requester
To obtain an `RSocketRequester` on the client side requires connecting to a server along with
preparing and sending the initial RSocket `SETUP` frame. `RSocketRequester` provides a
builder for that. Internally uses RSocket Java's `RSocketFactory`.
This is the most basic way to connect with default settings:
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
.Java
----
Mono<RSocketRequester> mono = RSocketRequester.builder()
.connectTcp("localhost", 7000);
Mono<RSocketRequester> requesterMono = RSocketRequester.builder()
.connectWebSocket(URI.create("http://example.org:8080/rsocket"));
----
The above is deferred. To actually connect and use the requester:
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
.Java
----
// Connect asynchronously
RSocketRequester.builder().connectTcp("localhost", 7000)
.subscribe(requester -> {
// ...
});
// Or block
RSocketRequester requester = RSocketRequester.builder()
.connectTcp("localhost", 7000)
.block(Duration.ofSeconds(5));
----
[[rsocket-requester-client-setup]]
==== Connection Setup
`RSocketRequester.Builder` provides the following to customize the initial `SETUP` frame:
* `dataMimeType(MimeType)` -- set the mime type for data on the connection.
* `metadataMimeType(MimeType)` -- set the mime type for metadata on the connection.
* `setupData(Object)` -- data to include in the `SETUP`.
* `setupRoute(String, Object...)` -- route in the metadata to include in the `SETUP`.
* `setupMetadata(Object, MimeType)` -- other metadata to include in the `SETUP`.
For data, the default mime type is derived from the first configured `Decoder`. For
metadata, the default mime type is composite metadata which allows multiple metadata
value and mime type pairs per request. Typically both don't need to be changed.
Data and metadata in the `SETUP` frame is optional. On the server side,
a `@ConnectMapping` methods can be used to handle the start of a connection and the
content of the `SETUP` frame. Metadata may include connection level security info.
[[rsocket-requester-client-strategies]]
==== Strategies
`RSocketRequester.Builder` accepts `RSocketStrategies` to configure the requester.
You'll need to use this to provide encoders and decoders for (de)-serialization of data and
metadata values. By default only the basic codecs from `spring-core` for `String`,
`byte[]`, and `ByteBuffer` are registered. Adding `spring-web` provides access to more that
can be registered as follows:
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
.Java
----
RSocketStrategies strategies = RSocketStrategies.builder()
.encoders(encoders -> encoders.add(new Jackson2CborEncoder))
.decoder(decoders -> decoders.add(new Jackson2CborDecoder))
.build();
RSocketRequester.builder()
.rsocketStrategies(strategies)
.connectTcp("localhost", 7000);
----
`RSocketStrategies` is designed for re-use. In some scenarios, e.g. client and server in
the same application, it may be preferable to declare it in Spring configuration.
[[rsocket-requester-client-responder]]
==== Client Responders
`RSocketRequester.Builder` can be used to configure responders to requests from the
server.
You can use annotated handlers for client-side responding based on the same
infrastructure that's used on a server, but registered programmatically as follows:
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
.Java
----
RSocketStrategies strategies = RSocketStrategies.builder()
.routeMatcher(new PathPatternRouteMatcher()) <1>
.build();
ClientHandler handler = new ClientHandler(); <2>
RSocketRequester.builder()
.rsocketFactory(RSocketMessageHandler.clientResponder(strategies, handler)) <3>
.connectTcp("localhost", 7000);
----
<1> Use `PathPatternRouteMatcher`, if `spring-web` is present, for efficient
route matching.
<2> Create responder that contains `@MessageMaping` or `@ConnectMapping` methods.
<3> Use static factory method in `RSocketMessageHandler` to register one or more responders.
Note the above is only a shortcut designed for programmatic registration of client
responders. For alternative scenarios, where client responders are in Spring configuration,
you can still declare `RSocketMessageHandler` as a Spring bean and then apply as follows:
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
.Java
----
ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);
RSocketRequester.builder()
.rsocketFactory(factory -> factory.acceptor(handler.responder()))
.connectTcp("localhost", 7000);
----
For the above you may also need to use `setHandlerPredicate` in `RSocketMessageHandler` to
switch to a different strategy for detecting client responders, e.g. based on a custom
annotation such as `@RSocketClientResponder` vs the default `@Controller`. This
is necessary in scenarios with client and server, or multiple clients in the same
application.
See also <<rsocket-annot-responders>>, for more on the programming model.
[[rsocket-requester-client-advanced]]
==== Advanced
`RSocketRequesterBuilder` provides a callback to expose the underlying
`ClientRSocketFactory` from RSocket Java for further configuration options for
keepalive intervals, session resumption, interceptors, and more. You can configure options
at that level as follows:
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
.Java
----
RSocketRequester.builder()
.rsocketFactory(factory -> {
// ...
})
.connectTcp("localhost", 7000);
----
[[rsocket-requester-server]]
=== Server Requester
To make requests from a server to connected clients is a matter of obtaining the
requester for the connected client from the server.
In <<rsocket-annot-responders>>, `@ConnectMapping` and `@MessageMapping` methods support an
`RSocketRequester` argument. Use it to access the requester for the connection. Keep in
mind that `@ConnectMapping` methods are essentially handlers of the `SETUP` frame which
must be handled before requests can begin. Therefore, requests at the very start must be
decoupled from handling. For example:
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
.Java
----
@ConnectMapping
Mono<Void> handle(RSocketRequester requester) {
requester.route("status").data("5")
.retrieveFlux(StatusReport.class)
.subscribe(bar -> { <1>
// ...
});
return ... <2>
}
----
<1> Start the request asynchronously, independent from handling.
<2> Perform handling and return completion `Mono<Void>`.
[[rsocket-requester-requests]]
=== Requests
Once you have a <<rsocket-requester-client,client>> or
<<rsocket-requester-server,server>> requester, you can make requests as follows:
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
.Java
----
ViewBox box = ... ;
Flux<AirportLocation> locations =
requester.route("locate.radars.within") <1>
.data(viewBox) <2>
.retrieveFlux(AirportLocation.class); <3>
----
<1> Specify a route to include in the metadata of the request message.
<2> Provide data for the request message.
<3> Declare the expected response.
The interaction type is determined implicitly from the cardinality of the input and
output. The above example is a `Request-Stream` because one value is sent and a stream
of values is received. For the most part you don't need to think about this as long as the
choice of input and output matches an RSocket interaction type and the types of input and
output expected by the responder. The only example of an invalid combination is many-to-one.
The `data(Object)` method also accepts any Reactive Streams `Publisher`, including
`Flux` and `Mono`, as well as any other producer of value(s) that is registered in the
`ReactiveAdapterRegistry`. For a multi-value `Publisher` such as `Flux` which produces the
same types of values, consider using one of the overloaded `data` methods to avoid having
type checks and `Encoder` lookup on every element:
[source,java,indent=0]
[subs="verbatim,quotes"]
----
data(Object producer, Class<?> elementClass);
data(Object producer, ParameterizedTypeReference<?> elementTypeRef);
----
The `data(Object)` step is optional. Skip it for requests that don't send data:
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
.Java
----
Mono<AirportLocation> location =
requester.route("find.radar.EWR"))
.retrieveMono(AirportLocation.class);
----
Extra metadata values can be added if using composite metadata (the default) and if the
values are supported by a registered `Encoder`. For example:
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
.Java
----
String securityToken = ... ;
ViewBox box = ... ;
Flux<AirportLocation> locations =
requester.route("locate.radars.within")
.metadata(securityToken, "message/x.rsocket.authentication.bearer.v0")
.data(viewBox)
.retrieveFlux(AirportLocation.class);
----
For `Fire-and-Forget` use the `send()` method that returns `Mono<Void>`. Note that the `Mono`
indicates only that the message was successfully sent, and not that it was handled.
[[rsocket-annot-responders]]
== Annotated Responders
RSocket responders can be implemented as `@MessageMapping` and `@ConnectMapping` methods.
`@MessageMapping` methods handle individual requests, and `@ConnectMapping` methods handle
connection-level events (setup and metadata push). Annotated responders are supported
symmetrically, for responding from the server side and for responding from the client side.
[[rsocket-annot-responders-server]]
=== Server Responders
To use annotated responders on the server side, add `RSocketMessageHandler` to your Spring
configuration to detect `@Controller` beans with `@MessageMapping` and `@ConnectMapping`
methods:
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
.Java
----
@Configuration
static class ServerConfig {
@Bean
public RSocketMessageHandler rsocketMessageHandler() {
RSocketMessageHandler handler = new RSocketMessageHandler();
handler.routeMatcher(new PathPatternRouteMatcher());
return handler;
}
}
----
Then start an RSocket server through the Java RSocket API and plug the
`RSocketMessageHandler` for the responder as follows:
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
.Java
----
ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);
CloseableChannel server =
RSocketFactory.receive()
.acceptor(handler.responder())
.transport(TcpServerTransport.create("localhost", 7000))
.start()
.block();
----
`RSocketMessageHandler` supports the composite metadata and the routing metadata formats
by default. It can be configured with the <<rsocket-metadata-extractor>> to use if you
need to change that or register additional metadata mime types.
You'll need to set the `Encoder` and `Decoder` instances required for metadata and data
formats to support. You'll likely need the `spring-web` module for codec implementations.
By default `SimpleRouteMatcher` is used for matching routes via `AntPathMatcher`.
We recommend plugging in the `PathPatternRouteMatcher` from `spring-web` for
efficient route matching. RSocket routes can be hierarchical but are not URL paths.
Both route matchers are configured to use "." as separator by default and there is no URL
decoding as with HTTP URLs.
`RSocketMessageHandler` can be configured via `RSocketStrategies` which may be useful if
you need to share configuration between a client and a server in the same process:
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
.Java
----
@Configuration
static class ServerConfig {
@Bean
public RSocketMessageHandler rsocketMessageHandler() {
RSocketMessageHandler handler = new RSocketMessageHandler();
handler.setRSocketStrategies(rsocketStrategies());
return handler;
}
@Bean
public RSocketStrategies rsocketStrategies() {
retrun RSocketStrategies.builder()
.encoders(encoders -> encoders.add(new Jackson2CborEncoder))
.decoder(decoders -> decoders.add(new Jackson2CborDecoder))
.routeMatcher(new PathPatternRouteMatcher())
.build();
}
}
----
[[rsocket-annot-responders-client]]
=== Client Responders
Annotated responders on the client side need to be configured in the
`RSocketRequester.Builder`. For details, see
<<rsocket-requester-client-responder>>.
[[rsocket-annot-messagemapping]]
=== @MessageMapping
Once <<rsocket-annot-responders-server,server>> or
<<rsocket-annot-responders-client,client>> responder configuration is in place,
`@MessageMapping` methods can be used as follows:
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
.Java
----
@Controller
public class RadarsController {
@MessageMapping("locate.radars.within")
public Flux<AirportLocation> radars(MapRequest request) {
// ...
}
}
----
You don't need to explicit specify the RSocket interaction type. Simply declare the
expected input and output, and a route pattern. The supporting infrastructure will adapt
matching requests.
The following additional arguments are supported for `@MessageMapping` methods:
* `RSocketRequester` -- the requester for the connection associated with the request,
to make requests to the remote end.
* `@DestinationVariable` -- the value for a variable from the pattern, e.g.
`@MessageMapping("find.radar.{id}")`.
* `@Header` -- access to a metadata value registered for extraction, as described in
<<rsocket-metadata-extractor>>.
* `@Headers Map<String, Object>` -- access to all metadata values registered for
extraction, as described in <<rsocket-metadata-extractor>>.
[[rsocket-annot-connectmapping]]
=== @ConnectMapping
`@ConnectMapping` handles the `SETUP` frame at the start of an RSocket connection.
It can be mapped with a pattern, like an `@MessageMapping` method, and it supports the
same arguments as an `@MessageMapping` method but based on the content of the `SETUP`
frame.
`@ConnectMapping` methods also handle metadata push notifications through
the `METADATA_PUSH` frame, i.e. the `metadataPush(Payload)` in `io.rsocket.RSocket`.
[[rsocket-metadata-extractor]]
== MetadataExtractor
Responders must interpret metadata.
https://github.com/rsocket/rsocket/blob/master/Extensions/CompositeMetadata.md[Composite metadata]
allows independently formatted metadata values (e.g. for routing, security, tracing) each
with its own mime type. Applications need a way to configure metadata mime types to
support, and a way to access extracted values.
`MetadataExtractor` is a contract to take serialized metadata and return decoded
name-value pairs that can then be accessed like headers by name, for example via `@Header`
in annotated handler methods.
`DefaultMetadataExtractor` can be given `Decoder` instances to decode metadata. Out of
the box it has built-in support for routing metadata ("message/x.rsocket.routing.v0"),
which it decodes to `String` and saves under the "route" key. For any other mime type
you'll need to provide a `Decoder` and register the mime type as follows:
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
.Java
----
DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(metadataDecoders);
extractor.metadataToExtract(fooMimeType, Foo.class, "foo");
----
Composite metadata works well to combine independent metadata values. However the
requester might not support composite metadata, or may choose not to use it. For this,
`DefaultMetadataExtractor` may needs custom logic to map the decoded value to the output
map. Here is an example where JSON is used for metadata:
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
.Java
----
DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(metadataDecoders);
extractor.metadataToExtract(
MimeType.valueOf("application/vnd.myapp.metadata+json"),
new ParameterizedTypeReference<Map<String,String>>() {},
(jsonMap, outputMap) -> {
outputMap.putAll(jsonMap);
});
----
When configuring `MetadataExtractor` through `RSocketStrategies`, you can let
`RSocketStrategies.Builder` create the extractor with the configured decoders, and
simply use a callback to customize registrations as follows:
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
.Java
----
RSocketStrategies strategies = RSocketStrategies.builder()
.metadataExtractorRegistry(registry -> {
registry.metadataToExtract(fooMimeType, Foo.class, "foo");
// ...
})
.build();
----

8
src/docs/asciidoc/web-reactive.adoc

@ -23,7 +23,6 @@ include::web/webflux-websocket.adoc[leveloffset=+1] @@ -23,7 +23,6 @@ include::web/webflux-websocket.adoc[leveloffset=+1]
[[webflux-test]]
== Testing
[.small]#<<web.adoc#testing, Same in Spring MVC>>#
@ -39,12 +38,7 @@ server. You can use the `WebTestClient` for end-to-end integration tests, too. @@ -39,12 +38,7 @@ server. You can use the `WebTestClient` for end-to-end integration tests, too.
//[[webflux-threading-model]]
//=== Threading model
// TODO Once we have content for this heading, we should un-comment the heading and
// anchor. Until then, we should leave it commented.
include::rsocket.adoc[leveloffset=+1]

Loading…
Cancel
Save