diff --git a/spring-web-reactive/src/main/java/org/springframework/core/codec/support/StringDecoder.java b/spring-web-reactive/src/main/java/org/springframework/core/codec/support/StringDecoder.java index f27ff8005e..ca477cde85 100644 --- a/spring-web-reactive/src/main/java/org/springframework/core/codec/support/StringDecoder.java +++ b/spring-web-reactive/src/main/java/org/springframework/core/codec/support/StringDecoder.java @@ -19,6 +19,7 @@ package org.springframework.core.codec.support; import java.nio.CharBuffer; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; +import java.util.List; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; @@ -32,11 +33,9 @@ import org.springframework.util.MimeTypeUtils; /** * Decode from a bytes stream to a String stream. * - *

By default, this decoder will buffer the received elements into a single - * {@code ByteBuffer} and will emit a single {@code String} once the stream of - * elements is complete. This behavior can be turned off using an constructor - * argument but the {@code Subcriber} should pay attention to split characters - * issues. + *

By default, this decoder will split the received {@link DataBuffer}s along newline + * characters ({@code \r\n}), but this can be changed by passing {@code false} as + * constructor argument. * * @author Sebastien Deleuze * @author Brian Clozel @@ -48,13 +47,12 @@ public class StringDecoder extends AbstractDecoder { public static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8; - private final boolean reduceToSingleBuffer; + private final boolean splitOnNewline; /** * Create a {@code StringDecoder} that decodes a bytes stream to a String stream * - *

By default, this decoder will buffer bytes and - * emit a single String as a result. + *

By default, this decoder will split along new lines. */ public StringDecoder() { this(true); @@ -63,12 +61,12 @@ public class StringDecoder extends AbstractDecoder { /** * Create a {@code StringDecoder} that decodes a bytes stream to a String stream * - * @param reduceToSingleBuffer whether this decoder should buffer all received items - * and decode a single consolidated String or re-emit items as they are provided + * @param splitOnNewline whether this decoder should split the received data buffers + * along newline characters */ - public StringDecoder(boolean reduceToSingleBuffer) { + public StringDecoder(boolean splitOnNewline) { super(new MimeType("text", "*", DEFAULT_CHARSET), MimeTypeUtils.ALL); - this.reduceToSingleBuffer = reduceToSingleBuffer; + this.splitOnNewline = splitOnNewline; } @Override @@ -81,8 +79,12 @@ public class StringDecoder extends AbstractDecoder { public Flux decode(Publisher inputStream, ResolvableType type, MimeType mimeType, Object... hints) { Flux inputFlux = Flux.from(inputStream); - if (this.reduceToSingleBuffer) { - inputFlux = Flux.from(inputFlux.reduce(DataBuffer::write)); + if (this.splitOnNewline) { + inputFlux = inputFlux.flatMap(dataBuffer -> { + List tokens = + DataBufferUtils.tokenize(dataBuffer, b -> b == '\n' || b == '\r'); + return Flux.fromIterable(tokens); + }); } Charset charset = getCharset(mimeType); return inputFlux.map(dataBuffer -> { @@ -93,8 +95,8 @@ public class StringDecoder extends AbstractDecoder { } private Charset getCharset(MimeType mimeType) { - if (mimeType != null && mimeType.getCharSet() != null) { - return mimeType.getCharSet(); + if (mimeType != null && mimeType.getCharset() != null) { + return mimeType.getCharset(); } else { return DEFAULT_CHARSET; diff --git a/spring-web-reactive/src/main/java/org/springframework/core/codec/support/XmlEventDecoder.java b/spring-web-reactive/src/main/java/org/springframework/core/codec/support/XmlEventDecoder.java index b0174ad4c7..3f27b97fb7 100644 --- a/spring-web-reactive/src/main/java/org/springframework/core/codec/support/XmlEventDecoder.java +++ b/spring-web-reactive/src/main/java/org/springframework/core/codec/support/XmlEventDecoder.java @@ -16,6 +16,7 @@ package org.springframework.core.codec.support; +import java.io.InputStream; import java.util.ArrayList; import java.util.List; import java.util.function.Function; @@ -91,9 +92,9 @@ public class XmlEventDecoder extends AbstractDecoder { else { Mono singleBuffer = flux.reduce(DataBuffer::write); return singleBuffer. - map(DataBuffer::asInputStream). - flatMap(is -> { + flatMap(dataBuffer -> { try { + InputStream is = dataBuffer.asInputStream(); XMLEventReader eventReader = inputFactory.createXMLEventReader(is); return Flux @@ -102,6 +103,9 @@ public class XmlEventDecoder extends AbstractDecoder { catch (XMLStreamException ex) { return Mono.error(ex); } + finally { + DataBufferUtils.release(dataBuffer); + } }); } } diff --git a/spring-web-reactive/src/test/java/org/springframework/core/codec/support/StringDecoderTests.java b/spring-web-reactive/src/test/java/org/springframework/core/codec/support/StringDecoderTests.java index c1d69680af..34a8b0ae24 100644 --- a/spring-web-reactive/src/test/java/org/springframework/core/codec/support/StringDecoderTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/core/codec/support/StringDecoderTests.java @@ -18,18 +18,16 @@ package org.springframework.core.codec.support; import org.junit.Before; import org.junit.Test; -import reactor.core.converter.RxJava1SingleConverter; import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; import reactor.core.test.TestSubscriber; -import rx.Single; import org.springframework.core.ResolvableType; import org.springframework.core.io.buffer.AbstractDataBufferAllocatingTestCase; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.http.MediaType; -import static org.junit.Assert.*; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; /** * @author Sebastien Deleuze @@ -62,48 +60,36 @@ public class StringDecoderTests extends AbstractDataBufferAllocatingTestCase { @Test public void decode() throws InterruptedException { + this.decoder = new StringDecoder(false); Flux source = Flux.just(stringBuffer("foo"), stringBuffer("bar"), stringBuffer("baz")); Flux output = this.decoder.decode(source, ResolvableType.forClass(String.class), null); TestSubscriber testSubscriber = new TestSubscriber<>(); - testSubscriber.bindTo(output).assertValues("foobarbaz"); + testSubscriber.bindTo(output). + assertNoError(). + assertComplete(). + assertValues("foo", "bar", "baz"); } @Test - public void decodeDoNotBuffer() throws InterruptedException { - StringDecoder decoder = new StringDecoder(false); - Flux source = Flux.just(stringBuffer("foo"), stringBuffer("bar")); + public void decodeNewLine() throws InterruptedException { + DataBuffer fooBar = stringBuffer("\nfoo\r\nbar\r"); + DataBuffer baz = stringBuffer("\nbaz"); + Flux source = Flux.just(fooBar, baz); Flux output = decoder.decode(source, ResolvableType.forClass(String.class), null); TestSubscriber testSubscriber = new TestSubscriber<>(); - testSubscriber.bindTo(output).assertValues("foo", "bar"); + testSubscriber.bindTo(output). + assertNoError(). + assertComplete(). + assertValues("foo", "bar", "baz"); } - @Test - public void decodeMono() throws InterruptedException { - Flux source = - Flux.just(stringBuffer("foo"), stringBuffer("bar"), stringBuffer("baz")); - Mono mono = Mono.from(this.decoder.decode(source, - ResolvableType.forClassWithGenerics(Mono.class, String.class), - MediaType.TEXT_PLAIN)); - TestSubscriber testSubscriber = new TestSubscriber<>(); - testSubscriber.bindTo(mono).assertValues("foobarbaz"); - } - - @Test - public void decodeSingle() throws InterruptedException { - Flux source = Flux.just(stringBuffer("foo"), stringBuffer("bar")); - Single single = RxJava1SingleConverter.from(this.decoder.decode(source, - ResolvableType.forClassWithGenerics(Single.class, String.class), - MediaType.TEXT_PLAIN)); - String result = single.toBlocking().value(); - assertEquals("foobar", result); - } @Test public void decodeEmpty() throws InterruptedException { - Mono source = Mono.just(stringBuffer("")); + Flux source = Flux.just(stringBuffer("")); Flux output = this.decoder.decode(source, ResolvableType.forClass(String.class), null); TestSubscriber testSubscriber = new TestSubscriber<>(); diff --git a/spring-web-reactive/src/test/java/org/springframework/core/codec/support/StringEncoderTests.java b/spring-web-reactive/src/test/java/org/springframework/core/codec/support/StringEncoderTests.java index 3742a6589a..20e642d934 100644 --- a/spring-web-reactive/src/test/java/org/springframework/core/codec/support/StringEncoderTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/core/codec/support/StringEncoderTests.java @@ -27,6 +27,7 @@ import reactor.core.test.TestSubscriber; import org.springframework.core.ResolvableType; import org.springframework.core.io.buffer.AbstractDataBufferAllocatingTestCase; +import org.springframework.core.io.buffer.support.DataBufferUtils; import org.springframework.http.MediaType; import static org.junit.Assert.assertFalse; @@ -60,9 +61,10 @@ public class StringEncoderTests extends AbstractDataBufferAllocatingTestCase { Flux output = Flux.from( this.encoder.encode(Flux.just("foo"), this.allocator, null, null)) .map(chunk -> { - byte[] b = new byte[chunk.readableByteCount()]; - chunk.read(b); - return new String(b, StandardCharsets.UTF_8); + byte[] b = new byte[chunk.readableByteCount()]; + chunk.read(b); + DataBufferUtils.release(chunk); + return new String(b, StandardCharsets.UTF_8); }); TestSubscriber testSubscriber = new TestSubscriber<>(); testSubscriber.bindTo(output).assertValues("foo");