diff --git a/spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/support/reactive/PayloadMethodArgumentResolver.java b/spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/support/reactive/PayloadMethodArgumentResolver.java index 88a1a91298..62fa3af05e 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/support/reactive/PayloadMethodArgumentResolver.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/support/reactive/PayloadMethodArgumentResolver.java @@ -232,26 +232,26 @@ public class PayloadMethodArgumentResolver implements HandlerMethodArgumentResol if (decoder.canDecode(elementType, mimeType)) { if (adapter != null && adapter.isMultiValue()) { Flux flux = content - .concatMap(buffer -> decoder.decode(Mono.just(buffer), elementType, mimeType, hints)) + .map(buffer -> decoder.decode(buffer, elementType, mimeType, hints)) .onErrorResume(ex -> Flux.error(handleReadError(parameter, message, ex))); if (isContentRequired) { flux = flux.switchIfEmpty(Flux.error(() -> handleMissingBody(parameter, message))); } if (validator != null) { - flux = flux.doOnNext(validator::accept); + flux = flux.doOnNext(validator); } return Mono.just(adapter.fromPublisher(flux)); } else { // Single-value (with or without reactive type wrapper) - Mono mono = decoder - .decodeToMono(content.next(), targetType, mimeType, hints) + Mono mono = content.next() + .map(buffer -> decoder.decode(buffer, elementType, mimeType, hints)) .onErrorResume(ex -> Mono.error(handleReadError(parameter, message, ex))); if (isContentRequired) { mono = mono.switchIfEmpty(Mono.error(() -> handleMissingBody(parameter, message))); } if (validator != null) { - mono = mono.doOnNext(validator::accept); + mono = mono.doOnNext(validator); } return (adapter != null ? Mono.just(adapter.fromPublisher(mono)) : Mono.from(mono)); } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequester.java b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequester.java index 63facb327a..c112cb16bd 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequester.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequester.java @@ -244,8 +244,8 @@ final class DefaultRSocketRequester implements RSocketRequester { } Decoder decoder = strategies.decoder(elementType, dataMimeType); - return (Mono) decoder.decodeToMono( - payloadMono.map(this::retainDataAndReleasePayload), elementType, dataMimeType, EMPTY_HINTS); + return (Mono) payloadMono.map(this::retainDataAndReleasePayload) + .map(dataBuffer -> decoder.decode(dataBuffer, elementType, dataMimeType, EMPTY_HINTS)); } @SuppressWarnings("unchecked") @@ -261,8 +261,8 @@ final class DefaultRSocketRequester implements RSocketRequester { Decoder decoder = strategies.decoder(elementType, dataMimeType); - return payloadFlux.map(this::retainDataAndReleasePayload).concatMap(dataBuffer -> - (Mono) decoder.decodeToMono(Mono.just(dataBuffer), elementType, dataMimeType, EMPTY_HINTS)); + return payloadFlux.map(this::retainDataAndReleasePayload).map(dataBuffer -> + (T) decoder.decode(dataBuffer, elementType, dataMimeType, EMPTY_HINTS)); } private DataBuffer retainDataAndReleasePayload(Payload payload) { diff --git a/spring-web/src/main/java/org/springframework/http/codec/ServerSentEventHttpMessageReader.java b/spring-web/src/main/java/org/springframework/http/codec/ServerSentEventHttpMessageReader.java index d66f59c601..dba78e8ec2 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/ServerSentEventHttpMessageReader.java +++ b/spring-web/src/main/java/org/springframework/http/codec/ServerSentEventHttpMessageReader.java @@ -106,10 +106,11 @@ public class ServerSentEventHttpMessageReader implements HttpMessageReader line.equals("")) - .concatMap(lines -> buildEvent(lines, valueType, shouldWrap, hints)); + .concatMap(lines -> Mono.justOrEmpty(buildEvent(lines, valueType, shouldWrap, hints))); } - private Mono buildEvent(List lines, ResolvableType valueType, boolean shouldWrap, + @Nullable + private Object buildEvent(List lines, ResolvableType valueType, boolean shouldWrap, Map hints) { ServerSentEvent.Builder sseBuilder = shouldWrap ? ServerSentEvent.builder() : null; @@ -138,34 +139,32 @@ public class ServerSentEventHttpMessageReader implements HttpMessageReader decodedData = (data != null ? decodeData(data.toString(), valueType, hints) : Mono.empty()); + Object decodedData = data != null ? decodeData(data.toString(), valueType, hints) : null; if (shouldWrap) { if (comment != null) { sseBuilder.comment(comment.toString().substring(0, comment.length() - 1)); } - return decodedData.map(o -> { - sseBuilder.data(o); - return sseBuilder.build(); - }); + if (decodedData != null) { + sseBuilder.data(decodedData); + } + return sseBuilder.build(); } else { return decodedData; } } - private Mono decodeData(String data, ResolvableType dataType, Map hints) { + private Object decodeData(String data, ResolvableType dataType, Map hints) { if (String.class == dataType.resolve()) { - return Mono.just(data.substring(0, data.length() - 1)); + return data.substring(0, data.length() - 1); } - if (this.decoder == null) { - return Mono.error(new CodecException("No SSE decoder configured and the data is not String.")); + throw new CodecException("No SSE decoder configured and the data is not String."); } - byte[] bytes = data.getBytes(StandardCharsets.UTF_8); DataBuffer buffer = bufferFactory.wrap(bytes); // wrapping only, no allocation - return this.decoder.decodeToMono(Mono.just(buffer), dataType, MediaType.TEXT_EVENT_STREAM, hints); + return this.decoder.decode(buffer, dataType, MediaType.TEXT_EVENT_STREAM, hints); } @Override