From 02f61e3aaeebf3beb08317a0d3aa934a6d8fc747 Mon Sep 17 00:00:00 2001 From: rstoyanchev Date: Wed, 14 Sep 2022 16:43:14 +0100 Subject: [PATCH] Fix failing test in CancelWithoutDemandCodecTests The test started failing after recent commit e370c15bc60c520ad26150ac4b735efe6f0d5e57. See gh-29038 --- .../codec/json/AbstractJackson2Encoder.java | 47 ++++++++----------- 1 file changed, 19 insertions(+), 28 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/http/codec/json/AbstractJackson2Encoder.java b/spring-web/src/main/java/org/springframework/http/codec/json/AbstractJackson2Encoder.java index 5803ab5cbd..c83aad25bd 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/json/AbstractJackson2Encoder.java +++ b/spring-web/src/main/java/org/springframework/http/codec/json/AbstractJackson2Encoder.java @@ -36,7 +36,6 @@ import com.fasterxml.jackson.databind.ObjectWriter; import com.fasterxml.jackson.databind.SequenceWriter; import com.fasterxml.jackson.databind.exc.InvalidDefinitionException; import com.fasterxml.jackson.databind.ser.FilterProvider; -import org.apache.commons.logging.Log; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -182,20 +181,23 @@ public abstract class AbstractJackson2Encoder extends Jackson2CodecSupport imple // Do not prepend JSON array prefix until first signal is known, onNext vs onError // Keeps response not committed for error handling - Flux flux1 = helper.getPrefix(bufferFactory, hints, logger) - .concatWith(Flux.just(EMPTY_BUFFER).repeat()); + dataBufferFlux = Flux.from(inputStream) + .map(value -> { + byte[] prefix = helper.getPrefix(); + byte[] delimiter = helper.getDelimiter(); - Flux flux2 = Flux.from(inputStream).map(value -> encodeStreamingValue( - value, bufferFactory, hints, sequenceWriter, byteBuilder, helper.getDelimiter(), EMPTY_BYTES)); + DataBuffer dataBuffer = encodeStreamingValue( + value, bufferFactory, hints, sequenceWriter, byteBuilder, delimiter, EMPTY_BYTES); - dataBufferFlux = Flux.zip(flux1, flux2, (buffer1, buffer2) -> - (buffer1 != EMPTY_BUFFER ? - bufferFactory.join(Arrays.asList(buffer1, buffer2)) : - buffer2)) - .concatWith(helper.getSuffix(bufferFactory, hints, logger)); + return (prefix.length > 0 ? + bufferFactory.join(Arrays.asList(bufferFactory.wrap(prefix), dataBuffer)) : + dataBuffer); + }) + .concatWith(Mono.fromCallable(() -> bufferFactory.wrap(helper.getSuffix()))); } return dataBufferFlux + .doOnNext(dataBuffer -> Hints.touchDataBuffer(dataBuffer, hints, logger)) .doAfterTerminate(() -> { try { byteBuilder.release(); @@ -420,33 +422,22 @@ public abstract class AbstractJackson2Encoder extends Jackson2CodecSupport imple private static final byte[] CLOSE_BRACKET = {']'}; - - private boolean afterFirstItem = false; + private boolean firstItemEmitted; public byte[] getDelimiter() { - if (this.afterFirstItem) { + if (this.firstItemEmitted) { return COMMA_SEPARATOR; } - this.afterFirstItem = true; + this.firstItemEmitted = true; return EMPTY_BYTES; } - public Mono getPrefix(DataBufferFactory factory, @Nullable Map hints, Log logger) { - return wrapBytes(OPEN_BRACKET, factory, hints, logger); - } - - public Mono getSuffix(DataBufferFactory factory, @Nullable Map hints, Log logger) { - return wrapBytes(CLOSE_BRACKET, factory, hints, logger); + public byte[] getPrefix() { + return (this.firstItemEmitted ? EMPTY_BYTES : OPEN_BRACKET); } - private Mono wrapBytes( - byte[] bytes, DataBufferFactory bufferFactory, @Nullable Map hints, Log logger) { - - return Mono.fromCallable(() -> { - DataBuffer buffer = bufferFactory.wrap(bytes); - Hints.touchDataBuffer(buffer, hints, logger); - return buffer; - }); + public byte[] getSuffix() { + return CLOSE_BRACKET; } }