[[webflux-websocket]]
= WebSockets
[.small]#xref:web/websocket.adoc[See equivalent in the Servlet stack]#

This part of the reference documentation covers support for reactive-stack WebSocket
messaging.

include::partial$web/websocket-intro.adoc[leveloffset=+1]

[[webflux-websocket-server]]
== WebSocket API
[.small]#xref:web/websocket/stomp/server-config.adoc[See equivalent in the Servlet stack]#

The Spring Framework provides a WebSocket API that you can use to write client- and
server-side applications that handle WebSocket messages.



[[webflux-websocket-server-handler]]
=== Server
[.small]#xref:web/websocket/server.adoc#websocket-server-handler[See equivalent in the Servlet stack]#

To create a WebSocket server, first create a `WebSocketHandler`.
The following example shows how to do so:

[tabs]
======
Java::
+
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
----
	import org.springframework.web.reactive.socket.WebSocketHandler;
	import org.springframework.web.reactive.socket.WebSocketSession;
	import reactor.core.publisher.Mono;

	public class MyWebSocketHandler implements WebSocketHandler {

		@Override
		public Mono<Void> handle(WebSocketSession session) {
			// ...
		}
	}
----

Kotlin::
+
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
----
	import org.springframework.web.reactive.socket.WebSocketHandler
	import org.springframework.web.reactive.socket.WebSocketSession
	import reactor.core.publisher.Mono

	class MyWebSocketHandler : WebSocketHandler {

		override fun handle(session: WebSocketSession): Mono<Void> {
			// ...
		}
	}
----
======

Then you can map it to a URL:

[tabs]
======
Java::
+
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
----
	@Configuration
	class WebConfig {

		@Bean
		public HandlerMapping handlerMapping() {
			Map<String, WebSocketHandler> map = new HashMap<>();
			map.put("/path", new MyWebSocketHandler());
			int order = -1; // before annotated controllers

			return new SimpleUrlHandlerMapping(map, order);
		}
	}
----

Kotlin::
+
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
----
	@Configuration
	class WebConfig {

		@Bean
		fun handlerMapping(): HandlerMapping {
			val map = mapOf("/path" to MyWebSocketHandler())
			val order = -1 // before annotated controllers

			return SimpleUrlHandlerMapping(map, order)
		}
	}
----
======

If you use the xref:web/webflux/dispatcher-handler.adoc#webflux-framework-config[WebFlux Config], there is nothing
further to do. Otherwise, you need to declare a `WebSocketHandlerAdapter`, as the
following example shows:

[tabs]
======
Java::
+
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
----
	@Configuration
	class WebConfig {

		// ...

		@Bean
		public WebSocketHandlerAdapter handlerAdapter() {
			return new WebSocketHandlerAdapter();
		}
	}
----

Kotlin::
+
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
----
	@Configuration
	class WebConfig {

		// ...

		@Bean
		fun handlerAdapter() = WebSocketHandlerAdapter()
	}
----
======

[[webflux-websockethandler]]
=== `WebSocketHandler`

The `handle` method of `WebSocketHandler` takes a `WebSocketSession` and returns a `Mono<Void>`
to indicate when application handling of the session is complete. The session is handled
through two streams, one for inbound and one for outbound messages. The following table
describes the two methods that handle the streams:

[options="header"]
|===
| `WebSocketSession` method | Description

| `Flux<WebSocketMessage> receive()`
| Provides access to the inbound message stream and completes when the connection is closed.

| `Mono<Void> send(Publisher<WebSocketMessage>)`
| Takes a source for outgoing messages, writes the messages, and returns a `Mono<Void>` that
  completes when the source completes and writing is done.

|===

A `WebSocketHandler` must compose the inbound and outbound streams into a unified flow and
return a `Mono<Void>` that reflects the completion of that flow. Depending on application
requirements, the unified flow completes when:

* Either the inbound or the outbound message stream completes.
* The inbound stream completes (that is, the connection closed), while the outbound stream is infinite.
* At a chosen point, through the `close` method of `WebSocketSession`.

When inbound and outbound message streams are composed together, there is no need to
check whether the connection is open, because Reactive Streams signals terminate activity
when it closes. The inbound stream receives a completion or error signal, and the
outbound stream receives a cancellation signal.
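
The signal-driven behavior described above can be modeled without Spring or Reactor. The
following is a minimal sketch that uses the JDK's `java.util.concurrent.Flow` types in
place of the inbound `Flux`. `InboundRecorder` and its `record` method are hypothetical
names introduced for illustration, and closing the `SubmissionPublisher` stands in for
the connection closing.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical, framework-free model: the subscriber learns that the stream has
// ended from the onComplete/onError signal instead of polling an "is open" flag.
class InboundRecorder implements Flow.Subscriber<String> {

    final List<String> received = new CopyOnWriteArrayList<>();
    final CountDownLatch terminated = new CountDownLatch(1);
    volatile boolean completed;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        subscription.request(Long.MAX_VALUE); // unbounded demand, for brevity
    }

    @Override
    public void onNext(String message) {
        received.add(message);
    }

    @Override
    public void onError(Throwable error) {
        terminated.countDown();
    }

    @Override
    public void onComplete() {
        completed = true;
        terminated.countDown();
    }

    // Publishes the messages, closes the publisher, and waits for the terminal signal.
    static InboundRecorder record(String... messages) {
        InboundRecorder recorder = new InboundRecorder();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(recorder);
            for (String message : messages) {
                publisher.submit(message);
            }
        } // close() issues onComplete to the subscriber
        try {
            if (!recorder.terminated.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("no terminal signal received");
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return recorder;
    }
}
```

Calling `InboundRecorder.record("a", "b")` returns a recorder whose `received` list holds
both messages and whose `completed` flag is set, without the subscriber ever asking
whether the source is still open.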

The most basic implementation of a handler is one that handles the inbound stream. The
following example shows such an implementation:

[tabs]
======
Java::
+
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
----
	class ExampleHandler implements WebSocketHandler {

		@Override
		public Mono<Void> handle(WebSocketSession session) {
			return session.receive() // <1>
					.doOnNext(message -> {
						// ... // <2>
					})
					.concatMap(message -> {
						// ... // <3>
					})
					.then(); // <4>
		}
	}
----
<1> Access the stream of inbound messages.
<2> Do something with each message.
<3> Perform nested asynchronous operations that use the message content.
<4> Return a `Mono<Void>` that completes when receiving completes.

Kotlin::
+
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
----
	class ExampleHandler : WebSocketHandler {

		override fun handle(session: WebSocketSession): Mono<Void> {
			return session.receive() // <1>
					.doOnNext {
						// ... // <2>
					}
					.concatMap {
						// ... // <3>
					}
					.then() // <4>
		}
	}
----
<1> Access the stream of inbound messages.
<2> Do something with each message.
<3> Perform nested asynchronous operations that use the message content.
<4> Return a `Mono<Void>` that completes when receiving completes.
======

TIP: For nested, asynchronous operations, you may need to call `message.retain()` on underlying
servers that use pooled data buffers (for example, Netty). Otherwise, the data buffer may be
released before you have had a chance to read the data. For more background, see
xref:core/databuffer-codec.adoc[Data Buffers and Codecs].

The following implementation combines the inbound and outbound streams:

[tabs]
======
Java::
+
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
----
	class ExampleHandler implements WebSocketHandler {

		@Override
		public Mono<Void> handle(WebSocketSession session) {

			Flux<WebSocketMessage> output = session.receive() // <1>
					.doOnNext(message -> {
						// ...
					})
					.concatMap(message -> {
						// ...
					})
					.map(value -> session.textMessage("Echo " + value)); // <2>

			return session.send(output); // <3>
		}
	}
----
<1> Handle the inbound message stream.
<2> Create the outbound message, producing a combined flow.
<3> Return a `Mono<Void>` that does not complete while we continue to receive.

Kotlin::
+
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
----
	class ExampleHandler : WebSocketHandler {

		override fun handle(session: WebSocketSession): Mono<Void> {

			val output = session.receive() // <1>
					.doOnNext {
						// ...
					}
					.concatMap {
						// ...
					}
					.map { session.textMessage("Echo $it") } // <2>

			return session.send(output) // <3>
		}
	}
----
<1> Handle the inbound message stream.
<2> Create the outbound message, producing a combined flow.
<3> Return a `Mono<Void>` that does not complete while we continue to receive.
======

Inbound and outbound streams can be independent and be joined only for completion,
as the following example shows:

[tabs]
======
Java::
+
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
----
	class ExampleHandler implements WebSocketHandler {

		@Override
		public Mono<Void> handle(WebSocketSession session) {

			Mono<Void> input = session.receive() // <1>
					.doOnNext(message -> {
						// ...
					})
					.concatMap(message -> {
						// ...
					})
					.then();

			Flux<String> source = ... ;
			Mono<Void> output = session.send(source.map(session::textMessage)); // <2>

			return Mono.zip(input, output).then(); // <3>
		}
	}
----
<1> Handle the inbound message stream.
<2> Send outgoing messages.
<3> Join the streams and return a `Mono<Void>` that completes when either stream ends.

Kotlin::
+
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
----
	class ExampleHandler : WebSocketHandler {

		override fun handle(session: WebSocketSession): Mono<Void> {

			val input = session.receive() // <1>
					.doOnNext {
						// ...
					}
					.concatMap {
						// ...
					}
					.then()

			val source: Flux<String> = ...
			val output = session.send(source.map(session::textMessage)) // <2>

			return Mono.zip(input, output).then() // <3>
		}
	}
----
<1> Handle the inbound message stream.
<2> Send outgoing messages.
<3> Join the streams and return a `Mono<Void>` that completes when either stream ends.
======

[[webflux-websocket-databuffer]]
=== `DataBuffer`

`DataBuffer` is the representation for a byte buffer in WebFlux. The Spring Core part of
the reference has more on that in the section on
xref:core/databuffer-codec.adoc[Data Buffers and Codecs]. The key point to understand is that on some
servers like Netty, byte buffers are pooled and reference counted, and must be released
when consumed to avoid memory leaks.

When running on Netty, applications must use `DataBufferUtils.retain(dataBuffer)` if they
wish to hold on to input data buffers, to ensure that the buffers are not released, and
subsequently use `DataBufferUtils.release(dataBuffer)` when the buffers are consumed.
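
The retain-then-release rule is easier to see in a small model. The following sketch is
framework-free and deliberately simplified: `CountedBuffer` is a hypothetical class, not
the Spring `DataBuffer` API, and it only assumes the usual reference-counting contract (a
buffer starts with one reference and returns to the pool when the count reaches zero).
An application that calls `retain()` before the framework's own `release()` can still
read the data afterward, and must then issue the final `release()` itself.

```java
// Hypothetical stand-in for a pooled, reference-counted buffer. It models the
// counting rules only and does not use any Spring or Netty type.
class CountedBuffer {

    private final byte[] bytes;
    private int refCount = 1; // the creator (for example, the server) holds one reference

    CountedBuffer(byte[] bytes) {
        this.bytes = bytes.clone();
    }

    // Another holder wants to keep the buffer alive: add a reference.
    synchronized CountedBuffer retain() {
        ensureLive();
        refCount++;
        return this;
    }

    // A holder is done with the buffer. Returns true when the last reference
    // is gone, which is the point where a real pool would reclaim the memory.
    synchronized boolean release() {
        ensureLive();
        refCount--;
        return refCount == 0;
    }

    synchronized byte read(int index) {
        ensureLive();
        return bytes[index];
    }

    private void ensureLive() {
        if (refCount == 0) {
            throw new IllegalStateException("buffer was already returned to the pool");
        }
    }
}
```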



[[webflux-websocket-server-handshake]]
=== Handshake
[.small]#xref:web/websocket/server.adoc#websocket-server-handshake[See equivalent in the Servlet stack]#

`WebSocketHandlerAdapter` delegates to a `WebSocketService`. By default, that is an instance
of `HandshakeWebSocketService`, which performs basic checks on the WebSocket request and
then uses `RequestUpgradeStrategy` for the server in use. Currently, there is built-in
support for Reactor Netty, Tomcat, Jetty, and Undertow.
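
For background on what an opening handshake involves at the protocol level, the following
sketch validates an upgrade request and computes the `Sec-WebSocket-Accept` response value
as defined in RFC 6455. It is not the implementation of `HandshakeWebSocketService`, whose
exact checks are not listed here, and the server engine normally completes the upgrade on
your behalf. `HandshakeSketch`, `rejectReason`, and `acceptKey` are hypothetical names, and
the sketch assumes that header names arrive in lower case.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import java.util.Locale;
import java.util.Map;

// Framework-free sketch of RFC 6455 opening-handshake rules (hypothetical names).
final class HandshakeSketch {

    // Fixed GUID that RFC 6455 appends to the client key.
    private static final String GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

    // Returns null when the request is a plausible WebSocket upgrade, else a reason.
    static String rejectReason(String method, Map<String, String> headers) {
        if (!"GET".equals(method)) {
            return "method must be GET";
        }
        if (!"websocket".equalsIgnoreCase(headers.get("upgrade"))) {
            return "missing 'Upgrade: websocket' header";
        }
        String connection = headers.getOrDefault("connection", "");
        if (!connection.toLowerCase(Locale.ROOT).contains("upgrade")) {
            return "missing 'Connection: Upgrade' header";
        }
        if (headers.get("sec-websocket-key") == null) {
            return "missing 'Sec-WebSocket-Key' header";
        }
        return null;
    }

    // Base64(SHA-1(key + GUID)), the value returned in Sec-WebSocket-Accept.
    static String acceptKey(String secWebSocketKey) {
        try {
            MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
            byte[] digest = sha1.digest((secWebSocketKey + GUID).getBytes(StandardCharsets.US_ASCII));
            return Base64.getEncoder().encodeToString(digest);
        }
        catch (NoSuchAlgorithmException ex) {
            throw new IllegalStateException("SHA-1 is not available", ex);
        }
    }
}
```

With the sample key from RFC 6455, `acceptKey("dGhlIHNhbXBsZSBub25jZQ==")` yields
`s3pPLMBiTxaQ9kYGzzhZRbK+xOo=`.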

`HandshakeWebSocketService` exposes a `sessionAttributePredicate` property that allows
setting a `Predicate<String>` to extract attributes from the `WebSession` and insert them
into the attributes of the `WebSocketSession`.
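
The effect of such a predicate can be pictured as a filter over attribute names. The
following is a minimal sketch under that assumption: plain maps stand in for the
`WebSession` and `WebSocketSession` attribute maps, and `SessionAttributeCopy` with its
`select` method is a hypothetical helper, not part of Spring. For example, the predicate
`name -> name.startsWith("user")` would carry over `user` and `userLocale` but not `csrf`.

```java
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical helper that models the copy step with plain maps.
final class SessionAttributeCopy {

    // Keeps only the attributes whose name is accepted by the predicate.
    static Map<String, Object> select(Map<String, Object> webSessionAttributes,
            Predicate<String> namePredicate) {

        return webSessionAttributes.entrySet().stream()
                .filter(entry -> namePredicate.test(entry.getKey()))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }
}
```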



[[webflux-websocket-server-config]]
=== Server Configuration
[.small]#xref:web/websocket/server.adoc#websocket-server-runtime-configuration[See equivalent in the Servlet stack]#

The `RequestUpgradeStrategy` for each server exposes configuration specific to the
underlying WebSocket server engine. When using the WebFlux Java config, you can customize
such properties as shown in the corresponding section of the
xref:web/webflux/config.adoc#webflux-config-websocket-service[WebFlux Config]. Otherwise, if you
do not use the WebFlux config, use the following:

[tabs]
======
Java::
+
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
----
	@Configuration
	class WebConfig {

		@Bean
		public WebSocketHandlerAdapter handlerAdapter() {
			return new WebSocketHandlerAdapter(webSocketService());
		}

		@Bean
		public WebSocketService webSocketService() {
			TomcatRequestUpgradeStrategy strategy = new TomcatRequestUpgradeStrategy();
			strategy.setMaxSessionIdleTimeout(0L);
			return new HandshakeWebSocketService(strategy);
		}
	}
----

Kotlin::
+
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
----
	@Configuration
	class WebConfig {

		@Bean
		fun handlerAdapter() =
				WebSocketHandlerAdapter(webSocketService())

		@Bean
		fun webSocketService(): WebSocketService {
			val strategy = TomcatRequestUpgradeStrategy().apply {
				setMaxSessionIdleTimeout(0L)
			}
			return HandshakeWebSocketService(strategy)
		}
	}
----
======

Check the upgrade strategy for your server to see what options are available. Currently,
only Tomcat and Jetty expose such options.



[[webflux-websocket-server-cors]]
=== CORS
[.small]#xref:web/websocket/server.adoc#websocket-server-allowed-origins[See equivalent in the Servlet stack]#

The easiest way to configure CORS and restrict access to a WebSocket endpoint is to
have your `WebSocketHandler` implement `CorsConfigurationSource` and return a
`CorsConfiguration` with allowed origins, headers, and other details. If you cannot do
that, you can also set the `corsConfigurations` property on the `SimpleUrlHandlerMapping` to
specify CORS settings by URL pattern. If both are specified, they are combined by using the
`combine` method on `CorsConfiguration`.
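
To make "restrict access by allowed origins" concrete, the following framework-free
sketch checks an `Origin` header value against an allow-list. It is only a model:
`OriginAllowList` is a hypothetical class, the treatment of `*` and of a missing `Origin`
header are simplifying assumptions, and in a Spring application the decision is made from
the `CorsConfiguration` described above, including the result of `combine`, which this
sketch does not attempt to reproduce.

```java
import java.util.Set;

// Hypothetical model of an origin allow-list; not a Spring type.
final class OriginAllowList {

    private final Set<String> allowedOrigins;

    OriginAllowList(Set<String> allowedOrigins) {
        this.allowedOrigins = Set.copyOf(allowedOrigins);
    }

    // Assumption: a request without an Origin header is not cross-origin and passes.
    // Assumption: "*" in the allow-list accepts every origin.
    boolean permits(String originHeader) {
        if (originHeader == null) {
            return true;
        }
        return allowedOrigins.contains("*") || allowedOrigins.contains(originHeader);
    }
}
```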



[[webflux-websocket-client]]
=== Client

Spring WebFlux provides a `WebSocketClient` abstraction with implementations for
Reactor Netty, Tomcat, Jetty, Undertow, and standard Java (that is, JSR-356).

NOTE: The Tomcat client is effectively an extension of the standard Java one with some extra
functionality in the `WebSocketSession` handling to take advantage of the Tomcat-specific
API to suspend receiving messages for back pressure.

To start a WebSocket session, you can create an instance of the client and use its `execute`
methods:

[tabs]
======
Java::
+
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
----
	WebSocketClient client = new ReactorNettyWebSocketClient();

	URI url = new URI("ws://localhost:8080/path");
	client.execute(url, session ->
			session.receive()
					.doOnNext(System.out::println)
					.then());
----

Kotlin::
+
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
----
	val client = ReactorNettyWebSocketClient()

	val url = URI("ws://localhost:8080/path")
	client.execute(url) { session ->
			session.receive()
					.doOnNext(::println)
					.then()
	}
----
======

Some clients, such as Jetty, implement `Lifecycle` and need to be started before you can
use them and stopped when they are no longer needed. All clients have constructor options
related to configuration of the underlying WebSocket client.