|
|
@ -44,18 +44,20 @@ import org.springframework.util.MimeType; |
|
|
|
import org.springframework.util.RouteMatcher; |
|
|
|
import org.springframework.util.RouteMatcher; |
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
/** |
|
|
|
* Implementation of {@link RSocket} that wraps incoming requests with a |
|
|
|
* Responder {@link RSocket} that wraps the payload and metadata of incoming |
|
|
|
* {@link Message}, delegates to a {@link Function} for handling, and then |
|
|
|
* requests as a {@link Message} and then delegates to the configured |
|
|
|
* obtains the response from a "reply" header. |
|
|
|
* {@link RSocketMessageHandler} to handle it. The response, if applicable, is |
|
|
|
|
|
|
|
* obtained from the {@link RSocketPayloadReturnValueHandler#RESPONSE_HEADER |
|
|
|
|
|
|
|
* "rsocketResponse"} header. |
|
|
|
* |
|
|
|
* |
|
|
|
* @author Rossen Stoyanchev |
|
|
|
* @author Rossen Stoyanchev |
|
|
|
* @since 5.2 |
|
|
|
* @since 5.2 |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
class MessagingRSocket extends AbstractRSocket { |
|
|
|
class MessagingRSocket extends AbstractRSocket { |
|
|
|
|
|
|
|
|
|
|
|
private final Function<Message<?>, Mono<Void>> handler; |
|
|
|
private final RSocketMessageHandler messageHandler; |
|
|
|
|
|
|
|
|
|
|
|
private final Function<String, RouteMatcher.Route> routeParser; |
|
|
|
private final RouteMatcher routeMatcher; |
|
|
|
|
|
|
|
|
|
|
|
private final RSocketRequester requester; |
|
|
|
private final RSocketRequester requester; |
|
|
|
|
|
|
|
|
|
|
@ -65,16 +67,16 @@ class MessagingRSocket extends AbstractRSocket { |
|
|
|
private final DataBufferFactory bufferFactory; |
|
|
|
private final DataBufferFactory bufferFactory; |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
MessagingRSocket(Function<Message<?>, Mono<Void>> handler, |
|
|
|
MessagingRSocket(RSocketMessageHandler messageHandler, RouteMatcher routeMatcher, |
|
|
|
Function<String, RouteMatcher.Route> routeParser, RSocketRequester requester, |
|
|
|
RSocketRequester requester, @Nullable MimeType defaultDataMimeType, |
|
|
|
@Nullable MimeType defaultDataMimeType, DataBufferFactory bufferFactory) { |
|
|
|
DataBufferFactory bufferFactory) { |
|
|
|
|
|
|
|
|
|
|
|
this.routeParser = routeParser; |
|
|
|
Assert.notNull(messageHandler, "'messageHandler' is required"); |
|
|
|
|
|
|
|
Assert.notNull(routeMatcher, "'routeMatcher' is required"); |
|
|
|
Assert.notNull(handler, "'handler' is required"); |
|
|
|
|
|
|
|
Assert.notNull(routeParser, "'routeParser' is required"); |
|
|
|
|
|
|
|
Assert.notNull(requester, "'requester' is required"); |
|
|
|
Assert.notNull(requester, "'requester' is required"); |
|
|
|
this.handler = handler; |
|
|
|
|
|
|
|
|
|
|
|
this.messageHandler = messageHandler; |
|
|
|
|
|
|
|
this.routeMatcher = routeMatcher; |
|
|
|
this.requester = requester; |
|
|
|
this.requester = requester; |
|
|
|
this.dataMimeType = defaultDataMimeType; |
|
|
|
this.dataMimeType = defaultDataMimeType; |
|
|
|
this.bufferFactory = bufferFactory; |
|
|
|
this.bufferFactory = bufferFactory; |
|
|
@ -132,7 +134,7 @@ class MessagingRSocket extends AbstractRSocket { |
|
|
|
DataBuffer dataBuffer = retainDataAndReleasePayload(payload); |
|
|
|
DataBuffer dataBuffer = retainDataAndReleasePayload(payload); |
|
|
|
int refCount = refCount(dataBuffer); |
|
|
|
int refCount = refCount(dataBuffer); |
|
|
|
Message<?> message = MessageBuilder.createMessage(dataBuffer, headers); |
|
|
|
Message<?> message = MessageBuilder.createMessage(dataBuffer, headers); |
|
|
|
return Mono.defer(() -> this.handler.apply(message)) |
|
|
|
return Mono.defer(() -> this.messageHandler.handleMessage(message)) |
|
|
|
.doFinally(s -> { |
|
|
|
.doFinally(s -> { |
|
|
|
if (refCount(dataBuffer) == refCount) { |
|
|
|
if (refCount(dataBuffer) == refCount) { |
|
|
|
DataBufferUtils.release(dataBuffer); |
|
|
|
DataBufferUtils.release(dataBuffer); |
|
|
@ -154,7 +156,7 @@ class MessagingRSocket extends AbstractRSocket { |
|
|
|
Flux<DataBuffer> buffers = payloads.map(this::retainDataAndReleasePayload).doOnSubscribe(s -> read.set(true)); |
|
|
|
Flux<DataBuffer> buffers = payloads.map(this::retainDataAndReleasePayload).doOnSubscribe(s -> read.set(true)); |
|
|
|
Message<Flux<DataBuffer>> message = MessageBuilder.createMessage(buffers, headers); |
|
|
|
Message<Flux<DataBuffer>> message = MessageBuilder.createMessage(buffers, headers); |
|
|
|
|
|
|
|
|
|
|
|
return Mono.defer(() -> this.handler.apply(message)) |
|
|
|
return Mono.defer(() -> this.messageHandler.handleMessage(message)) |
|
|
|
.doFinally(s -> { |
|
|
|
.doFinally(s -> { |
|
|
|
// Subscription should have happened by now due to ChannelSendOperator
|
|
|
|
// Subscription should have happened by now due to ChannelSendOperator
|
|
|
|
if (!read.get()) { |
|
|
|
if (!read.get()) { |
|
|
@ -183,7 +185,7 @@ class MessagingRSocket extends AbstractRSocket { |
|
|
|
private MessageHeaders createHeaders(String destination, @Nullable MonoProcessor<?> replyMono) { |
|
|
|
private MessageHeaders createHeaders(String destination, @Nullable MonoProcessor<?> replyMono) { |
|
|
|
MessageHeaderAccessor headers = new MessageHeaderAccessor(); |
|
|
|
MessageHeaderAccessor headers = new MessageHeaderAccessor(); |
|
|
|
headers.setLeaveMutable(true); |
|
|
|
headers.setLeaveMutable(true); |
|
|
|
RouteMatcher.Route route = this.routeParser.apply(destination); |
|
|
|
RouteMatcher.Route route = this.routeMatcher.parseRoute(destination); |
|
|
|
headers.setHeader(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER, route); |
|
|
|
headers.setHeader(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER, route); |
|
|
|
if (this.dataMimeType != null) { |
|
|
|
if (this.dataMimeType != null) { |
|
|
|
headers.setContentType(this.dataMimeType); |
|
|
|
headers.setContentType(this.dataMimeType); |
|
|
|