diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java index 1ae547a57d..5156db03e5 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequesterBuilder.java @@ -34,6 +34,7 @@ import org.springframework.util.MimeType; * Default implementation of {@link RSocketRequester.Builder}. * * @author Brian Clozel + * @author Rossen Stoyanchev * @since 5.2 */ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder { @@ -76,29 +77,36 @@ final class DefaultRSocketRequesterBuilder implements RSocketRequester.Builder { @Override public Mono connect(ClientTransport transport) { - return Mono.defer(() -> { - RSocketStrategies strategies = getRSocketStrategies(); - MimeType dataMimeType = getDefaultDataMimeType(strategies); - - RSocketFactory.ClientRSocketFactory factory = RSocketFactory.connect(); - if (dataMimeType != null) { - factory.dataMimeType(dataMimeType.toString()); - } - this.factoryConfigurers.forEach(configurer -> configurer.accept(factory)); - - return factory.transport(transport).start() - .map(rsocket -> new DefaultRSocketRequester(rsocket, dataMimeType, strategies)); - }); + return Mono.defer(() -> doConnect(transport)); + } + + private Mono doConnect(ClientTransport transport) { + RSocketStrategies rsocketStrategies = getRSocketStrategies(); + RSocketFactory.ClientRSocketFactory rsocketFactory = RSocketFactory.connect(); + + // 1. Apply default settings + MimeType dataMimeType = getDefaultDataMimeType(rsocketStrategies); + if (dataMimeType != null) { + rsocketFactory.dataMimeType(dataMimeType.toString()); + } + + // 2. Application customizations + this.factoryConfigurers.forEach(c -> c.accept(rsocketFactory)); + + return rsocketFactory.transport(transport).start() + .map(rsocket -> new DefaultRSocketRequester(rsocket, dataMimeType, rsocketStrategies)); } private RSocketStrategies getRSocketStrategies() { - if (this.strategiesConfigurers.isEmpty()) { + if (!this.strategiesConfigurers.isEmpty()) { + RSocketStrategies.Builder builder = + this.strategies != null ? this.strategies.mutate() : RSocketStrategies.builder(); + this.strategiesConfigurers.forEach(c -> c.accept(builder)); + return builder.build(); + } + else { return this.strategies != null ? this.strategies : RSocketStrategies.builder().build(); } - RSocketStrategies.Builder strategiesBuilder = this.strategies != null ? - this.strategies.mutate() : RSocketStrategies.builder(); - this.strategiesConfigurers.forEach(configurer -> configurer.accept(strategiesBuilder)); - return strategiesBuilder.build(); } @Nullable diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/MessageHandlerAcceptor.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/MessageHandlerAcceptor.java index 97e3ce0f1e..185dd078bc 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/MessageHandlerAcceptor.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/MessageHandlerAcceptor.java @@ -24,6 +24,7 @@ import io.rsocket.RSocket; import io.rsocket.SocketAcceptor; import reactor.core.publisher.Mono; +import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.lang.Nullable; import org.springframework.messaging.Message; import org.springframework.util.MimeType; @@ -78,11 +79,9 @@ public final class MessageHandlerAcceptor extends RSocketMessageHandler MimeType dataMimeType = StringUtils.hasText(setupPayload.dataMimeType()) ? MimeTypeUtils.parseMimeType(setupPayload.dataMimeType()) : this.defaultDataMimeType; - return new MessagingRSocket(this::handleMessage, - route -> getRouteMatcher().parseRoute(route), - RSocketRequester.wrap(rsocket, dataMimeType, getRSocketStrategies()), - dataMimeType, - getRSocketStrategies().dataBufferFactory()); + RSocketRequester requester = RSocketRequester.wrap(rsocket, dataMimeType, getRSocketStrategies()); + DataBufferFactory bufferFactory = getRSocketStrategies().dataBufferFactory(); + return new MessagingRSocket(this, getRouteMatcher(), requester, dataMimeType, bufferFactory); } } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/MessagingRSocket.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/MessagingRSocket.java index ba27d6c6ea..583839906a 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/MessagingRSocket.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/MessagingRSocket.java @@ -44,18 +44,20 @@ import org.springframework.util.MimeType; import org.springframework.util.RouteMatcher; /** - * Implementation of {@link RSocket} that wraps incoming requests with a - * {@link Message}, delegates to a {@link Function} for handling, and then - * obtains the response from a "reply" header. + * Responder {@link RSocket} that wraps the payload and metadata of incoming + * requests as a {@link Message} and then delegates to the configured + * {@link RSocketMessageHandler} to handle it. The response, if applicable, is + * obtained from the {@link RSocketPayloadReturnValueHandler#RESPONSE_HEADER + * "rsocketResponse"} header. * * @author Rossen Stoyanchev * @since 5.2 */ class MessagingRSocket extends AbstractRSocket { - private final Function, Mono> handler; + private final RSocketMessageHandler messageHandler; - private final Function routeParser; + private final RouteMatcher routeMatcher; private final RSocketRequester requester; @@ -65,16 +67,16 @@ class MessagingRSocket extends AbstractRSocket { private final DataBufferFactory bufferFactory; - MessagingRSocket(Function, Mono> handler, - Function routeParser, RSocketRequester requester, - @Nullable MimeType defaultDataMimeType, DataBufferFactory bufferFactory) { + MessagingRSocket(RSocketMessageHandler messageHandler, RouteMatcher routeMatcher, + RSocketRequester requester, @Nullable MimeType defaultDataMimeType, + DataBufferFactory bufferFactory) { - this.routeParser = routeParser; - - Assert.notNull(handler, "'handler' is required"); - Assert.notNull(routeParser, "'routeParser' is required"); + Assert.notNull(messageHandler, "'messageHandler' is required"); + Assert.notNull(routeMatcher, "'routeMatcher' is required"); Assert.notNull(requester, "'requester' is required"); - this.handler = handler; + + this.messageHandler = messageHandler; + this.routeMatcher = routeMatcher; this.requester = requester; this.dataMimeType = defaultDataMimeType; this.bufferFactory = bufferFactory; @@ -132,7 +134,7 @@ class MessagingRSocket extends AbstractRSocket { DataBuffer dataBuffer = retainDataAndReleasePayload(payload); int refCount = refCount(dataBuffer); Message message = MessageBuilder.createMessage(dataBuffer, headers); - return Mono.defer(() -> this.handler.apply(message)) + return Mono.defer(() -> this.messageHandler.handleMessage(message)) .doFinally(s -> { if (refCount(dataBuffer) == refCount) { DataBufferUtils.release(dataBuffer); @@ -154,7 +156,7 @@ class MessagingRSocket extends AbstractRSocket { Flux buffers = payloads.map(this::retainDataAndReleasePayload).doOnSubscribe(s -> read.set(true)); Message> message = MessageBuilder.createMessage(buffers, headers); - return Mono.defer(() -> this.handler.apply(message)) + return Mono.defer(() -> this.messageHandler.handleMessage(message)) .doFinally(s -> { // Subscription should have happened by now due to ChannelSendOperator if (!read.get()) { @@ -183,7 +185,7 @@ class MessagingRSocket extends AbstractRSocket { private MessageHeaders createHeaders(String destination, @Nullable MonoProcessor replyMono) { MessageHeaderAccessor headers = new MessageHeaderAccessor(); headers.setLeaveMutable(true); - RouteMatcher.Route route = this.routeParser.apply(destination); + RouteMatcher.Route route = this.routeMatcher.parseRoute(destination); headers.setHeader(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER, route); if (this.dataMimeType != null) { headers.setContentType(this.dataMimeType); diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java index 4c6f0361ca..0b8c64dbf3 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketRequester.java @@ -116,8 +116,8 @@ public interface RSocketRequester { RSocketRequester.Builder rsocketFactory(Consumer configurer); /** - * Set the {@link RSocketStrategies} instance. - * @param strategies the strategies to use + * Set the {@link RSocketStrategies} to use for access to encoders, + * decoders, and a factory for {@code DataBuffer's}. */ RSocketRequester.Builder rsocketStrategies(@Nullable RSocketStrategies strategies); diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketStrategies.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketStrategies.java index 9ef05661b7..9e665a3bc7 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketStrategies.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/RSocketStrategies.java @@ -172,7 +172,7 @@ public interface RSocketStrategies { Builder dataBufferFactory(DataBufferFactory bufferFactory); /** - * Builder the {@code RSocketStrategies} instance. + * Build the {@code RSocketStrategies} instance. */ RSocketStrategies build(); }