Browse Source

Support for setupPayload in RSocketRequester

Closes gh-23368
pull/23382/head
Rossen Stoyanchev 5 years ago
parent
commit
2c878e9331
  1. 83
      spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java
  2. 41
      spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java
  3. 3
      spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketStrategies.java
  4. 18
      spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilderTests.java
  5. 10
      spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java

83
spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java

@ -19,9 +19,12 @@ package org.springframework.messaging.rsocket; @@ -19,9 +19,12 @@ package org.springframework.messaging.rsocket;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import io.rsocket.Payload;
import io.rsocket.RSocketFactory;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.transport.ClientTransport;
@ -29,11 +32,16 @@ import io.rsocket.transport.netty.client.TcpClientTransport; @@ -29,11 +32,16 @@ import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.client.WebsocketClientTransport;
import reactor.core.publisher.Mono;
import org.springframework.core.ResolvableType;
import org.springframework.core.codec.Decoder;
import org.springframework.core.codec.Encoder;
import org.springframework.core.codec.StringDecoder;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.MimeType;
/**
@ -45,11 +53,26 @@ import org.springframework.util.MimeType; @@ -45,11 +53,26 @@ import org.springframework.util.MimeType;
*/
final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder {
private static final Map<String, Object> HINTS = Collections.emptyMap();
@Nullable
private MimeType dataMimeType;
private MimeType metadataMimeType = MetadataExtractor.COMPOSITE_METADATA;
@Nullable
private Object setupData;
@Nullable
private String setupRoute;
@Nullable
private Object[] setupRouteVars;
@Nullable
private Map<Object, MimeType> setupMetadata;
@Nullable
private RSocketStrategies strategies;
@ -71,6 +94,26 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder { @@ -71,6 +94,26 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder {
return this;
}
@Override
public RSocketRequester.Builder setupData(Object data) {
this.setupData = data;
return this;
}
@Override
public RSocketRequester.Builder setupRoute(String route, Object... routeVars) {
this.setupRoute = route;
this.setupRouteVars = routeVars;
return this;
}
@Override
public RSocketRequester.Builder setupMetadata(Object metadata, @Nullable MimeType mimeType) {
this.setupMetadata = (this.setupMetadata == null ? new LinkedHashMap<>(4) : this.setupMetadata);
this.setupMetadata.put(metadata, mimeType);
return this;
}
@Override
public RSocketRequester.Builder rsocketStrategies(@Nullable RSocketStrategies strategies) {
this.strategies = strategies;
@ -120,12 +163,52 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder { @@ -120,12 +163,52 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder {
factory.dataMimeType(dataMimeType.toString());
factory.metadataMimeType(this.metadataMimeType.toString());
Payload setupPayload = getSetupPayload(dataMimeType, rsocketStrategies);
if (setupPayload != null) {
factory.setupPayload(setupPayload);
}
return factory.transport(transport)
.start()
.map(rsocket -> new DefaultRSocketRequester(
rsocket, dataMimeType, this.metadataMimeType, rsocketStrategies));
}
@Nullable
private Payload getSetupPayload(MimeType dataMimeType, RSocketStrategies strategies) {
DataBuffer metadata = null;
if (this.setupRoute != null || !CollectionUtils.isEmpty(this.setupMetadata)) {
metadata = new MetadataEncoder(this.metadataMimeType, strategies)
.metadataAndOrRoute(this.setupMetadata, this.setupRoute, this.setupRouteVars)
.encode();
}
DataBuffer data = null;
if (this.setupData != null) {
try {
ResolvableType type = ResolvableType.forClass(this.setupData.getClass());
Encoder<Object> encoder = strategies.encoder(type, dataMimeType);
Assert.notNull(encoder, () -> "No encoder for " + dataMimeType + ", " + type);
data = encoder.encodeValue(this.setupData, strategies.dataBufferFactory(), type, dataMimeType, HINTS);
}
catch (Throwable ex) {
if (metadata != null) {
DataBufferUtils.release(metadata);
}
throw ex;
}
}
if (metadata == null && data == null) {
return null;
}
metadata = metadata != null ? metadata : emptyBuffer(strategies);
data = data != null ? data : emptyBuffer(strategies);
return PayloadUtils.createPayload(metadata, data);
}
private DataBuffer emptyBuffer(RSocketStrategies strategies) {
return strategies.dataBufferFactory().wrap(new byte[0]);
}
private RSocketStrategies getRSocketStrategies() {
if (!this.strategiesConfigurers.isEmpty()) {
RSocketStrategies.Builder builder =

41
spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java

@ -20,6 +20,7 @@ import java.net.URI; @@ -20,6 +20,7 @@ import java.net.URI;
import java.util.function.Consumer;
import io.rsocket.ConnectionSetupPayload;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.transport.ClientTransport;
import io.rsocket.transport.netty.client.TcpClientTransport;
@ -139,6 +140,28 @@ public interface RSocketRequester { @@ -139,6 +140,28 @@ public interface RSocketRequester {
*/
RSocketRequester.Builder metadataMimeType(MimeType mimeType);
/**
* Set the data for the setup payload. The data will be encoded
* according to the configured {@link #dataMimeType(MimeType)}.
* <p>By default this is not set.
*/
RSocketRequester.Builder setupData(Object data);
/**
* Set the route for the setup payload. The rules for formatting and
* encoding the route are the same as those for a request route as
* described in {@link #route(String, Object...)}.
* <p>By default this is not set.
*/
RSocketRequester.Builder setupRoute(String route, Object... routeVars);
/**
* Add metadata entry to the setup payload. Composite metadata must be
* in use if this is called more than once or in addition to
* {@link #setupRoute(String, Object...)}.
*/
RSocketRequester.Builder setupMetadata(Object value, @Nullable MimeType mimeType);
/**
* Provide {@link RSocketStrategies} to use.
* <p>By default this is based on default settings of
@ -157,12 +180,20 @@ public interface RSocketRequester { @@ -157,12 +180,20 @@ public interface RSocketRequester {
/**
* Callback to configure the {@code ClientRSocketFactory} directly.
* <p>Note that the data and metadata mime types cannot be set directly
* on the {@code ClientRSocketFactory}. Use shortcuts on this builder
* {@link #dataMimeType(MimeType)} and {@link #metadataMimeType(MimeType)}
* instead.
* <p>To configure client side responding, see
* <ul>
* <li>The data and metadata mime types cannot be set directly
* on the {@code ClientRSocketFactory} and will be overridden. Use the
* shortcuts {@link #dataMimeType(MimeType)} and
* {@link #metadataMimeType(MimeType)} on this builder instead.
* <li>The frame decoder also cannot be set directly and instead is set
* to match the configured {@code DataBufferFactory}.
* <li>For the
* {@link io.rsocket.RSocketFactory.ClientRSocketFactory#setupPayload(Payload)
* setupPayload}, consider using methods on this builder to specify the
* route, other metadata, and data as Object values to be encoded.
* <li>To configure client side responding, see
* {@link RSocketMessageHandler#clientResponder(RSocketStrategies, Object...)}.
* </ul>
*/
RSocketRequester.Builder rsocketFactory(ClientRSocketFactoryConfigurer configurer);

3
spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketStrategies.java

@ -188,7 +188,8 @@ public interface RSocketStrategies { @@ -188,7 +188,8 @@ public interface RSocketStrategies {
* Configure a {@link MetadataExtractor} to extract the route along with
* other metadata. This option is applicable to client or server
* responders.
* <p>By default this is {@link DefaultMetadataExtractor} extracting a
* <p>By default this is {@link DefaultMetadataExtractor} created with
* the {@link #decoder(Decoder[]) configured} decoders and extracting a
* route from {@code "message/x.rsocket.routing.v0"} metadata.
*/
Builder metadataExtractor(@Nullable MetadataExtractor metadataExtractor);

18
spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilderTests.java

@ -172,6 +172,24 @@ public class DefaultRSocketRequesterBuilderTests { @@ -172,6 +172,24 @@ public class DefaultRSocketRequesterBuilderTests {
assertThat(requester.metadataMimeType()).isEqualTo(metaMimeType);
}
@Test
public void setupRoute() {
RSocketRequester.builder()
.dataMimeType(MimeTypeUtils.TEXT_PLAIN)
.metadataMimeType(MimeTypeUtils.TEXT_PLAIN)
.setupRoute("toA")
.setupData("My data")
.connect(this.transport)
.block();
ConnectionSetupPayload setupPayload = Mono.from(this.connection.sentFrames())
.map(ConnectionSetupPayload::create)
.block();
assertThat(setupPayload.getMetadataUtf8()).isEqualTo("toA");
assertThat(setupPayload.getDataUtf8()).isEqualTo("My data");
}
@Test
public void frameDecoderMatchesDataBufferFactory() throws Exception {
testFrameDecoder(new NettyDataBufferFactory(ByteBufAllocator.DEFAULT), PayloadDecoder.ZERO_COPY);

10
spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java

@ -23,7 +23,6 @@ import io.rsocket.SocketAcceptor; @@ -23,7 +23,6 @@ import io.rsocket.SocketAcceptor;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.transport.netty.server.CloseableChannel;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.util.ByteBufPayload;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@ -37,12 +36,10 @@ import reactor.test.StepVerifier; @@ -37,12 +36,10 @@ import reactor.test.StepVerifier;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.codec.StringDecoder;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.rsocket.annotation.ConnectMapping;
import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler;
import org.springframework.stereotype.Controller;
import org.springframework.util.MimeTypeUtils;
/**
* Client-side handling of requests initiated from the server side.
@ -112,10 +109,9 @@ public class RSocketServerToClientIntegrationTests { @@ -112,10 +109,9 @@ public class RSocketServerToClientIntegrationTests {
RSocketRequester requester = null;
try {
requester = RSocketRequester.builder()
.metadataMimeType(MimeTypeUtils.TEXT_PLAIN)
.setupRoute(connectionRoute)
.rsocketStrategies(strategies)
.rsocketFactory(clientResponderConfigurer)
.rsocketFactory(factory -> factory.setupPayload(ByteBufPayload.create("", connectionRoute)))
.connectTcp("localhost", server.address().getPort())
.block();
@ -266,9 +262,7 @@ public class RSocketServerToClientIntegrationTests { @@ -266,9 +262,7 @@ public class RSocketServerToClientIntegrationTests {
@Bean
public RSocketStrategies rsocketStrategies() {
DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(StringDecoder.allMimeTypes());
extractor.metadataToExtract(MimeTypeUtils.TEXT_PLAIN, String.class, MetadataExtractor.ROUTE_KEY);
return RSocketStrategies.builder().metadataExtractor(extractor).build();
return RSocketStrategies.create();
}
}

Loading…
Cancel
Save