You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
461 lines
13 KiB
461 lines
13 KiB
7 years ago
|
[[webflux-websocket]]
|
||
|
= WebSockets
|
||
2 years ago
|
[.small]#<<web.adoc#websocket, See equivalent in the Servlet stack>>#
|
||
7 years ago
|
|
||
7 years ago
|
This part of the reference documentation covers support for reactive-stack WebSocket
|
||
7 years ago
|
messaging.
|
||
|
|
||
|
|
||
|
|
||
|
|
||
6 years ago
|
|
||
7 years ago
|
[[webflux-websocket-server]]
|
||
|
== WebSocket API
|
||
2 years ago
|
[.small]#<<web.adoc#websocket-server, See equivalent in the Servlet stack>>#
|
||
7 years ago
|
|
||
7 years ago
|
The Spring Framework provides a WebSocket API that you can use to write client- and
|
||
|
server-side applications that handle WebSocket messages.
|
||
7 years ago
|
|
||
|
|
||
|
|
||
|
[[webflux-websocket-server-handler]]
|
||
7 years ago
|
=== Server
|
||
2 years ago
|
[.small]#<<web.adoc#websocket-server-handler, See equivalent in the Servlet stack>>#
|
||
7 years ago
|
|
||
7 years ago
|
To create a WebSocket server, you can first create a `WebSocketHandler`.
|
||
|
The following example shows how to do so:
|
||
7 years ago
|
|
||
6 years ago
|
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
|
||
|
.Java
|
||
7 years ago
|
----
|
||
7 years ago
|
import org.springframework.web.reactive.socket.WebSocketHandler;
|
||
|
import org.springframework.web.reactive.socket.WebSocketSession;
|
||
7 years ago
|
|
||
7 years ago
|
public class MyWebSocketHandler implements WebSocketHandler {
|
||
7 years ago
|
|
||
7 years ago
|
@Override
|
||
|
public Mono<Void> handle(WebSocketSession session) {
|
||
|
// ...
|
||
|
}
|
||
7 years ago
|
}
|
||
|
----
|
||
6 years ago
|
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
|
||
|
.Kotlin
|
||
|
----
|
||
|
import org.springframework.web.reactive.socket.WebSocketHandler
|
||
|
import org.springframework.web.reactive.socket.WebSocketSession
|
||
|
|
||
|
class MyWebSocketHandler : WebSocketHandler {
|
||
|
|
||
|
override fun handle(session: WebSocketSession): Mono<Void> {
|
||
|
// ...
|
||
|
}
|
||
|
}
|
||
|
----
|
||
7 years ago
|
|
||
5 years ago
|
Then you can map it to a URL:
|
||
7 years ago
|
|
||
6 years ago
|
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
|
||
|
.Java
|
||
7 years ago
|
----
|
||
7 years ago
|
@Configuration
|
||
6 years ago
|
class WebConfig {
|
||
7 years ago
|
|
||
|
@Bean
|
||
|
public HandlerMapping handlerMapping() {
|
||
|
Map<String, WebSocketHandler> map = new HashMap<>();
|
||
|
map.put("/path", new MyWebSocketHandler());
|
||
6 years ago
|
int order = -1; // before annotated controllers
|
||
7 years ago
|
|
||
6 years ago
|
return new SimpleUrlHandlerMapping(map, order);
|
||
7 years ago
|
}
|
||
7 years ago
|
}
|
||
|
----
|
||
6 years ago
|
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
|
||
|
.Kotlin
|
||
|
----
|
||
|
@Configuration
|
||
|
class WebConfig {
|
||
|
|
||
|
@Bean
|
||
|
fun handlerMapping(): HandlerMapping {
|
||
|
val map = mapOf("/path" to MyWebSocketHandler())
|
||
|
val order = -1 // before annotated controllers
|
||
|
|
||
|
return SimpleUrlHandlerMapping(map, order)
|
||
|
}
|
||
5 years ago
|
}
|
||
|
----
|
||
|
|
||
|
If using the <<web-reactive.adoc#webflux-config, WebFlux Config>> there is nothing
|
||
|
further to do, or otherwise if not using the WebFlux config you'll need to declare a
|
||
|
`WebSocketHandlerAdapter` as shown below:
|
||
|
|
||
|
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
|
||
|
.Java
|
||
|
----
|
||
|
@Configuration
|
||
|
class WebConfig {
|
||
|
|
||
|
// ...
|
||
|
|
||
|
@Bean
|
||
|
public WebSocketHandlerAdapter handlerAdapter() {
|
||
|
return new WebSocketHandlerAdapter();
|
||
|
}
|
||
|
}
|
||
|
----
|
||
|
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
|
||
|
.Kotlin
|
||
|
----
|
||
|
@Configuration
|
||
|
class WebConfig {
|
||
|
|
||
|
// ...
|
||
6 years ago
|
|
||
|
@Bean
|
||
|
fun handlerAdapter() = WebSocketHandlerAdapter()
|
||
|
}
|
||
|
----
|
||
7 years ago
|
|
||
|
|
||
|
|
||
7 years ago
|
[[webflux-websockethandler]]
|
||
6 years ago
|
=== `WebSocketHandler`
|
||
7 years ago
|
|
||
7 years ago
|
The `handle` method of `WebSocketHandler` takes `WebSocketSession` and returns `Mono<Void>`
|
||
|
to indicate when application handling of the session is complete. The session is handled
|
||
7 years ago
|
through two streams, one for inbound and one for outbound messages. The following table
|
||
|
describes the two methods that handle the streams:
|
||
7 years ago
|
|
||
|
[options="header"]
|
||
|
|===
|
||
7 years ago
|
| `WebSocketSession` method | Description
|
||
7 years ago
|
|
||
|
| `Flux<WebSocketMessage> receive()`
|
||
7 years ago
|
| Provides access to the inbound message stream and completes when the connection is closed.
|
||
7 years ago
|
|
||
|
| `Mono<Void> send(Publisher<WebSocketMessage>)`
|
||
|
| Takes a source for outgoing messages, writes the messages, and returns a `Mono<Void>` that
|
||
|
completes when the source completes and writing is done.
|
||
|
|
||
|
|===
|
||
|
|
||
7 years ago
|
A `WebSocketHandler` must compose the inbound and outbound streams into a unified flow and
|
||
7 years ago
|
return a `Mono<Void>` that reflects the completion of that flow. Depending on application
|
||
|
requirements, the unified flow completes when:
|
||
|
|
||
7 years ago
|
* Either the inbound or the outbound message stream completes.
|
||
|
* The inbound stream completes (that is, the connection closed), while the outbound stream is infinite.
|
||
|
* At a chosen point, through the `close` method of `WebSocketSession`.
|
||
7 years ago
|
|
||
|
When inbound and outbound message streams are composed together, there is no need to
|
||
5 years ago
|
check if the connection is open, since Reactive Streams signals end activity.
|
||
7 years ago
|
The inbound stream receives a completion or error signal, and the outbound stream
|
||
7 years ago
|
receives a cancellation signal.
|
||
|
|
||
7 years ago
|
The most basic implementation of a handler is one that handles the inbound stream. The
|
||
|
following example shows such an implementation:
|
||
7 years ago
|
|
||
6 years ago
|
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
|
||
|
.Java
|
||
7 years ago
|
----
|
||
6 years ago
|
class ExampleHandler implements WebSocketHandler {
|
||
|
|
||
|
@Override
|
||
|
public Mono<Void> handle(WebSocketSession session) {
|
||
|
return session.receive() // <1>
|
||
|
.doOnNext(message -> {
|
||
|
// ... // <2>
|
||
|
})
|
||
|
.concatMap(message -> {
|
||
|
// ... // <3>
|
||
|
})
|
||
|
.then(); // <4>
|
||
|
}
|
||
|
}
|
||
|
----
|
||
|
<1> Access the stream of inbound messages.
|
||
|
<2> Do something with each message.
|
||
|
<3> Perform nested asynchronous operations that use the message content.
|
||
|
<4> Return a `Mono<Void>` that completes when receiving completes.
|
||
|
|
||
|
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
|
||
|
.Kotlin
|
||
|
----
|
||
|
class ExampleHandler : WebSocketHandler {
|
||
|
|
||
|
override fun handle(session: WebSocketSession): Mono<Void> {
|
||
|
return session.receive() // <1>
|
||
|
.doOnNext {
|
||
|
// ... // <2>
|
||
|
}
|
||
|
.concatMap {
|
||
|
// ... // <3>
|
||
|
}
|
||
|
.then() // <4>
|
||
|
}
|
||
7 years ago
|
}
|
||
|
----
|
||
7 years ago
|
<1> Access the stream of inbound messages.
|
||
7 years ago
|
<2> Do something with each message.
|
||
7 years ago
|
<3> Perform nested asynchronous operations that use the message content.
|
||
|
<4> Return a `Mono<Void>` that completes when receiving completes.
|
||
6 years ago
|
|
||
7 years ago
|
|
||
|
TIP: For nested, asynchronous operations, you may need to call `message.retain()` on underlying
|
||
|
servers that use pooled data buffers (for example, Netty). Otherwise, the data buffer may be
|
||
|
released before you have had a chance to read the data. For more background, see
|
||
6 years ago
|
<<core.adoc#databuffers, Data Buffers and Codecs>>.
|
||
7 years ago
|
|
||
7 years ago
|
The following implementation combines the inbound and outbound streams:
|
||
7 years ago
|
|
||
6 years ago
|
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
|
||
|
.Java
|
||
|
----
|
||
|
class ExampleHandler implements WebSocketHandler {
|
||
|
|
||
|
@Override
|
||
|
public Mono<Void> handle(WebSocketSession session) {
|
||
|
|
||
|
Flux<WebSocketMessage> output = session.receive() // <1>
|
||
|
.doOnNext(message -> {
|
||
|
// ...
|
||
|
})
|
||
|
.concatMap(message -> {
|
||
|
// ...
|
||
|
})
|
||
|
.map(value -> session.textMessage("Echo " + value)); // <2>
|
||
|
|
||
|
return session.send(output); // <3>
|
||
|
}
|
||
|
}
|
||
7 years ago
|
----
|
||
6 years ago
|
<1> Handle the inbound message stream.
|
||
|
<2> Create the outbound message, producing a combined flow.
|
||
|
<3> Return a `Mono<Void>` that does not complete while we continue to receive.
|
||
|
|
||
|
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
|
||
|
.Kotlin
|
||
|
----
|
||
|
class ExampleHandler : WebSocketHandler {
|
||
7 years ago
|
|
||
6 years ago
|
override fun handle(session: WebSocketSession): Mono<Void> {
|
||
7 years ago
|
|
||
6 years ago
|
val output = session.receive() // <1>
|
||
|
.doOnNext {
|
||
|
// ...
|
||
|
}
|
||
|
.concatMap {
|
||
|
// ...
|
||
|
}
|
||
|
.map { session.textMessage("Echo $it") } // <2>
|
||
7 years ago
|
|
||
6 years ago
|
return session.send(output) // <3>
|
||
|
}
|
||
7 years ago
|
}
|
||
|
----
|
||
7 years ago
|
<1> Handle the inbound message stream.
|
||
|
<2> Create the outbound message, producing a combined flow.
|
||
|
<3> Return a `Mono<Void>` that does not complete while we continue to receive.
|
||
6 years ago
|
|
||
7 years ago
|
|
||
7 years ago
|
Inbound and outbound streams can be independent and be joined only for completion,
|
||
|
as the following example shows:
|
||
7 years ago
|
|
||
6 years ago
|
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
|
||
|
.Java
|
||
7 years ago
|
----
|
||
6 years ago
|
class ExampleHandler implements WebSocketHandler {
|
||
7 years ago
|
|
||
6 years ago
|
@Override
|
||
|
public Mono<Void> handle(WebSocketSession session) {
|
||
7 years ago
|
|
||
6 years ago
|
Mono<Void> input = session.receive() <1>
|
||
|
.doOnNext(message -> {
|
||
|
// ...
|
||
|
})
|
||
|
.concatMap(message -> {
|
||
|
// ...
|
||
|
})
|
||
|
.then();
|
||
7 years ago
|
|
||
6 years ago
|
Flux<String> source = ... ;
|
||
|
Mono<Void> output = session.send(source.map(session::textMessage)); <2>
|
||
7 years ago
|
|
||
6 years ago
|
return Mono.zip(input, output).then(); <3>
|
||
|
}
|
||
|
}
|
||
|
----
|
||
|
<1> Handle inbound message stream.
|
||
|
<2> Send outgoing messages.
|
||
|
<3> Join the streams and return a `Mono<Void>` that completes when either stream ends.
|
||
|
|
||
|
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
|
||
|
.Kotlin
|
||
|
----
|
||
|
class ExampleHandler : WebSocketHandler {
|
||
|
|
||
|
override fun handle(session: WebSocketSession): Mono<Void> {
|
||
|
|
||
|
val input = session.receive() // <1>
|
||
|
.doOnNext {
|
||
|
// ...
|
||
|
}
|
||
|
.concatMap {
|
||
|
// ...
|
||
|
}
|
||
|
.then()
|
||
|
|
||
|
val source: Flux<String> = ...
|
||
|
val output = session.send(source.map(session::textMessage)) // <2>
|
||
|
|
||
|
return Mono.zip(input, output).then() // <3>
|
||
|
}
|
||
7 years ago
|
}
|
||
|
----
|
||
|
<1> Handle inbound message stream.
|
||
7 years ago
|
<2> Send outgoing messages.
|
||
7 years ago
|
<3> Join the streams and return a `Mono<Void>` that completes when either stream ends.
|
||
7 years ago
|
|
||
|
|
||
|
|
||
6 years ago
|
[[webflux-websocket-databuffer]]
|
||
|
=== `DataBuffer`
|
||
|
|
||
|
`DataBuffer` is the representation for a byte buffer in WebFlux. The Spring Core part of
|
||
|
the reference has more on that in the section on
|
||
6 years ago
|
<<core#databuffers, Data Buffers and Codecs>>. The key point to understand is that on some
|
||
6 years ago
|
servers like Netty, byte buffers are pooled and reference counted, and must be released
|
||
|
when consumed to avoid memory leaks.
|
||
|
|
||
|
When running on Netty, applications must use `DataBufferUtils.retain(dataBuffer)` if they
|
||
|
wish to hold on input data buffers in order to ensure they are not released, and
|
||
|
subsequently use `DataBufferUtils.release(dataBuffer)` when the buffers are consumed.
|
||
|
|
||
|
|
||
|
|
||
|
|
||
7 years ago
|
[[webflux-websocket-server-handshake]]
|
||
7 years ago
|
=== Handshake
|
||
2 years ago
|
[.small]#<<web.adoc#websocket-server-handshake, See equivalent in the Servlet stack>>#
|
||
7 years ago
|
|
||
7 years ago
|
`WebSocketHandlerAdapter` delegates to a `WebSocketService`. By default, that is an instance
|
||
7 years ago
|
of `HandshakeWebSocketService`, which performs basic checks on the WebSocket request and
|
||
7 years ago
|
then uses `RequestUpgradeStrategy` for the server in use. Currently, there is built-in
|
||
7 years ago
|
support for Reactor Netty, Tomcat, Jetty, and Undertow.
|
||
7 years ago
|
|
||
7 years ago
|
`HandshakeWebSocketService` exposes a `sessionAttributePredicate` property that allows
|
||
|
setting a `Predicate<String>` to extract attributes from the `WebSession` and insert them
|
||
|
into the attributes of the `WebSocketSession`.
|
||
|
|
||
7 years ago
|
|
||
7 years ago
|
|
||
|
[[webflux-websocket-server-config]]
|
||
3 years ago
|
=== Server Configuration
|
||
2 years ago
|
[.small]#<<web.adoc#websocket-server-runtime-configuration, See equivalent in the Servlet stack>>#
|
||
7 years ago
|
|
||
5 years ago
|
The `RequestUpgradeStrategy` for each server exposes configuration specific to the
|
||
|
underlying WebSocket server engine. When using the WebFlux Java config you can customize
|
||
|
such properties as shown in the corresponding section of the
|
||
|
<<web-reactive.adoc#webflux-config-websocket-service, WebFlux Config>>, or otherwise if
|
||
|
not using the WebFlux config, use the below:
|
||
7 years ago
|
|
||
6 years ago
|
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
|
||
|
.Java
|
||
7 years ago
|
----
|
||
7 years ago
|
@Configuration
|
||
6 years ago
|
class WebConfig {
|
||
7 years ago
|
|
||
|
@Bean
|
||
|
public WebSocketHandlerAdapter handlerAdapter() {
|
||
|
return new WebSocketHandlerAdapter(webSocketService());
|
||
|
}
|
||
|
|
||
|
@Bean
|
||
|
public WebSocketService webSocketService() {
|
||
|
TomcatRequestUpgradeStrategy strategy = new TomcatRequestUpgradeStrategy();
|
||
|
strategy.setMaxSessionIdleTimeout(0L);
|
||
|
return new HandshakeWebSocketService(strategy);
|
||
|
}
|
||
|
}
|
||
7 years ago
|
----
|
||
6 years ago
|
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
|
||
|
.Kotlin
|
||
|
----
|
||
|
@Configuration
|
||
|
class WebConfig {
|
||
|
|
||
|
@Bean
|
||
|
fun handlerAdapter() =
|
||
|
WebSocketHandlerAdapter(webSocketService())
|
||
|
|
||
|
@Bean
|
||
|
fun webSocketService(): WebSocketService {
|
||
|
val strategy = TomcatRequestUpgradeStrategy().apply {
|
||
5 years ago
|
setMaxSessionIdleTimeout(0L)
|
||
6 years ago
|
}
|
||
|
return HandshakeWebSocketService(strategy)
|
||
|
}
|
||
|
}
|
||
|
----
|
||
7 years ago
|
|
||
7 years ago
|
Check the upgrade strategy for your server to see what options are available. Currently,
|
||
7 years ago
|
only Tomcat and Jetty expose such options.
|
||
|
|
||
|
|
||
|
|
||
|
[[webflux-websocket-server-cors]]
|
||
|
=== CORS
|
||
2 years ago
|
[.small]#<<web.adoc#websocket-server-allowed-origins, See equivalent in the Servlet stack>>#
|
||
7 years ago
|
|
||
|
The easiest way to configure CORS and restrict access to a WebSocket endpoint is to
|
||
|
have your `WebSocketHandler` implement `CorsConfigurationSource` and return a
|
||
4 years ago
|
`CorsConfiguration` with allowed origins, headers, and other details. If you cannot do
|
||
7 years ago
|
that, you can also set the `corsConfigurations` property on the `SimpleUrlHandler` to
|
||
7 years ago
|
specify CORS settings by URL pattern. If both are specified, they are combined by using the
|
||
7 years ago
|
`combine` method on `CorsConfiguration`.
|
||
|
|
||
|
|
||
|
|
||
|
[[webflux-websocket-client]]
|
||
7 years ago
|
=== Client
|
||
7 years ago
|
|
||
|
Spring WebFlux provides a `WebSocketClient` abstraction with implementations for
|
||
7 years ago
|
Reactor Netty, Tomcat, Jetty, Undertow, and standard Java (that is, JSR-356).
|
||
7 years ago
|
|
||
7 years ago
|
NOTE: The Tomcat client is effectively an extension of the standard Java one with some extra
|
||
|
functionality in the `WebSocketSession` handling to take advantage of the Tomcat-specific
|
||
7 years ago
|
API to suspend receiving messages for back pressure.
|
||
|
|
||
7 years ago
|
To start a WebSocket session, you can create an instance of the client and use its `execute`
|
||
7 years ago
|
methods:
|
||
|
|
||
6 years ago
|
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
|
||
|
.Java
|
||
7 years ago
|
----
|
||
6 years ago
|
WebSocketClient client = new ReactorNettyWebSocketClient();
|
||
7 years ago
|
|
||
6 years ago
|
URI url = new URI("ws://localhost:8080/path");
|
||
|
client.execute(url, session ->
|
||
|
session.receive()
|
||
|
.doOnNext(System.out::println)
|
||
|
.then());
|
||
|
----
|
||
|
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
|
||
|
.Kotlin
|
||
|
----
|
||
|
val client = ReactorNettyWebSocketClient()
|
||
|
|
||
|
val url = URI("ws://localhost:8080/path")
|
||
|
client.execute(url) { session ->
|
||
|
session.receive()
|
||
|
.doOnNext(::println)
|
||
|
.then()
|
||
|
}
|
||
7 years ago
|
----
|
||
|
|
||
7 years ago
|
Some clients, such as Jetty, implement `Lifecycle` and need to be stopped and started
|
||
7 years ago
|
before you can use them. All clients have constructor options related to configuration
|
||
|
of the underlying WebSocket client.
|