Browse Source

Polish RSocketRequester

pull/22870/head
Rossen Stoyanchev 6 years ago
parent
commit
8888a65079
  1. 28
      spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java
  2. 59
      spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java

28
spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java

@ -54,19 +54,6 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder { @@ -54,19 +54,6 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder {
return this;
}
@Override
public Mono<RSocketRequester> connect(ClientTransport transport, MimeType dataMimeType) {
return Mono.defer(() -> {
RSocketStrategies.Builder strategiesBuilder = RSocketStrategies.builder();
this.strategiesConfigurers.forEach(configurer -> configurer.accept(strategiesBuilder));
RSocketFactory.ClientRSocketFactory clientFactory = RSocketFactory.connect()
.dataMimeType(dataMimeType.toString());
this.factoryConfigurers.forEach(configurer -> configurer.accept(clientFactory));
return clientFactory.transport(transport).start()
.map(rsocket -> RSocketRequester.create(rsocket, dataMimeType, strategiesBuilder.build()));
});
}
@Override
public Mono<RSocketRequester> connectTcp(String host, int port, MimeType dataMimeType) {
return connect(TcpClientTransport.create(host, port), dataMimeType);
@ -77,4 +64,19 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder { @@ -77,4 +64,19 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder {
return connect(WebsocketClientTransport.create(uri), dataMimeType);
}
@Override
public Mono<RSocketRequester> connect(ClientTransport transport, MimeType dataMimeType) {
return Mono.defer(() -> {
String mimeType = dataMimeType.toString();
RSocketFactory.ClientRSocketFactory factory = RSocketFactory.connect().dataMimeType(mimeType);
this.factoryConfigurers.forEach(configurer -> configurer.accept(factory));
RSocketStrategies.Builder builder = RSocketStrategies.builder();
this.strategiesConfigurers.forEach(configurer -> configurer.accept(builder));
return factory.transport(transport).start()
.map(rsocket -> RSocketRequester.create(rsocket, dataMimeType, builder.build()));
});
}
}

59
spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java

@ -61,6 +61,16 @@ public interface RSocketRequester { @@ -61,6 +61,16 @@ public interface RSocketRequester {
RequestSpec route(String route);
/**
* Obtain a builder for an {@link RSocketRequester} by connecting to an
* RSocket server. The builder allows for customization of
* {@link RSocketFactory.ClientRSocketFactory ClientRSocketFactory} settings,
* {@link RSocketStrategies}, and for selecting the transport to use.
*/
static RSocketRequester.Builder builder() {
return new DefaultRSocketRequesterBuilder();
}
/**
* Create a new {@code RSocketRequester} from the given {@link RSocket} and
* strategies for encoding and decoding request and response payloads.
@ -73,26 +83,19 @@ public interface RSocketRequester { @@ -73,26 +83,19 @@ public interface RSocketRequester {
return new DefaultRSocketRequester(rsocket, dataMimeType, strategies);
}
/**
* Obtain a {@code RSocketRequester} builder.
*/
static RSocketRequester.Builder builder() {
return new DefaultRSocketRequesterBuilder();
}
/**
* A mutable builder for creating a client {@link RSocketRequester}.
* Builder to prepare an {@link RSocketRequester} by connecting to an
* RSocket server and wrapping the resulting {@link RSocket}.
*/
interface Builder {
/**
* Configure the client {@code RSocketFactory}. This is useful for
* customizing protocol options and add RSocket plugins.
* Configure the {@code ClientRSocketFactory} to customize protocol
* options, register RSocket plugins (interceptors), and more.
* @param configurer the configurer to apply
*/
RSocketRequester.Builder rsocketFactory(
Consumer<RSocketFactory.ClientRSocketFactory> configurer);
RSocketRequester.Builder rsocketFactory(Consumer<RSocketFactory.ClientRSocketFactory> configurer);
/**
* Configure the builder for {@link RSocketStrategies}.
@ -102,31 +105,29 @@ public interface RSocketRequester { @@ -102,31 +105,29 @@ public interface RSocketRequester {
RSocketRequester.Builder rsocketStrategies(Consumer<RSocketStrategies.Builder> configurer);
/**
* Configure the {@code ClientTransport} for the RSocket connection
* and connect to the RSocket server.
* @param transport the chosen client transport
* @return a mono containing the connected {@code RSocketRequester}
*/
Mono<RSocketRequester> connect(ClientTransport transport, MimeType dataMimeType);
/**
* Connect to the RSocket server over TCP transport using the
* provided connection parameters.
* @param host the RSocket server host
* @param port the RSocket server port
* @param dataMimeType the data MimeType
* @return a mono containing the connected {@code RSocketRequester}
* Connect to the RSocket server over TCP.
* @param host the server host
* @param port the server port
* @param dataMimeType the data MimeType for the connection
* @return an {@code RSocketRequester} for the connection
*/
Mono<RSocketRequester> connectTcp(String host, int port, MimeType dataMimeType);
/**
* Connect to the RSocket server over WebSocket transport using the
* provided connection parameters.
* Connect to the RSocket server over WebSocket.
* @param uri the RSocket server endpoint URI
* @param dataMimeType the data MimeType
* @return a mono containing the connected {@code RSocketRequester}
* @return an {@code RSocketRequester} for the connection
*/
Mono<RSocketRequester> connectWebSocket(URI uri, MimeType dataMimeType);
/**
* Connect to the RSocket server with the given {@code ClientTransport}.
* @param transport the client transport to use
* @return an {@code RSocketRequester} for the connection
*/
Mono<RSocketRequester> connect(ClientTransport transport, MimeType dataMimeType);
}

Loading…
Cancel
Save