diff --git a/src/docs/asciidoc/rsocket.adoc b/src/docs/asciidoc/rsocket.adoc new file mode 100644 index 0000000000..844936c229 --- /dev/null +++ b/src/docs/asciidoc/rsocket.adoc @@ -0,0 +1,628 @@ +[[rsocket]] += RSocket + + + + +[[rsocket-overview]] +== Overview + +RSocket is an application protocol for multiplexed, duplex communication over TCP, +WebSocket, and other byte stream transports, using one of the following interaction +models: + +* `Request-Response` -- send one message and receive one back. +* `Request-Stream` -- send one message and receive a stream of messages back. +* `Channel` -- send streams of messages in both directions. +* `Fire-and-Forget` -- send a one-way message. + +Once the initial connection is made, the "client" vs "server" distinction is lost as +both sides become symmetrical and each side can initiate one of the above interactions. +This is why in the protocol calls the participating sides "requester" and "responder" +while the above interactions are called "requests" or "streams". + +Below are key features built into the protocol: + +* https://www.reactive-streams.org/[Reactive Streams] semantics across network boundary -- +for streaming requests such as `Request-Stream` and `Channel`, back pressure signals +travel between requester and responder, allowing a requester to slow down a responder at +the source, hence reducing reliance on network layer congestion control, and the need +for buffering at the network or any level. +* Request throttling -- this feature is named "leasing" after the `LEASE` frame that +can be sent from each end to limit the total number of requests allowed by other end +for a given time. Leases are renewed periodically. +* Session resumption -- this is designed for loss of connectivity and requires some state +to be maintained. The state management is transparent for applications, and works well +in combination with back pressure which can stop a producer when possible and reduce +the amount of state required. +* Fragmentation and re-assembly of large messages. +* Keepalive (heartbeats). + +RSocket has https://github.com/rsocket[implementations] in multiple languages. The +https://github.com/rsocket/rsocket-java[Java library] is built on +https://projectreactor.io/[Project Reactor], and Reactor Netty for the transport. +That means signals from Reactive Streams Publishers in your application propagate +transparently through RSocket across the network. + + + +[[rsocket-protocol]] +=== The Protocol + +One of the benefits of RSocket is that it has a well defined behavior on the wire and an +easy to read http://rsocket.io/docs/Protocol[specification] along with some protocol +https://github.com/rsocket/rsocket/tree/master/Extensions[extensions]. Therefore it is +a good idea to read the spec, independent of language implementations and higher level +framework APIs. This section provides a succinct overview to establish some context. + +**Connecting** + +Initially a client connects to a server via some low level streaming transport such +as TCP or WebSocket and sends a `SETUP` frame to the server to define parameters for the +connection. + +The server may reject the `SETUP` frame, but generally after it is sent (for the client) +and received (for the server), both sides can begin to make requests, unless `SETUP` +indicates use of leasing semantics to limit the number of requests, in which case +both sides must wait for a `LEASE` frame from the other end to permit making requests. + +**Making Requests** + +Once a connection is established, both sides may initiate a request through one of the +frames `REQUEST_RESPONSE`, `REQUEST_STREAM`, `REQUEST_CHANNEL`, or `REQUEST_FNF`. Each of +those frames carries one message from the requester to the responder. + +The responder may then return `PAYLOAD` frames with response messages, and in the case +of `REQUEST_CHANNEL` the requester may also send `PAYLOAD` frames with more request +messages. + +When a request involves a stream of messages such as as `Request-Stream` and `Channel`, +the responder must respect demand signals from the requester. Demand is expressed as a +number of messages. Initial demand is specified in `REQUEST_STREAM` and +`REQUEST_CHANNEL` frames. Subsequent demand is signaled via `REQUEST_N` frames. + +Each side may also send metadata notifications, via the `METADATA_PUSH` frame, that do not +pertain to any individual request but rather to the connection as a whole. + +**Message Format** + +RSocket messages contain data and metadata. Metadata can be used to send a route, a +security token, etc. Data and metadata can be formatted differently. Mime types for each +are declared in the `SETUP` frame and apply to all requests on a given connection. + +While all messages can have metadata, typically metadata such as a route are per-request +and therefore only included in the first message on a request, i.e. with one of the frames +`REQUEST_RESPONSE`, `REQUEST_STREAM`, `REQUEST_CHANNEL`, or `REQUEST_FNF`. + +Protocol extensions define common metadata formats for use in applications: + +* https://github.com/rsocket/rsocket/blob/master/Extensions/CompositeMetadata.md[Composite Metadata] +-- multiple, independently formatted metadata entries. +* https://github.com/rsocket/rsocket/blob/master/Routing.md[Routing] -- the route for a +request. + + + +[[rsocket-java]] +=== Java Implementation + +The https://github.com/rsocket/rsocket-java[Java implementation] for RSocket is built on +https://projectreactor.io/[Project Reactor]. The transports for TCP and WebSocket are +built on https://github.com/reactor/reactor-netty[Reactor Netty]. As a Reactive Streams +library, Reactor simplifies the job of implementing the protocol. For applications it is +a natural fit to use `Flux` and `Mono` with declarative operators and transparent back +pressure support. + +The API in RSocket Java is intentionally minimal and basic. It focuses on protocol +features and leaves the application programming model (e.g. RPC codegen vs other) as a +higher level, independent concern. + +The main contract +https://github.com/rsocket/rsocket-java/blob/master/rsocket-core/src/main/java/io/rsocket/RSocket.java[io.rsocket.RSocket] +models the four request interaction types with `Mono` representing a promise for a +single message, `Flux` a stream of messages, and `io.rsocket.Payload` the actual +message with access to data and metadata as byte buffers. The `RSocket` contract is used +symmetrically. For requesting, the application is given an `RSocket` to perform +requests with. For responding, the application implements `RSocket` to handle requests. + +This is not meant to be a thorough introduction. For the most part, Spring applications +will not have to use its API directly. However it may be important to see or experiment +with RSocket independent of Spring. The RSocket Java repository contains a number of +https://github.com/rsocket/rsocket-java/tree/develop/rsocket-examples[sample apps] that +demonstrate its API and protocol features. + + + +[[rsocket-spring]] +=== Spring Support + +The `spring-messaging` module contains the following: + +* <> -- fluent API to make requests through an `io.rsocket.RSocket` +with data and metadata encoding/decoding. +* <> -- `@MessageMapping` annotated handler methods for responding. + +The `spring-web` module contains `Encoder` and `Decoder` implementations such as Jackson +CBOR/JSON, and Protobuf that RSocket applications will likely need. It also contains the +`PathPatternParser` that can be plugged in for efficient route matching. + +Spring Security... + +Spring Boot... + + + + +[[rsocket-requester]] +== RSocketRequester + +`RSocketRequester` provides a fluent API to perform RSocket requests, accepting and +returning objects for data and metadata instead of low level data buffers. It can be used +symmetrically, to make requests from clients and to make requests from servers. + + +[[rsocket-requester-client]] +=== Client Requester + +To obtain an `RSocketRequester` on the client side requires connecting to a server along with +preparing and sending the initial RSocket `SETUP` frame. `RSocketRequester` provides a +builder for that. Internally uses RSocket Java's `RSocketFactory`. + +This is the most basic way to connect with default settings: + +[source,java,indent=0,subs="verbatim,quotes",role="primary"] +.Java +---- + Mono mono = RSocketRequester.builder() + .connectTcp("localhost", 7000); + + Mono requesterMono = RSocketRequester.builder() + .connectWebSocket(URI.create("http://example.org:8080/rsocket")); +---- + +The above is deferred. To actually connect and use the requester: + +[source,java,indent=0,subs="verbatim,quotes",role="primary"] +.Java +---- + // Connect asynchronously + RSocketRequester.builder().connectTcp("localhost", 7000) + .subscribe(requester -> { + // ... + }); + + // Or block + RSocketRequester requester = RSocketRequester.builder() + .connectTcp("localhost", 7000) + .block(Duration.ofSeconds(5)); +---- + + +[[rsocket-requester-client-setup]] +==== Connection Setup + +`RSocketRequester.Builder` provides the following to customize the initial `SETUP` frame: + +* `dataMimeType(MimeType)` -- set the mime type for data on the connection. +* `metadataMimeType(MimeType)` -- set the mime type for metadata on the connection. +* `setupData(Object)` -- data to include in the `SETUP`. +* `setupRoute(String, Object...)` -- route in the metadata to include in the `SETUP`. +* `setupMetadata(Object, MimeType)` -- other metadata to include in the `SETUP`. + +For data, the default mime type is derived from the first configured `Decoder`. For +metadata, the default mime type is composite metadata which allows multiple metadata +value and mime type pairs per request. Typically both don't need to be changed. + +Data and metadata in the `SETUP` frame is optional. On the server side, +a `@ConnectMapping` methods can be used to handle the start of a connection and the +content of the `SETUP` frame. Metadata may include connection level security info. + + +[[rsocket-requester-client-strategies]] +==== Strategies + +`RSocketRequester.Builder` accepts `RSocketStrategies` to configure the requester. +You'll need to use this to provide encoders and decoders for (de)-serialization of data and +metadata values. By default only the basic codecs from `spring-core` for `String`, +`byte[]`, and `ByteBuffer` are registered. Adding `spring-web` provides access to more that +can be registered as follows: + +[source,java,indent=0,subs="verbatim,quotes",role="primary"] +.Java +---- + RSocketStrategies strategies = RSocketStrategies.builder() + .encoders(encoders -> encoders.add(new Jackson2CborEncoder)) + .decoder(decoders -> decoders.add(new Jackson2CborDecoder)) + .build(); + + RSocketRequester.builder() + .rsocketStrategies(strategies) + .connectTcp("localhost", 7000); +---- + +`RSocketStrategies` is designed for re-use. In some scenarios, e.g. client and server in +the same application, it may be preferable to declare it in Spring configuration. + + +[[rsocket-requester-client-responder]] +==== Client Responders + +`RSocketRequester.Builder` can be used to configure responders to requests from the +server. + +You can use annotated handlers for client-side responding based on the same +infrastructure that's used on a server, but registered programmatically as follows: + +[source,java,indent=0,subs="verbatim,quotes",role="primary"] +.Java +---- + RSocketStrategies strategies = RSocketStrategies.builder() + .routeMatcher(new PathPatternRouteMatcher()) <1> + .build(); + + ClientHandler handler = new ClientHandler(); <2> + + RSocketRequester.builder() + .rsocketFactory(RSocketMessageHandler.clientResponder(strategies, handler)) <3> + .connectTcp("localhost", 7000); +---- + +<1> Use `PathPatternRouteMatcher`, if `spring-web` is present, for efficient + route matching. +<2> Create responder that contains `@MessageMaping` or `@ConnectMapping` methods. +<3> Use static factory method in `RSocketMessageHandler` to register one or more responders. + +Note the above is only a shortcut designed for programmatic registration of client +responders. For alternative scenarios, where client responders are in Spring configuration, +you can still declare `RSocketMessageHandler` as a Spring bean and then apply as follows: + +[source,java,indent=0,subs="verbatim,quotes",role="primary"] +.Java +---- + ApplicationContext context = ... ; + RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class); + + RSocketRequester.builder() + .rsocketFactory(factory -> factory.acceptor(handler.responder())) + .connectTcp("localhost", 7000); +---- + +For the above you may also need to use `setHandlerPredicate` in `RSocketMessageHandler` to +switch to a different strategy for detecting client responders, e.g. based on a custom +annotation such as `@RSocketClientResponder` vs the default `@Controller`. This +is necessary in scenarios with client and server, or multiple clients in the same +application. + +See also <>, for more on the programming model. + + +[[rsocket-requester-client-advanced]] +==== Advanced + +`RSocketRequesterBuilder` provides a callback to expose the underlying +`ClientRSocketFactory` from RSocket Java for further configuration options for +keepalive intervals, session resumption, interceptors, and more. You can configure options +at that level as follows: + +[source,java,indent=0,subs="verbatim,quotes",role="primary"] +.Java +---- + RSocketRequester.builder() + .rsocketFactory(factory -> { + // ... + }) + .connectTcp("localhost", 7000); +---- + + +[[rsocket-requester-server]] +=== Server Requester + +To make requests from a server to connected clients is a matter of obtaining the +requester for the connected client from the server. + +In <>, `@ConnectMapping` and `@MessageMapping` methods support an +`RSocketRequester` argument. Use it to access the requester for the connection. Keep in +mind that `@ConnectMapping` methods are essentially handlers of the `SETUP` frame which +must be handled before requests can begin. Therefore, requests at the very start must be +decoupled from handling. For example: + +[source,java,indent=0,subs="verbatim,quotes",role="primary"] +.Java +---- + @ConnectMapping + Mono handle(RSocketRequester requester) { + requester.route("status").data("5") + .retrieveFlux(StatusReport.class) + .subscribe(bar -> { <1> + // ... + }); + return ... <2> + } +---- + +<1> Start the request asynchronously, independent from handling. +<2> Perform handling and return completion `Mono`. + + + +[[rsocket-requester-requests]] +=== Requests + +Once you have a <> or +<> requester, you can make requests as follows: + +[source,java,indent=0,subs="verbatim,quotes",role="primary"] +.Java +---- + ViewBox box = ... ; + + Flux locations = + requester.route("locate.radars.within") <1> + .data(viewBox) <2> + .retrieveFlux(AirportLocation.class); <3> + +---- + +<1> Specify a route to include in the metadata of the request message. +<2> Provide data for the request message. +<3> Declare the expected response. + +The interaction type is determined implicitly from the cardinality of the input and +output. The above example is a `Request-Stream` because one value is sent and a stream +of values is received. For the most part you don't need to think about this as long as the +choice of input and output matches an RSocket interaction type and the types of input and +output expected by the responder. The only example of an invalid combination is many-to-one. + +The `data(Object)` method also accepts any Reactive Streams `Publisher`, including +`Flux` and `Mono`, as well as any other producer of value(s) that is registered in the +`ReactiveAdapterRegistry`. For a multi-value `Publisher` such as `Flux` which produces the +same types of values, consider using one of the overloaded `data` methods to avoid having +type checks and `Encoder` lookup on every element: + +[source,java,indent=0] +[subs="verbatim,quotes"] +---- +data(Object producer, Class elementClass); +data(Object producer, ParameterizedTypeReference elementTypeRef); +---- + +The `data(Object)` step is optional. Skip it for requests that don't send data: + +[source,java,indent=0,subs="verbatim,quotes",role="primary"] +.Java +---- + + Mono location = + requester.route("find.radar.EWR")) + .retrieveMono(AirportLocation.class); +---- + +Extra metadata values can be added if using composite metadata (the default) and if the +values are supported by a registered `Encoder`. For example: + +[source,java,indent=0,subs="verbatim,quotes",role="primary"] +.Java +---- + String securityToken = ... ; + ViewBox box = ... ; + + Flux locations = + requester.route("locate.radars.within") + .metadata(securityToken, "message/x.rsocket.authentication.bearer.v0") + .data(viewBox) + .retrieveFlux(AirportLocation.class); + +---- + +For `Fire-and-Forget` use the `send()` method that returns `Mono`. Note that the `Mono` +indicates only that the message was successfully sent, and not that it was handled. + + + +[[rsocket-annot-responders]] +== Annotated Responders + +RSocket responders can be implemented as `@MessageMapping` and `@ConnectMapping` methods. +`@MessageMapping` methods handle individual requests, and `@ConnectMapping` methods handle +connection-level events (setup and metadata push). Annotated responders are supported +symmetrically, for responding from the server side and for responding from the client side. + + + +[[rsocket-annot-responders-server]] +=== Server Responders + +To use annotated responders on the server side, add `RSocketMessageHandler` to your Spring +configuration to detect `@Controller` beans with `@MessageMapping` and `@ConnectMapping` +methods: + +[source,java,indent=0,subs="verbatim,quotes",role="primary"] +.Java +---- + @Configuration + static class ServerConfig { + + @Bean + public RSocketMessageHandler rsocketMessageHandler() { + RSocketMessageHandler handler = new RSocketMessageHandler(); + handler.routeMatcher(new PathPatternRouteMatcher()); + return handler; + } + } +---- + +Then start an RSocket server through the Java RSocket API and plug the +`RSocketMessageHandler` for the responder as follows: + +[source,java,indent=0,subs="verbatim,quotes",role="primary"] +.Java +---- + ApplicationContext context = ... ; + RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class); + + CloseableChannel server = + RSocketFactory.receive() + .acceptor(handler.responder()) + .transport(TcpServerTransport.create("localhost", 7000)) + .start() + .block(); +---- + +`RSocketMessageHandler` supports the composite metadata and the routing metadata formats +by default. It can be configured with the <> to use if you +need to change that or register additional metadata mime types. + +You'll need to set the `Encoder` and `Decoder` instances required for metadata and data +formats to support. You'll likely need the `spring-web` module for codec implementations. + +By default `SimpleRouteMatcher` is used for matching routes via `AntPathMatcher`. +We recommend plugging in the `PathPatternRouteMatcher` from `spring-web` for +efficient route matching. RSocket routes can be hierarchical but are not URL paths. +Both route matchers are configured to use "." as separator by default and there is no URL +decoding as with HTTP URLs. + +`RSocketMessageHandler` can be configured via `RSocketStrategies` which may be useful if +you need to share configuration between a client and a server in the same process: + +[source,java,indent=0,subs="verbatim,quotes",role="primary"] +.Java +---- + @Configuration + static class ServerConfig { + + @Bean + public RSocketMessageHandler rsocketMessageHandler() { + RSocketMessageHandler handler = new RSocketMessageHandler(); + handler.setRSocketStrategies(rsocketStrategies()); + return handler; + } + + @Bean + public RSocketStrategies rsocketStrategies() { + retrun RSocketStrategies.builder() + .encoders(encoders -> encoders.add(new Jackson2CborEncoder)) + .decoder(decoders -> decoders.add(new Jackson2CborDecoder)) + .routeMatcher(new PathPatternRouteMatcher()) + .build(); + } + } +---- + + + +[[rsocket-annot-responders-client]] +=== Client Responders + +Annotated responders on the client side need to be configured in the +`RSocketRequester.Builder`. For details, see +<>. + + + +[[rsocket-annot-messagemapping]] +=== @MessageMapping + +Once <> or +<> responder configuration is in place, +`@MessageMapping` methods can be used as follows: + +[source,java,indent=0,subs="verbatim,quotes",role="primary"] +.Java +---- + @Controller + public class RadarsController { + + @MessageMapping("locate.radars.within") + public Flux radars(MapRequest request) { + // ... + } + } +---- + +You don't need to explicit specify the RSocket interaction type. Simply declare the +expected input and output, and a route pattern. The supporting infrastructure will adapt +matching requests. + +The following additional arguments are supported for `@MessageMapping` methods: + +* `RSocketRequester` -- the requester for the connection associated with the request, + to make requests to the remote end. +* `@DestinationVariable` -- the value for a variable from the pattern, e.g. + `@MessageMapping("find.radar.{id}")`. +* `@Header` -- access to a metadata value registered for extraction, as described in + <>. +* `@Headers Map` -- access to all metadata values registered for + extraction, as described in <>. + + + +[[rsocket-annot-connectmapping]] +=== @ConnectMapping + +`@ConnectMapping` handles the `SETUP` frame at the start of an RSocket connection. +It can be mapped with a pattern, like an `@MessageMapping` method, and it supports the +same arguments as an `@MessageMapping` method but based on the content of the `SETUP` +frame. + +`@ConnectMapping` methods also handle metadata push notifications through +the `METADATA_PUSH` frame, i.e. the `metadataPush(Payload)` in `io.rsocket.RSocket`. + + + +[[rsocket-metadata-extractor]] +== MetadataExtractor + +Responders must interpret metadata. +https://github.com/rsocket/rsocket/blob/master/Extensions/CompositeMetadata.md[Composite metadata] +allows independently formatted metadata values (e.g. for routing, security, tracing) each +with its own mime type. Applications need a way to configure metadata mime types to +support, and a way to access extracted values. + +`MetadataExtractor` is a contract to take serialized metadata and return decoded +name-value pairs that can then be accessed like headers by name, for example via `@Header` +in annotated handler methods. + +`DefaultMetadataExtractor` can be given `Decoder` instances to decode metadata. Out of +the box it has built-in support for routing metadata ("message/x.rsocket.routing.v0"), +which it decodes to `String` and saves under the "route" key. For any other mime type +you'll need to provide a `Decoder` and register the mime type as follows: + +[source,java,indent=0,subs="verbatim,quotes",role="primary"] +.Java +---- + DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(metadataDecoders); + extractor.metadataToExtract(fooMimeType, Foo.class, "foo"); +---- + +Composite metadata works well to combine independent metadata values. However the +requester might not support composite metadata, or may choose not to use it. For this, +`DefaultMetadataExtractor` may needs custom logic to map the decoded value to the output +map. Here is an example where JSON is used for metadata: + +[source,java,indent=0,subs="verbatim,quotes",role="primary"] +.Java +---- + DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(metadataDecoders); + extractor.metadataToExtract( + MimeType.valueOf("application/vnd.myapp.metadata+json"), + new ParameterizedTypeReference>() {}, + (jsonMap, outputMap) -> { + outputMap.putAll(jsonMap); + }); +---- + +When configuring `MetadataExtractor` through `RSocketStrategies`, you can let +`RSocketStrategies.Builder` create the extractor with the configured decoders, and +simply use a callback to customize registrations as follows: + +[source,java,indent=0,subs="verbatim,quotes",role="primary"] +.Java +---- + RSocketStrategies strategies = RSocketStrategies.builder() + .metadataExtractorRegistry(registry -> { + registry.metadataToExtract(fooMimeType, Foo.class, "foo"); + // ... + }) + .build(); +---- diff --git a/src/docs/asciidoc/web-reactive.adoc b/src/docs/asciidoc/web-reactive.adoc index 54f643671b..3e7df852a8 100644 --- a/src/docs/asciidoc/web-reactive.adoc +++ b/src/docs/asciidoc/web-reactive.adoc @@ -23,7 +23,6 @@ include::web/webflux-websocket.adoc[leveloffset=+1] - [[webflux-test]] == Testing [.small]#<># @@ -39,12 +38,7 @@ server. You can use the `WebTestClient` for end-to-end integration tests, too. - -//[[webflux-threading-model]] -//=== Threading model -// TODO Once we have content for this heading, we should un-comment the heading and -// anchor. Until then, we should leave it commented. - +include::rsocket.adoc[leveloffset=+1]