diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequester.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequester.java index a8f129f5ec..62c462f935 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequester.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequester.java @@ -16,12 +16,19 @@ package org.springframework.messaging.rsocket; -import java.nio.charset.StandardCharsets; +import java.util.Arrays; import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.Unpooled; import io.rsocket.Payload; import io.rsocket.RSocket; +import io.rsocket.metadata.CompositeMetadataFlyweight; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -32,6 +39,10 @@ import org.springframework.core.ResolvableType; import org.springframework.core.codec.Decoder; import org.springframework.core.codec.Encoder; import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferFactory; +import org.springframework.core.io.buffer.DataBufferUtils; +import org.springframework.core.io.buffer.NettyDataBuffer; +import org.springframework.core.io.buffer.NettyDataBufferFactory; import org.springframework.lang.Nullable; import org.springframework.util.Assert; import org.springframework.util.MimeType; @@ -44,24 +55,42 @@ import org.springframework.util.MimeType; */ final class DefaultRSocketRequester implements RSocketRequester { + static final MimeType COMPOSITE_METADATA = new MimeType("message", "x.rsocket.composite-metadata.v0"); + + static final MimeType ROUTING = new MimeType("message", "x.rsocket.routing.v0"); + + static final List METADATA_MIME_TYPES = Arrays.asList(COMPOSITE_METADATA, ROUTING); + + private static final Map EMPTY_HINTS = Collections.emptyMap(); private final RSocket rsocket; - @Nullable private final MimeType dataMimeType; + private final MimeType metadataMimeType; + private final RSocketStrategies strategies; - private DataBuffer emptyDataBuffer; + private final DataBuffer emptyDataBuffer; - DefaultRSocketRequester(RSocket rsocket, @Nullable MimeType dataMimeType, RSocketStrategies strategies) { + DefaultRSocketRequester( + RSocket rsocket, MimeType dataMimeType, MimeType metadataMimeType, + RSocketStrategies strategies) { + Assert.notNull(rsocket, "RSocket is required"); + Assert.notNull(dataMimeType, "'dataMimeType' is required"); + Assert.notNull(metadataMimeType, "'metadataMimeType' is required"); Assert.notNull(strategies, "RSocketStrategies is required"); + + Assert.isTrue(METADATA_MIME_TYPES.contains(metadataMimeType), + () -> "Unexpected metadatata mime type: '" + metadataMimeType + "'"); + this.rsocket = rsocket; this.dataMimeType = dataMimeType; + this.metadataMimeType = metadataMimeType; this.strategies = strategies; this.emptyDataBuffer = this.strategies.dataBufferFactory().wrap(new byte[0]); } @@ -72,6 +101,16 @@ final class DefaultRSocketRequester implements RSocketRequester { return this.rsocket; } + @Override + public MimeType dataMimeType() { + return this.dataMimeType; + } + + @Override + public MimeType metadataMimeType() { + return this.metadataMimeType; + } + @Override public RequestSpec route(String route) { return new DefaultRequestSpec(route); @@ -82,13 +121,28 @@ final class DefaultRSocketRequester implements RSocketRequester { return (Void.class.equals(elementType.resolve()) || void.class.equals(elementType.resolve())); } + private DataBufferFactory bufferFactory() { + return this.strategies.dataBufferFactory(); + } + private class DefaultRequestSpec implements RequestSpec { - private final String route; + private final Map metadata = new LinkedHashMap<>(4); + + + public DefaultRequestSpec(String route) { + Assert.notNull(route, "'route' is required"); + metadata(route, ROUTING); + } + - DefaultRequestSpec(String route) { - this.route = route; + @Override + public RequestSpec metadata(Object metadata, MimeType mimeType) { + Assert.isTrue(this.metadata.isEmpty() || metadataMimeType().equals(COMPOSITE_METADATA), + "Additional metadata entries supported only with composite metadata"); + this.metadata.put(metadata, mimeType); + return this; } @Override @@ -122,7 +176,7 @@ final class DefaultRSocketRequester implements RSocketRequester { } else { Mono payloadMono = Mono - .fromCallable(() -> encodeValue(input, ResolvableType.forInstance(input), null)) + .fromCallable(() -> encodeData(input, ResolvableType.forInstance(input), null)) .map(this::firstPayload) .doOnDiscard(Payload.class, Payload::release) .switchIfEmpty(emptyPayload()); @@ -139,14 +193,14 @@ final class DefaultRSocketRequester implements RSocketRequester { if (adapter != null && !adapter.isMultiValue()) { Mono payloadMono = Mono.from(publisher) - .map(value -> encodeValue(value, dataType, encoder)) + .map(value -> encodeData(value, dataType, encoder)) .map(this::firstPayload) .switchIfEmpty(emptyPayload()); return new DefaultResponseSpec(payloadMono); } Flux payloadFlux = Flux.from(publisher) - .map(value -> encodeValue(value, dataType, encoder)) + .map(value -> encodeData(value, dataType, encoder)) .switchOnFirst((signal, inner) -> { DataBuffer data = signal.get(); if (data != null) { @@ -163,16 +217,28 @@ final class DefaultRSocketRequester implements RSocketRequester { } @SuppressWarnings("unchecked") - private DataBuffer encodeValue(T value, ResolvableType valueType, @Nullable Encoder encoder) { + private DataBuffer encodeData(T value, ResolvableType valueType, @Nullable Encoder encoder) { + if (value instanceof DataBuffer) { + return (DataBuffer) value; + } if (encoder == null) { - encoder = strategies.encoder(ResolvableType.forInstance(value), dataMimeType); + valueType = ResolvableType.forInstance(value); + encoder = strategies.encoder(valueType, dataMimeType); } return ((Encoder) encoder).encodeValue( - value, strategies.dataBufferFactory(), valueType, dataMimeType, EMPTY_HINTS); + value, bufferFactory(), valueType, dataMimeType, EMPTY_HINTS); } private Payload firstPayload(DataBuffer data) { - return PayloadUtils.createPayload(getMetadata(), data); + DataBuffer metadata; + try { + metadata = getMetadata(); + return PayloadUtils.createPayload(metadata, data); + } + catch (Throwable ex) { + DataBufferUtils.release(data); + throw ex; + } } private Mono emptyPayload() { @@ -180,7 +246,51 @@ final class DefaultRSocketRequester implements RSocketRequester { } private DataBuffer getMetadata() { - return strategies.dataBufferFactory().wrap(this.route.getBytes(StandardCharsets.UTF_8)); + if (metadataMimeType().equals(COMPOSITE_METADATA)) { + CompositeByteBuf metadata = getAllocator().compositeBuffer(); + this.metadata.forEach((key, value) -> { + DataBuffer dataBuffer = encodeMetadata(key, value); + CompositeMetadataFlyweight.encodeAndAddMetadata(metadata, getAllocator(), value.toString(), + dataBuffer instanceof NettyDataBuffer ? + ((NettyDataBuffer) dataBuffer).getNativeBuffer() : + Unpooled.wrappedBuffer(dataBuffer.asByteBuffer())); + + }); + return asDataBuffer(metadata); + } + Assert.isTrue(this.metadata.size() < 2, "Composite metadata required for multiple entries"); + Map.Entry entry = this.metadata.entrySet().iterator().next(); + Assert.isTrue(metadataMimeType().equals(entry.getValue()), + () -> "Expected metadata MimeType '" + metadataMimeType() + "', actual " + this.metadata); + return encodeMetadata(entry.getKey(), entry.getValue()); + } + + @SuppressWarnings("unchecked") + private DataBuffer encodeMetadata(Object metadata, MimeType mimeType) { + if (metadata instanceof DataBuffer) { + return (DataBuffer) metadata; + } + ResolvableType type = ResolvableType.forInstance(metadata); + Encoder encoder = strategies.encoder(type, mimeType); + Assert.notNull(encoder, () -> "No encoder for metadata " + metadata + ", mimeType '" + mimeType + "'"); + return encoder.encodeValue((T) metadata, bufferFactory(), type, mimeType, EMPTY_HINTS); + } + + private ByteBufAllocator getAllocator() { + return bufferFactory() instanceof NettyDataBufferFactory ? + ((NettyDataBufferFactory) bufferFactory()).getByteBufAllocator() : + ByteBufAllocator.DEFAULT; + } + + private DataBuffer asDataBuffer(ByteBuf byteBuf) { + if (bufferFactory() instanceof NettyDataBufferFactory) { + return ((NettyDataBufferFactory) bufferFactory()).wrap(byteBuf); + } + else { + DataBuffer dataBuffer = bufferFactory().wrap(byteBuf.nioBuffer()); + byteBuf.release(); + return dataBuffer; + } } } @@ -259,7 +369,7 @@ final class DefaultRSocketRequester implements RSocketRequester { } private DataBuffer retainDataAndReleasePayload(Payload payload) { - return PayloadUtils.retainDataAndReleasePayload(payload, strategies.dataBufferFactory()); + return PayloadUtils.retainDataAndReleasePayload(payload, bufferFactory()); } } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java index 7213e65ff3..90286cb2bc 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java @@ -44,6 +44,8 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder { @Nullable private MimeType dataMimeType; + private MimeType metadataMimeType = DefaultRSocketRequester.COMPOSITE_METADATA; + private List> factoryConfigurers = new ArrayList<>(); @Nullable @@ -53,11 +55,18 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder { @Override - public RSocketRequester.Builder dataMimeType(MimeType mimeType) { + public RSocketRequester.Builder dataMimeType(@Nullable MimeType mimeType) { this.dataMimeType = mimeType; return this; } + @Override + public RSocketRequester.Builder metadataMimeType(MimeType mimeType) { + Assert.notNull(mimeType, "`metadataMimeType` is required"); + this.metadataMimeType = mimeType; + return this; + } + @Override public RSocketRequester.Builder rsocketFactory(Consumer configurer) { this.factoryConfigurers.add(configurer); @@ -100,10 +109,13 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder { RSocketFactory.ClientRSocketFactory rsocketFactory = RSocketFactory.connect(); MimeType dataMimeType = getDataMimeType(rsocketStrategies); rsocketFactory.dataMimeType(dataMimeType.toString()); + rsocketFactory.metadataMimeType(this.metadataMimeType.toString()); this.factoryConfigurers.forEach(consumer -> consumer.accept(rsocketFactory)); - return rsocketFactory.transport(transport).start() - .map(rsocket -> new DefaultRSocketRequester(rsocket, dataMimeType, rsocketStrategies)); + return rsocketFactory.transport(transport) + .start() + .map(rsocket -> new DefaultRSocketRequester( + rsocket, dataMimeType, this.metadataMimeType, rsocketStrategies)); } private RSocketStrategies getRSocketStrategies() { diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/MessageHandlerAcceptor.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/MessageHandlerAcceptor.java index 185dd078bc..9b7fce4254 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/MessageHandlerAcceptor.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/MessageHandlerAcceptor.java @@ -24,9 +24,9 @@ import io.rsocket.RSocket; import io.rsocket.SocketAcceptor; import reactor.core.publisher.Mono; -import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.lang.Nullable; import org.springframework.messaging.Message; +import org.springframework.util.Assert; import org.springframework.util.MimeType; import org.springframework.util.MimeTypeUtils; import org.springframework.util.StringUtils; @@ -47,16 +47,28 @@ public final class MessageHandlerAcceptor extends RSocketMessageHandler @Nullable private MimeType defaultDataMimeType; + private MimeType defaultMetadataMimeType = DefaultRSocketRequester.COMPOSITE_METADATA; + + + /** + * Configure the default content type to use for data payloads if the + * {@code SETUP} frame did not specify one. + *

By default this is not set. + * @param mimeType the MimeType to use + */ + public void setDefaultDataMimeType(@Nullable MimeType mimeType) { + this.defaultDataMimeType = mimeType; + } /** - * Configure the default content type to use for data payloads. - *

By default this is not set. However a server acceptor will use the - * content type from the {@link ConnectionSetupPayload}, so this is typically - * required for clients but can also be used on servers as a fallback. - * @param defaultDataMimeType the MimeType to use + * Configure the default {@code MimeType} for payload data if the + * {@code SETUP} frame did not specify one. + *

By default this is set to {@code "message/x.rsocket.composite-metadata.v0"} + * @param mimeType the MimeType to use */ - public void setDefaultDataMimeType(@Nullable MimeType defaultDataMimeType) { - this.defaultDataMimeType = defaultDataMimeType; + public void setDefaultMetadataMimeType(MimeType mimeType) { + Assert.notNull(mimeType, "'metadataMimeType' is required"); + this.defaultMetadataMimeType = mimeType; } @@ -76,12 +88,24 @@ public final class MessageHandlerAcceptor extends RSocketMessageHandler } private MessagingRSocket createRSocket(ConnectionSetupPayload setupPayload, RSocket rsocket) { + MimeType dataMimeType = StringUtils.hasText(setupPayload.dataMimeType()) ? MimeTypeUtils.parseMimeType(setupPayload.dataMimeType()) : this.defaultDataMimeType; - RSocketRequester requester = RSocketRequester.wrap(rsocket, dataMimeType, getRSocketStrategies()); - DataBufferFactory bufferFactory = getRSocketStrategies().dataBufferFactory(); - return new MessagingRSocket(this, getRouteMatcher(), requester, dataMimeType, bufferFactory); + Assert.notNull(dataMimeType, + "No `dataMimeType` in the ConnectionSetupPayload and no default value"); + + MimeType metadataMimeType = StringUtils.hasText(setupPayload.dataMimeType()) ? + MimeTypeUtils.parseMimeType(setupPayload.metadataMimeType()) : + this.defaultMetadataMimeType; + Assert.notNull(dataMimeType, + "No `metadataMimeType` in the ConnectionSetupPayload and no default value"); + + RSocketRequester requester = RSocketRequester.wrap( + rsocket, dataMimeType, metadataMimeType, getRSocketStrategies()); + + return new MessagingRSocket(this, getRouteMatcher(), requester, + dataMimeType, metadataMimeType, getRSocketStrategies().dataBufferFactory()); } } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/MessagingRSocket.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/MessagingRSocket.java index 583839906a..973ff263b6 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/MessagingRSocket.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/MessagingRSocket.java @@ -16,6 +16,7 @@ package org.springframework.messaging.rsocket; +import java.nio.charset.StandardCharsets; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; @@ -23,6 +24,7 @@ import io.rsocket.AbstractRSocket; import io.rsocket.ConnectionSetupPayload; import io.rsocket.Payload; import io.rsocket.RSocket; +import io.rsocket.metadata.CompositeMetadata; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -61,24 +63,31 @@ class MessagingRSocket extends AbstractRSocket { private final RSocketRequester requester; - @Nullable - private MimeType dataMimeType; + private final MimeType dataMimeType; + + private final MimeType metadataMimeType; private final DataBufferFactory bufferFactory; MessagingRSocket(RSocketMessageHandler messageHandler, RouteMatcher routeMatcher, - RSocketRequester requester, @Nullable MimeType defaultDataMimeType, + RSocketRequester requester, MimeType dataMimeType, MimeType metadataMimeType, DataBufferFactory bufferFactory) { Assert.notNull(messageHandler, "'messageHandler' is required"); Assert.notNull(routeMatcher, "'routeMatcher' is required"); Assert.notNull(requester, "'requester' is required"); + Assert.notNull(requester, "'dataMimeType' is required"); + Assert.notNull(requester, "'metadataMimeType' is required"); + + Assert.isTrue(DefaultRSocketRequester.METADATA_MIME_TYPES.contains(metadataMimeType), + () -> "Unexpected metadatata mime type: '" + metadataMimeType + "'"); this.messageHandler = messageHandler; this.routeMatcher = routeMatcher; this.requester = requester; - this.dataMimeType = defaultDataMimeType; + this.dataMimeType = dataMimeType; + this.metadataMimeType = metadataMimeType; this.bufferFactory = bufferFactory; } @@ -169,13 +178,21 @@ class MessagingRSocket extends AbstractRSocket { } private String getDestination(Payload payload) { - - // TODO: - // For now treat the metadata as a simple string with routing information. - // We'll have to get more sophisticated once the routing extension is completed. - // https://github.com/rsocket/rsocket-java/issues/568 - - return payload.getMetadataUtf8(); + if (this.metadataMimeType.equals(DefaultRSocketRequester.COMPOSITE_METADATA)) { + CompositeMetadata metadata = new CompositeMetadata(payload.metadata(), false); + for (CompositeMetadata.Entry entry : metadata) { + String mimeType = entry.getMimeType(); + if (DefaultRSocketRequester.ROUTING.toString().equals(mimeType)) { + return entry.getContent().toString(StandardCharsets.UTF_8); + } + } + return ""; + } + else if (this.metadataMimeType.equals(DefaultRSocketRequester.ROUTING)) { + return payload.getMetadataUtf8(); + } + // Should not happen (given constructor assertions) + throw new IllegalArgumentException("Unexpected metadata MimeType"); } private DataBuffer retainDataAndReleasePayload(Payload payload) { @@ -187,9 +204,7 @@ class MessagingRSocket extends AbstractRSocket { headers.setLeaveMutable(true); RouteMatcher.Route route = this.routeMatcher.parseRoute(destination); headers.setHeader(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER, route); - if (this.dataMimeType != null) { - headers.setContentType(this.dataMimeType); - } + headers.setContentType(this.dataMimeType); headers.setHeader(RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER, this.requester); if (replyMono != null) { headers.setHeader(RSocketPayloadReturnValueHandler.RESPONSE_HEADER, replyMono); diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java index f64e1d751e..9689995778 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java @@ -19,6 +19,7 @@ package org.springframework.messaging.rsocket; import java.net.URI; import java.util.function.Consumer; +import io.rsocket.ConnectionSetupPayload; import io.rsocket.RSocket; import io.rsocket.RSocketFactory; import io.rsocket.transport.ClientTransport; @@ -47,15 +48,28 @@ public interface RSocketRequester { */ RSocket rsocket(); - // For now we treat metadata as a simple string that is the route. - // This will change after the resolution of: - // https://github.com/rsocket/rsocket-java/issues/568 + /** + * Return the data {@code MimeType} selected for the underlying RSocket + * at connection time. On the client side this is configured via + * {@link RSocketRequester.Builder#dataMimeType(MimeType)} while on the + * server side it's obtained from the {@link ConnectionSetupPayload}. + */ + MimeType dataMimeType(); /** - * Entry point to prepare a new request to the given route. - *

For requestChannel interactions, i.e. Flux-to-Flux the metadata is - * attached to the first request payload. - * @param route the routing destination + * Return the metadata {@code MimeType} selected for the underlying RSocket + * at connection time. On the client side this is configured via + * {@link RSocketRequester.Builder#metadataMimeType(MimeType)} while on the + * server side it's obtained from the {@link ConnectionSetupPayload}. + */ + MimeType metadataMimeType(); + + + /** + * Begin to specify a new request with the given route to a handler on the + * remote side. The route will be encoded in the metadata of the first + * payload. + * @param route the route to a handler * @return a spec for further defining and executing the request */ RequestSpec route(String route); @@ -72,31 +86,19 @@ public interface RSocketRequester { } /** - * Wrap an existing {@link RSocket}. Typically used in a client or server - * responder to wrap the remote {@code RSocket}. + * Wrap an existing {@link RSocket}. This is typically used in a responder, + * client or server, to wrap the remote/sending {@code RSocket}. * @param rsocket the RSocket to wrap - * @param dataMimeType the data MimeType, obtained from the - * {@link io.rsocket.ConnectionSetupPayload} (server) or the - * {@link io.rsocket.RSocketFactory.ClientRSocketFactory} (client) + * @param dataMimeType the data MimeType from the {@code ConnectionSetupPayload} + * @param metadataMimeType the metadata MimeType from the {@code ConnectionSetupPayload} * @param strategies the strategies to use * @return the created RSocketRequester */ - static RSocketRequester wrap(RSocket rsocket, @Nullable MimeType dataMimeType, RSocketStrategies strategies) { - return new DefaultRSocketRequester(rsocket, dataMimeType, strategies); - } + static RSocketRequester wrap( + RSocket rsocket, MimeType dataMimeType, MimeType metadataMimeType, + RSocketStrategies strategies) { - /** - * Create a new {@code RSocketRequester} from the given {@link RSocket} and - * strategies for encoding and decoding request and response payloads. - * @param rsocket the sending RSocket to use - * @param dataMimeType the MimeType for data (from the SETUP frame) - * @param strategies encoders, decoders, and others - * @return the created RSocketRequester wrapper - * @deprecated use {@link #wrap(RSocket, MimeType, RSocketStrategies)} instead - */ - @Deprecated - static RSocketRequester create(RSocket rsocket, @Nullable MimeType dataMimeType, RSocketStrategies strategies) { - return new DefaultRSocketRequester(rsocket, dataMimeType, strategies); + return new DefaultRSocketRequester(rsocket, dataMimeType, metadataMimeType, strategies); } @@ -107,20 +109,37 @@ public interface RSocketRequester { interface Builder { /** - * Configure the MimeType to use for payload data. This is set on the - * {@code SETUP} frame for the whole connection. + * Configure the MimeType to use for payload data. This is then + * specified on the {@code SETUP} frame for the whole connection. *

By default this is set to the first concrete MimeType supported * by the configured encoders and decoders. * @param mimeType the data MimeType to use */ - RSocketRequester.Builder dataMimeType(MimeType mimeType); + RSocketRequester.Builder dataMimeType(@Nullable MimeType mimeType); + + /** + * Configure the MimeType to use for payload metadata. This is then + * specified on the {@code SETUP} frame for the whole connection. + *

At present the metadata MimeType must be + * {@code "message/x.rsocket.routing.v0"} to allow the request + * {@link RSocketRequester#route(String) route} to be encoded, or it + * could also be {@code "message/x.rsocket.composite-metadata.v0"} in + * which case the route can be encoded along with other metadata entries. + *

By default this is set to + * {@code "message/x.rsocket.composite-metadata.v0"}. + * @param mimeType the data MimeType to use + */ + RSocketRequester.Builder metadataMimeType(MimeType mimeType); /** * Configure the {@code ClientRSocketFactory}. - *

Note: Please, do not set the {@code dataMimeType} - * directly on the underlying {@code RSocketFactory.ClientRSocketFactory}, - * and use {@link #dataMimeType(MimeType)} instead. - * @param configurer the configurer to apply + *

Note: This builder provides shortcuts for certain + * {@code ClientRSocketFactory} options it needs to know about such as + * {@link #dataMimeType(MimeType)} and {@link #metadataMimeType(MimeType)}. + * Please, use these shortcuts vs configuring them directly on the + * {@code ClientRSocketFactory} so that the resulting + * {@code RSocketRequester} is aware of those changes. + * @param configurer consumer to customize the factory */ RSocketRequester.Builder rsocketFactory(Consumer configurer); @@ -169,6 +188,17 @@ public interface RSocketRequester { */ interface RequestSpec { + /** + * Use this to append additional metadata entries if the RSocket + * connection is configured to use composite metadata. If not, an + * {@link IllegalArgumentException} will be raised. + * @param metadata an Object, to be encoded with a suitable + * {@link org.springframework.core.codec.Encoder Encoder}, or a + * {@link org.springframework.core.io.buffer.DataBuffer DataBuffer} + * @param mimeType the mime type that describes the metadata + */ + RequestSpec metadata(Object metadata, MimeType mimeType); + /** * Provide request payload data. The given Object may be a synchronous * value, or a {@link Publisher} of values, or another async type that's diff --git a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterTests.java b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterTests.java index 56eebe53df..915499aba6 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterTests.java @@ -19,6 +19,7 @@ package org.springframework.messaging.rsocket; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.Arrays; +import java.util.Iterator; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; @@ -28,6 +29,7 @@ import io.reactivex.Observable; import io.reactivex.Single; import io.rsocket.AbstractRSocket; import io.rsocket.Payload; +import io.rsocket.metadata.CompositeMetadata; import org.junit.Before; import org.junit.Test; import org.reactivestreams.Publisher; @@ -61,37 +63,41 @@ public class DefaultRSocketRequesterTests { private RSocketRequester requester; + private RSocketStrategies strategies; + private final DefaultDataBufferFactory bufferFactory = new DefaultDataBufferFactory(); @Before public void setUp() { - RSocketStrategies strategies = RSocketStrategies.builder() + this.strategies = RSocketStrategies.builder() .decoder(StringDecoder.allMimeTypes()) .encoder(CharSequenceEncoder.allMimeTypes()) .build(); this.rsocket = new TestRSocket(); - this.requester = RSocketRequester.wrap(this.rsocket, MimeTypeUtils.TEXT_PLAIN, strategies); + this.requester = RSocketRequester.wrap(this.rsocket, + MimeTypeUtils.TEXT_PLAIN, DefaultRSocketRequester.ROUTING, + this.strategies); } @Test - public void singlePayload() { + public void sendMono() { // data(Object) - testSinglePayload(spec -> spec.data("bodyA"), "bodyA"); - testSinglePayload(spec -> spec.data(Mono.delay(MILLIS_10).map(l -> "bodyA")), "bodyA"); - testSinglePayload(spec -> spec.data(Mono.delay(MILLIS_10).then()), ""); - testSinglePayload(spec -> spec.data(Single.timer(10, MILLISECONDS).map(l -> "bodyA")), "bodyA"); - testSinglePayload(spec -> spec.data(Completable.complete()), ""); + testSendMono(spec -> spec.data("bodyA"), "bodyA"); + testSendMono(spec -> spec.data(Mono.delay(MILLIS_10).map(l -> "bodyA")), "bodyA"); + testSendMono(spec -> spec.data(Mono.delay(MILLIS_10).then()), ""); + testSendMono(spec -> spec.data(Single.timer(10, MILLISECONDS).map(l -> "bodyA")), "bodyA"); + testSendMono(spec -> spec.data(Completable.complete()), ""); // data(Publisher, Class) - testSinglePayload(spec -> spec.data(Mono.delay(MILLIS_10).map(l -> "bodyA"), String.class), "bodyA"); - testSinglePayload(spec -> spec.data(Mono.delay(MILLIS_10).map(l -> "bodyA"), Object.class), "bodyA"); - testSinglePayload(spec -> spec.data(Mono.delay(MILLIS_10).then(), Void.class), ""); + testSendMono(spec -> spec.data(Mono.delay(MILLIS_10).map(l -> "bodyA"), String.class), "bodyA"); + testSendMono(spec -> spec.data(Mono.delay(MILLIS_10).map(l -> "bodyA"), Object.class), "bodyA"); + testSendMono(spec -> spec.data(Mono.delay(MILLIS_10).then(), Void.class), ""); } - private void testSinglePayload(Function mapper, String expectedValue) { + private void testSendMono(Function mapper, String expectedValue) { mapper.apply(this.requester.route("toA")).send().block(Duration.ofSeconds(5)); assertThat(this.rsocket.getSavedMethodName()).isEqualTo("fireAndForget"); @@ -100,22 +106,22 @@ public class DefaultRSocketRequesterTests { } @Test - public void multiPayload() { + public void sendFlux() { String[] values = new String[] {"bodyA", "bodyB", "bodyC"}; Flux stringFlux = Flux.fromArray(values).delayElements(MILLIS_10); // data(Object) - testMultiPayload(spec -> spec.data(stringFlux), values); - testMultiPayload(spec -> spec.data(Flux.empty()), ""); - testMultiPayload(spec -> spec.data(Observable.fromArray(values).delay(10, MILLISECONDS)), values); - testMultiPayload(spec -> spec.data(Observable.empty()), ""); + testSendFlux(spec -> spec.data(stringFlux), values); + testSendFlux(spec -> spec.data(Flux.empty()), ""); + testSendFlux(spec -> spec.data(Observable.fromArray(values).delay(10, MILLISECONDS)), values); + testSendFlux(spec -> spec.data(Observable.empty()), ""); // data(Publisher, Class) - testMultiPayload(spec -> spec.data(stringFlux, String.class), values); - testMultiPayload(spec -> spec.data(stringFlux.cast(Object.class), Object.class), values); + testSendFlux(spec -> spec.data(stringFlux, String.class), values); + testSendFlux(spec -> spec.data(stringFlux.cast(Object.class), Object.class), values); } - private void testMultiPayload(Function mapper, String... expectedValues) { + private void testSendFlux(Function mapper, String... expectedValues) { this.rsocket.reset(); mapper.apply(this.requester.route("toA")).retrieveFlux(String.class).blockLast(Duration.ofSeconds(5)); @@ -129,19 +135,50 @@ public class DefaultRSocketRequesterTests { assertThat(payloads.get(0).getDataUtf8()).isEqualTo(""); } else { - assertThat(payloads.stream().map(Payload::getMetadataUtf8).toArray(String[]::new)).isEqualTo(new String[] {"toA", "", ""}); - assertThat(payloads.stream().map(Payload::getDataUtf8).toArray(String[]::new)).isEqualTo(expectedValues); + assertThat(payloads.stream().map(Payload::getMetadataUtf8).toArray(String[]::new)) + .isEqualTo(new String[] {"toA", "", ""}); + assertThat(payloads.stream().map(Payload::getDataUtf8).toArray(String[]::new)) + .isEqualTo(expectedValues); } } @Test - public void send() { - String value = "bodyA"; - this.requester.route("toA").data(value).send().block(Duration.ofSeconds(5)); + public void sendCompositeMetadata() { + RSocketRequester requester = RSocketRequester.wrap(this.rsocket, + MimeTypeUtils.TEXT_PLAIN, DefaultRSocketRequester.COMPOSITE_METADATA, + this.strategies); + + requester.route("toA") + .metadata("My metadata", MimeTypeUtils.TEXT_PLAIN).data("bodyA") + .send() + .block(Duration.ofSeconds(5)); + + CompositeMetadata entries = new CompositeMetadata(this.rsocket.getSavedPayload().metadata(), false); + Iterator iterator = entries.iterator(); + + assertThat(iterator.hasNext()).isTrue(); + CompositeMetadata.Entry entry = iterator.next(); + assertThat(entry.getMimeType()).isEqualTo(DefaultRSocketRequester.ROUTING.toString()); + assertThat(entry.getContent().toString(StandardCharsets.UTF_8)).isEqualTo("toA"); + + assertThat(iterator.hasNext()).isTrue(); + entry = iterator.next(); + assertThat(entry.getMimeType()).isEqualTo(MimeTypeUtils.TEXT_PLAIN.toString()); + assertThat(entry.getContent().toString(StandardCharsets.UTF_8)).isEqualTo("My metadata"); + + assertThat(iterator.hasNext()).isFalse(); + } - assertThat(this.rsocket.getSavedMethodName()).isEqualTo("fireAndForget"); - assertThat(this.rsocket.getSavedPayload().getMetadataUtf8()).isEqualTo("toA"); - assertThat(this.rsocket.getSavedPayload().getDataUtf8()).isEqualTo("bodyA"); + @Test + public void supportedMetadataMimeTypes() { + RSocketRequester.wrap(this.rsocket, MimeTypeUtils.TEXT_PLAIN, + DefaultRSocketRequester.COMPOSITE_METADATA, this.strategies); + + RSocketRequester.wrap(this.rsocket, MimeTypeUtils.TEXT_PLAIN, + DefaultRSocketRequester.ROUTING, this.strategies); + + assertThatIllegalArgumentException().isThrownBy(() -> RSocketRequester.wrap( + this.rsocket, MimeTypeUtils.TEXT_PLAIN, MimeTypeUtils.TEXT_PLAIN, this.strategies)); } @Test @@ -188,10 +225,10 @@ public class DefaultRSocketRequesterTests { } @Test - public void rejectFluxToMono() { - assertThatIllegalArgumentException().isThrownBy(() -> - this.requester.route("").data(Flux.just("a", "b")).retrieveMono(String.class)) - .withMessage("No RSocket interaction model for Flux request to Mono response."); + public void fluxToMonoIsRejected() { + assertThatIllegalArgumentException() + .isThrownBy(() -> this.requester.route("").data(Flux.just("a", "b")).retrieveMono(String.class)) + .withMessage("No RSocket interaction model for Flux request to Mono response."); } private Payload toPayload(String value) { diff --git a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketClientToServerIntegrationTests.java b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketClientToServerIntegrationTests.java index b340f50b89..ef1bd615c8 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketClientToServerIntegrationTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketClientToServerIntegrationTests.java @@ -101,7 +101,9 @@ public class RSocketClientToServerIntegrationTests { .verify(Duration.ofSeconds(5)); assertThat(interceptor.getRSocketCount()).isEqualTo(1); - assertThat(interceptor.getFireAndForgetCount(0)).as("Fire and forget requests did not actually complete handling on the server side").isEqualTo(3); + assertThat(interceptor.getFireAndForgetCount(0)) + .as("Fire and forget requests did not actually complete handling on the server side") + .isEqualTo(3); } @Test diff --git a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java index 152e4e8974..6e07cf5752 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/rsocket/RSocketServerToClientIntegrationTests.java @@ -106,8 +106,9 @@ public class RSocketServerToClientIntegrationTests { RSocket rsocket = null; try { rsocket = RSocketFactory.connect() - .setupPayload(DefaultPayload.create("", destination)) + .metadataMimeType("message/x.rsocket.routing.v0") .dataMimeType("text/plain") + .setupPayload(DefaultPayload.create("", destination)) .frameDecoder(PayloadDecoder.ZERO_COPY) .acceptor(context.getBean("clientAcceptor", MessageHandlerAcceptor.class)) .transport(TcpClientTransport.create("localhost", 7000)) diff --git a/spring-web/src/test/java/org/springframework/http/ContentDispositionTests.java b/spring-web/src/test/java/org/springframework/http/ContentDispositionTests.java index cef4b674f4..4654aeb118 100644 --- a/spring-web/src/test/java/org/springframework/http/ContentDispositionTests.java +++ b/spring-web/src/test/java/org/springframework/http/ContentDispositionTests.java @@ -36,6 +36,15 @@ import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException */ public class ContentDispositionTests { + + @Test + public void parseTest() { + ContentDisposition disposition = ContentDisposition + .parse("form-data; name=\"foo\"; filename=\"foo.txt\"; size=123"); + assertThat(disposition).isEqualTo(ContentDisposition.builder("form-data") + .name("foo").filename("foo.txt").size(123L).build()); + } + @Test public void parse() { ContentDisposition disposition = ContentDisposition