From 2c878e9331289ac8e5768e60881e9ac873ba7865 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Mon, 29 Jul 2019 22:04:08 +0100 Subject: [PATCH] Support for setupPayload in RSocketRequester Closes gh-23368 --- .../DefaultRSocketRequesterBuilder.java | 83 +++++++++++++++++++ .../messaging/rsocket/RSocketRequester.java | 41 +++++++-- .../messaging/rsocket/RSocketStrategies.java | 3 +- .../DefaultRSocketRequesterBuilderTests.java | 18 ++++ ...RSocketServerToClientIntegrationTests.java | 10 +-- 5 files changed, 141 insertions(+), 14 deletions(-) diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java index 10c05cf0ef..c73986b54d 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java @@ -19,9 +19,12 @@ package org.springframework.messaging.rsocket; import java.net.URI; import java.util.ArrayList; import java.util.Collections; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import java.util.function.Consumer; +import io.rsocket.Payload; import io.rsocket.RSocketFactory; import io.rsocket.frame.decoder.PayloadDecoder; import io.rsocket.transport.ClientTransport; @@ -29,11 +32,16 @@ import io.rsocket.transport.netty.client.TcpClientTransport; import io.rsocket.transport.netty.client.WebsocketClientTransport; import reactor.core.publisher.Mono; +import org.springframework.core.ResolvableType; import org.springframework.core.codec.Decoder; +import org.springframework.core.codec.Encoder; import org.springframework.core.codec.StringDecoder; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.core.io.buffer.NettyDataBufferFactory; import org.springframework.lang.Nullable; import org.springframework.util.Assert; +import org.springframework.util.CollectionUtils; import org.springframework.util.MimeType; /** @@ -45,11 +53,26 @@ import org.springframework.util.MimeType; */ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder { + private static final Map HINTS = Collections.emptyMap(); + + @Nullable private MimeType dataMimeType; private MimeType metadataMimeType = MetadataExtractor.COMPOSITE_METADATA; + @Nullable + private Object setupData; + + @Nullable + private String setupRoute; + + @Nullable + private Object[] setupRouteVars; + + @Nullable + private Map setupMetadata; + @Nullable private RSocketStrategies strategies; @@ -71,6 +94,26 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder { return this; } + @Override + public RSocketRequester.Builder setupData(Object data) { + this.setupData = data; + return this; + } + + @Override + public RSocketRequester.Builder setupRoute(String route, Object... routeVars) { + this.setupRoute = route; + this.setupRouteVars = routeVars; + return this; + } + + @Override + public RSocketRequester.Builder setupMetadata(Object metadata, @Nullable MimeType mimeType) { + this.setupMetadata = (this.setupMetadata == null ? new LinkedHashMap<>(4) : this.setupMetadata); + this.setupMetadata.put(metadata, mimeType); + return this; + } + @Override public RSocketRequester.Builder rsocketStrategies(@Nullable RSocketStrategies strategies) { this.strategies = strategies; @@ -120,12 +163,52 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder { factory.dataMimeType(dataMimeType.toString()); factory.metadataMimeType(this.metadataMimeType.toString()); + Payload setupPayload = getSetupPayload(dataMimeType, rsocketStrategies); + if (setupPayload != null) { + factory.setupPayload(setupPayload); + } + return factory.transport(transport) .start() .map(rsocket -> new DefaultRSocketRequester( rsocket, dataMimeType, this.metadataMimeType, rsocketStrategies)); } + @Nullable + private Payload getSetupPayload(MimeType dataMimeType, RSocketStrategies strategies) { + DataBuffer metadata = null; + if (this.setupRoute != null || !CollectionUtils.isEmpty(this.setupMetadata)) { + metadata = new MetadataEncoder(this.metadataMimeType, strategies) + .metadataAndOrRoute(this.setupMetadata, this.setupRoute, this.setupRouteVars) + .encode(); + } + DataBuffer data = null; + if (this.setupData != null) { + try { + ResolvableType type = ResolvableType.forClass(this.setupData.getClass()); + Encoder encoder = strategies.encoder(type, dataMimeType); + Assert.notNull(encoder, () -> "No encoder for " + dataMimeType + ", " + type); + data = encoder.encodeValue(this.setupData, strategies.dataBufferFactory(), type, dataMimeType, HINTS); + } + catch (Throwable ex) { + if (metadata != null) { + DataBufferUtils.release(metadata); + } + throw ex; + } + } + if (metadata == null && data == null) { + return null; + } + metadata = metadata != null ? metadata : emptyBuffer(strategies); + data = data != null ? data : emptyBuffer(strategies); + return PayloadUtils.createPayload(metadata, data); + } + + private DataBuffer emptyBuffer(RSocketStrategies strategies) { + return strategies.dataBufferFactory().wrap(new byte[0]); + } + private RSocketStrategies getRSocketStrategies() { if (!this.strategiesConfigurers.isEmpty()) { RSocketStrategies.Builder builder = diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java index 5a8d123494..c1235aac36 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java @@ -20,6 +20,7 @@ import java.net.URI; import java.util.function.Consumer; import io.rsocket.ConnectionSetupPayload; +import io.rsocket.Payload; import io.rsocket.RSocket; import io.rsocket.transport.ClientTransport; import io.rsocket.transport.netty.client.TcpClientTransport; @@ -139,6 +140,28 @@ public interface RSocketRequester { */ RSocketRequester.Builder metadataMimeType(MimeType mimeType); + /** + * Set the data for the setup payload. The data will be encoded + * according to the configured {@link #dataMimeType(MimeType)}. + *

By default this is not set. + */ + RSocketRequester.Builder setupData(Object data); + + /** + * Set the route for the setup payload. The rules for formatting and + * encoding the route are the same as those for a request route as + * described in {@link #route(String, Object...)}. + *

By default this is not set. + */ + RSocketRequester.Builder setupRoute(String route, Object... routeVars); + + /** + * Add metadata entry to the setup payload. Composite metadata must be + * in use if this is called more than once or in addition to + * {@link #setupRoute(String, Object...)}. + */ + RSocketRequester.Builder setupMetadata(Object value, @Nullable MimeType mimeType); + /** * Provide {@link RSocketStrategies} to use. *

By default this is based on default settings of @@ -157,12 +180,20 @@ public interface RSocketRequester { /** * Callback to configure the {@code ClientRSocketFactory} directly. - *

Note that the data and metadata mime types cannot be set directly - * on the {@code ClientRSocketFactory}. Use shortcuts on this builder - * {@link #dataMimeType(MimeType)} and {@link #metadataMimeType(MimeType)} - * instead. - *

To configure client side responding, see + *

    + *
  • The data and metadata mime types cannot be set directly + * on the {@code ClientRSocketFactory} and will be overridden. Use the + * shortcuts {@link #dataMimeType(MimeType)} and + * {@link #metadataMimeType(MimeType)} on this builder instead. + *
  • The frame decoder also cannot be set directly and instead is set + * to match the configured {@code DataBufferFactory}. + *
  • For the + * {@link io.rsocket.RSocketFactory.ClientRSocketFactory#setupPayload(Payload) + * setupPayload}, consider using methods on this builder to specify the + * route, other metadata, and data as Object values to be encoded. + *
  • To configure client side responding, see * {@link RSocketMessageHandler#clientResponder(RSocketStrategies, Object...)}. + *
*/ RSocketRequester.Builder rsocketFactory(ClientRSocketFactoryConfigurer configurer); diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketStrategies.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketStrategies.java index 0449cf4c80..f782f022a4 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketStrategies.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketStrategies.java @@ -188,7 +188,8 @@ public interface RSocketStrategies { * Configure a {@link MetadataExtractor} to extract the route along with * other metadata. This option is applicable to client or server * responders. - *

By default this is {@link DefaultMetadataExtractor} extracting a + *

By default this is {@link DefaultMetadataExtractor} created with + * the {@link #decoder(Decoder[]) configured} decoders and extracting a * route from {@code "message/x.rsocket.routing.v0"} metadata. */ Builder metadataExtractor(@Nullable MetadataExtractor metadataExtractor); diff --git a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilderTests.java b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilderTests.java index 106871ec64..96f774fa40 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilderTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilderTests.java @@ -172,6 +172,24 @@ public class DefaultRSocketRequesterBuilderTests { assertThat(requester.metadataMimeType()).isEqualTo(metaMimeType); } + @Test + public void setupRoute() { + RSocketRequester.builder() + .dataMimeType(MimeTypeUtils.TEXT_PLAIN) + .metadataMimeType(MimeTypeUtils.TEXT_PLAIN) + .setupRoute("toA") + .setupData("My data") + .connect(this.transport) + .block(); + + ConnectionSetupPayload setupPayload = Mono.from(this.connection.sentFrames()) + .map(ConnectionSetupPayload::create) + .block(); + + assertThat(setupPayload.getMetadataUtf8()).isEqualTo("toA"); + assertThat(setupPayload.getDataUtf8()).isEqualTo("My data"); + } + @Test public void frameDecoderMatchesDataBufferFactory() throws Exception { testFrameDecoder(new NettyDataBufferFactory(ByteBufAllocator.DEFAULT), PayloadDecoder.ZERO_COPY); diff --git a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java index dc443b6aa7..ef8e4da03e 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java @@ -23,7 +23,6 @@ import io.rsocket.SocketAcceptor; import io.rsocket.frame.decoder.PayloadDecoder; import io.rsocket.transport.netty.server.CloseableChannel; import io.rsocket.transport.netty.server.TcpServerTransport; -import io.rsocket.util.ByteBufPayload; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -37,12 +36,10 @@ import reactor.test.StepVerifier; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.core.codec.StringDecoder; import org.springframework.messaging.handler.annotation.MessageMapping; import org.springframework.messaging.rsocket.annotation.ConnectMapping; import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler; import org.springframework.stereotype.Controller; -import org.springframework.util.MimeTypeUtils; /** * Client-side handling of requests initiated from the server side. @@ -112,10 +109,9 @@ public class RSocketServerToClientIntegrationTests { RSocketRequester requester = null; try { requester = RSocketRequester.builder() - .metadataMimeType(MimeTypeUtils.TEXT_PLAIN) + .setupRoute(connectionRoute) .rsocketStrategies(strategies) .rsocketFactory(clientResponderConfigurer) - .rsocketFactory(factory -> factory.setupPayload(ByteBufPayload.create("", connectionRoute))) .connectTcp("localhost", server.address().getPort()) .block(); @@ -266,9 +262,7 @@ public class RSocketServerToClientIntegrationTests { @Bean public RSocketStrategies rsocketStrategies() { - DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(StringDecoder.allMimeTypes()); - extractor.metadataToExtract(MimeTypeUtils.TEXT_PLAIN, String.class, MetadataExtractor.ROUTE_KEY); - return RSocketStrategies.builder().metadataExtractor(extractor).build(); + return RSocketStrategies.create(); } }