Browse Source

Use decode from a DataBuffer where feasible

See gh-22782
pull/22792/head
Rossen Stoyanchev 6 years ago
parent
commit
f89d2ac148
  1. 10
      spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/support/reactive/PayloadMethodArgumentResolver.java
  2. 8
      spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequester.java
  3. 25
      spring-web/src/main/java/org/springframework/http/codec/ServerSentEventHttpMessageReader.java

10
spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/support/reactive/PayloadMethodArgumentResolver.java

@ -232,26 +232,26 @@ public class PayloadMethodArgumentResolver implements HandlerMethodArgumentResol @@ -232,26 +232,26 @@ public class PayloadMethodArgumentResolver implements HandlerMethodArgumentResol
if (decoder.canDecode(elementType, mimeType)) {
if (adapter != null && adapter.isMultiValue()) {
Flux<?> flux = content
.concatMap(buffer -> decoder.decode(Mono.just(buffer), elementType, mimeType, hints))
.map(buffer -> decoder.decode(buffer, elementType, mimeType, hints))
.onErrorResume(ex -> Flux.error(handleReadError(parameter, message, ex)));
if (isContentRequired) {
flux = flux.switchIfEmpty(Flux.error(() -> handleMissingBody(parameter, message)));
}
if (validator != null) {
flux = flux.doOnNext(validator::accept);
flux = flux.doOnNext(validator);
}
return Mono.just(adapter.fromPublisher(flux));
}
else {
// Single-value (with or without reactive type wrapper)
Mono<?> mono = decoder
.decodeToMono(content.next(), targetType, mimeType, hints)
Mono<?> mono = content.next()
.map(buffer -> decoder.decode(buffer, elementType, mimeType, hints))
.onErrorResume(ex -> Mono.error(handleReadError(parameter, message, ex)));
if (isContentRequired) {
mono = mono.switchIfEmpty(Mono.error(() -> handleMissingBody(parameter, message)));
}
if (validator != null) {
mono = mono.doOnNext(validator::accept);
mono = mono.doOnNext(validator);
}
return (adapter != null ? Mono.just(adapter.fromPublisher(mono)) : Mono.from(mono));
}

8
spring-messaging/src/main/java/org/springframework/messaging/rsocket/DefaultRSocketRequester.java

@ -244,8 +244,8 @@ final class DefaultRSocketRequester implements RSocketRequester { @@ -244,8 +244,8 @@ final class DefaultRSocketRequester implements RSocketRequester {
}
Decoder<?> decoder = strategies.decoder(elementType, dataMimeType);
return (Mono<T>) decoder.decodeToMono(
payloadMono.map(this::retainDataAndReleasePayload), elementType, dataMimeType, EMPTY_HINTS);
return (Mono<T>) payloadMono.map(this::retainDataAndReleasePayload)
.map(dataBuffer -> decoder.decode(dataBuffer, elementType, dataMimeType, EMPTY_HINTS));
}
@SuppressWarnings("unchecked")
@ -261,8 +261,8 @@ final class DefaultRSocketRequester implements RSocketRequester { @@ -261,8 +261,8 @@ final class DefaultRSocketRequester implements RSocketRequester {
Decoder<?> decoder = strategies.decoder(elementType, dataMimeType);
return payloadFlux.map(this::retainDataAndReleasePayload).concatMap(dataBuffer ->
(Mono<T>) decoder.decodeToMono(Mono.just(dataBuffer), elementType, dataMimeType, EMPTY_HINTS));
return payloadFlux.map(this::retainDataAndReleasePayload).map(dataBuffer ->
(T) decoder.decode(dataBuffer, elementType, dataMimeType, EMPTY_HINTS));
}
private DataBuffer retainDataAndReleasePayload(Payload payload) {

25
spring-web/src/main/java/org/springframework/http/codec/ServerSentEventHttpMessageReader.java

@ -106,10 +106,11 @@ public class ServerSentEventHttpMessageReader implements HttpMessageReader<Objec @@ -106,10 +106,11 @@ public class ServerSentEventHttpMessageReader implements HttpMessageReader<Objec
return stringDecoder.decode(message.getBody(), STRING_TYPE, null, hints)
.bufferUntil(line -> line.equals(""))
.concatMap(lines -> buildEvent(lines, valueType, shouldWrap, hints));
.concatMap(lines -> Mono.justOrEmpty(buildEvent(lines, valueType, shouldWrap, hints)));
}
private Mono<?> buildEvent(List<String> lines, ResolvableType valueType, boolean shouldWrap,
@Nullable
private Object buildEvent(List<String> lines, ResolvableType valueType, boolean shouldWrap,
Map<String, Object> hints) {
ServerSentEvent.Builder<Object> sseBuilder = shouldWrap ? ServerSentEvent.builder() : null;
@ -138,34 +139,32 @@ public class ServerSentEventHttpMessageReader implements HttpMessageReader<Objec @@ -138,34 +139,32 @@ public class ServerSentEventHttpMessageReader implements HttpMessageReader<Objec
}
}
Mono<?> decodedData = (data != null ? decodeData(data.toString(), valueType, hints) : Mono.empty());
Object decodedData = data != null ? decodeData(data.toString(), valueType, hints) : null;
if (shouldWrap) {
if (comment != null) {
sseBuilder.comment(comment.toString().substring(0, comment.length() - 1));
}
return decodedData.map(o -> {
sseBuilder.data(o);
return sseBuilder.build();
});
if (decodedData != null) {
sseBuilder.data(decodedData);
}
return sseBuilder.build();
}
else {
return decodedData;
}
}
private Mono<?> decodeData(String data, ResolvableType dataType, Map<String, Object> hints) {
private Object decodeData(String data, ResolvableType dataType, Map<String, Object> hints) {
if (String.class == dataType.resolve()) {
return Mono.just(data.substring(0, data.length() - 1));
return data.substring(0, data.length() - 1);
}
if (this.decoder == null) {
return Mono.error(new CodecException("No SSE decoder configured and the data is not String."));
throw new CodecException("No SSE decoder configured and the data is not String.");
}
byte[] bytes = data.getBytes(StandardCharsets.UTF_8);
DataBuffer buffer = bufferFactory.wrap(bytes); // wrapping only, no allocation
return this.decoder.decodeToMono(Mono.just(buffer), dataType, MediaType.TEXT_EVENT_STREAM, hints);
return this.decoder.decode(buffer, dataType, MediaType.TEXT_EVENT_STREAM, hints);
}
@Override

Loading…
Cancel
Save