Browse Source

Changed reduceToSingleBuffer to splitOnNewline

In order to be more "reactive", changed StringDecoder's default from
merging all buffers in the stream to a single buffer into splitting the
buffers along newline (\r, \n) characters.
pull/1111/head
Arjen Poutsma 9 years ago
parent
commit
6f46164727
  1. 34
      spring-web-reactive/src/main/java/org/springframework/core/codec/support/StringDecoder.java
  2. 8
      spring-web-reactive/src/main/java/org/springframework/core/codec/support/XmlEventDecoder.java
  3. 46
      spring-web-reactive/src/test/java/org/springframework/core/codec/support/StringDecoderTests.java
  4. 8
      spring-web-reactive/src/test/java/org/springframework/core/codec/support/StringEncoderTests.java

34
spring-web-reactive/src/main/java/org/springframework/core/codec/support/StringDecoder.java

@ -19,6 +19,7 @@ package org.springframework.core.codec.support;
import java.nio.CharBuffer; import java.nio.CharBuffer;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.List;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
@ -32,11 +33,9 @@ import org.springframework.util.MimeTypeUtils;
/** /**
* Decode from a bytes stream to a String stream. * Decode from a bytes stream to a String stream.
* *
* <p>By default, this decoder will buffer the received elements into a single * <p>By default, this decoder will split the received {@link DataBuffer}s along newline
* {@code ByteBuffer} and will emit a single {@code String} once the stream of * characters ({@code \r\n}), but this can be changed by passing {@code false} as
* elements is complete. This behavior can be turned off using an constructor * constructor argument.
* argument but the {@code Subcriber} should pay attention to split characters
* issues.
* *
* @author Sebastien Deleuze * @author Sebastien Deleuze
* @author Brian Clozel * @author Brian Clozel
@ -48,13 +47,12 @@ public class StringDecoder extends AbstractDecoder<String> {
public static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8; public static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;
private final boolean reduceToSingleBuffer; private final boolean splitOnNewline;
/** /**
* Create a {@code StringDecoder} that decodes a bytes stream to a String stream * Create a {@code StringDecoder} that decodes a bytes stream to a String stream
* *
* <p>By default, this decoder will buffer bytes and * <p>By default, this decoder will split along new lines.
* emit a single String as a result.
*/ */
public StringDecoder() { public StringDecoder() {
this(true); this(true);
@ -63,12 +61,12 @@ public class StringDecoder extends AbstractDecoder<String> {
/** /**
* Create a {@code StringDecoder} that decodes a bytes stream to a String stream * Create a {@code StringDecoder} that decodes a bytes stream to a String stream
* *
* @param reduceToSingleBuffer whether this decoder should buffer all received items * @param splitOnNewline whether this decoder should split the received data buffers
* and decode a single consolidated String or re-emit items as they are provided * along newline characters
*/ */
public StringDecoder(boolean reduceToSingleBuffer) { public StringDecoder(boolean splitOnNewline) {
super(new MimeType("text", "*", DEFAULT_CHARSET), MimeTypeUtils.ALL); super(new MimeType("text", "*", DEFAULT_CHARSET), MimeTypeUtils.ALL);
this.reduceToSingleBuffer = reduceToSingleBuffer; this.splitOnNewline = splitOnNewline;
} }
@Override @Override
@ -81,8 +79,12 @@ public class StringDecoder extends AbstractDecoder<String> {
public Flux<String> decode(Publisher<DataBuffer> inputStream, ResolvableType type, public Flux<String> decode(Publisher<DataBuffer> inputStream, ResolvableType type,
MimeType mimeType, Object... hints) { MimeType mimeType, Object... hints) {
Flux<DataBuffer> inputFlux = Flux.from(inputStream); Flux<DataBuffer> inputFlux = Flux.from(inputStream);
if (this.reduceToSingleBuffer) { if (this.splitOnNewline) {
inputFlux = Flux.from(inputFlux.reduce(DataBuffer::write)); inputFlux = inputFlux.flatMap(dataBuffer -> {
List<DataBuffer> tokens =
DataBufferUtils.tokenize(dataBuffer, b -> b == '\n' || b == '\r');
return Flux.fromIterable(tokens);
});
} }
Charset charset = getCharset(mimeType); Charset charset = getCharset(mimeType);
return inputFlux.map(dataBuffer -> { return inputFlux.map(dataBuffer -> {
@ -93,8 +95,8 @@ public class StringDecoder extends AbstractDecoder<String> {
} }
private Charset getCharset(MimeType mimeType) { private Charset getCharset(MimeType mimeType) {
if (mimeType != null && mimeType.getCharSet() != null) { if (mimeType != null && mimeType.getCharset() != null) {
return mimeType.getCharSet(); return mimeType.getCharset();
} }
else { else {
return DEFAULT_CHARSET; return DEFAULT_CHARSET;

8
spring-web-reactive/src/main/java/org/springframework/core/codec/support/XmlEventDecoder.java

@ -16,6 +16,7 @@
package org.springframework.core.codec.support; package org.springframework.core.codec.support;
import java.io.InputStream;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.function.Function; import java.util.function.Function;
@ -91,9 +92,9 @@ public class XmlEventDecoder extends AbstractDecoder<XMLEvent> {
else { else {
Mono<DataBuffer> singleBuffer = flux.reduce(DataBuffer::write); Mono<DataBuffer> singleBuffer = flux.reduce(DataBuffer::write);
return singleBuffer. return singleBuffer.
map(DataBuffer::asInputStream). flatMap(dataBuffer -> {
flatMap(is -> {
try { try {
InputStream is = dataBuffer.asInputStream();
XMLEventReader eventReader = XMLEventReader eventReader =
inputFactory.createXMLEventReader(is); inputFactory.createXMLEventReader(is);
return Flux return Flux
@ -102,6 +103,9 @@ public class XmlEventDecoder extends AbstractDecoder<XMLEvent> {
catch (XMLStreamException ex) { catch (XMLStreamException ex) {
return Mono.error(ex); return Mono.error(ex);
} }
finally {
DataBufferUtils.release(dataBuffer);
}
}); });
} }
} }

46
spring-web-reactive/src/test/java/org/springframework/core/codec/support/StringDecoderTests.java

@ -18,18 +18,16 @@ package org.springframework.core.codec.support;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import reactor.core.converter.RxJava1SingleConverter;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.test.TestSubscriber; import reactor.core.test.TestSubscriber;
import rx.Single;
import org.springframework.core.ResolvableType; import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.AbstractDataBufferAllocatingTestCase; import org.springframework.core.io.buffer.AbstractDataBufferAllocatingTestCase;
import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.MediaType; import org.springframework.http.MediaType;
import static org.junit.Assert.*; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/** /**
* @author Sebastien Deleuze * @author Sebastien Deleuze
@ -62,48 +60,36 @@ public class StringDecoderTests extends AbstractDataBufferAllocatingTestCase {
@Test @Test
public void decode() throws InterruptedException { public void decode() throws InterruptedException {
this.decoder = new StringDecoder(false);
Flux<DataBuffer> source = Flux<DataBuffer> source =
Flux.just(stringBuffer("foo"), stringBuffer("bar"), stringBuffer("baz")); Flux.just(stringBuffer("foo"), stringBuffer("bar"), stringBuffer("baz"));
Flux<String> output = Flux<String> output =
this.decoder.decode(source, ResolvableType.forClass(String.class), null); this.decoder.decode(source, ResolvableType.forClass(String.class), null);
TestSubscriber<String> testSubscriber = new TestSubscriber<>(); TestSubscriber<String> testSubscriber = new TestSubscriber<>();
testSubscriber.bindTo(output).assertValues("foobarbaz"); testSubscriber.bindTo(output).
assertNoError().
assertComplete().
assertValues("foo", "bar", "baz");
} }
@Test @Test
public void decodeDoNotBuffer() throws InterruptedException { public void decodeNewLine() throws InterruptedException {
StringDecoder decoder = new StringDecoder(false); DataBuffer fooBar = stringBuffer("\nfoo\r\nbar\r");
Flux<DataBuffer> source = Flux.just(stringBuffer("foo"), stringBuffer("bar")); DataBuffer baz = stringBuffer("\nbaz");
Flux<DataBuffer> source = Flux.just(fooBar, baz);
Flux<String> output = Flux<String> output =
decoder.decode(source, ResolvableType.forClass(String.class), null); decoder.decode(source, ResolvableType.forClass(String.class), null);
TestSubscriber<String> testSubscriber = new TestSubscriber<>(); TestSubscriber<String> testSubscriber = new TestSubscriber<>();
testSubscriber.bindTo(output).assertValues("foo", "bar"); testSubscriber.bindTo(output).
assertNoError().
assertComplete().
assertValues("foo", "bar", "baz");
} }
@Test
public void decodeMono() throws InterruptedException {
Flux<DataBuffer> source =
Flux.just(stringBuffer("foo"), stringBuffer("bar"), stringBuffer("baz"));
Mono<String> mono = Mono.from(this.decoder.decode(source,
ResolvableType.forClassWithGenerics(Mono.class, String.class),
MediaType.TEXT_PLAIN));
TestSubscriber<String> testSubscriber = new TestSubscriber<>();
testSubscriber.bindTo(mono).assertValues("foobarbaz");
}
@Test
public void decodeSingle() throws InterruptedException {
Flux<DataBuffer> source = Flux.just(stringBuffer("foo"), stringBuffer("bar"));
Single<String> single = RxJava1SingleConverter.from(this.decoder.decode(source,
ResolvableType.forClassWithGenerics(Single.class, String.class),
MediaType.TEXT_PLAIN));
String result = single.toBlocking().value();
assertEquals("foobar", result);
}
@Test @Test
public void decodeEmpty() throws InterruptedException { public void decodeEmpty() throws InterruptedException {
Mono<DataBuffer> source = Mono.just(stringBuffer("")); Flux<DataBuffer> source = Flux.just(stringBuffer(""));
Flux<String> output = Flux<String> output =
this.decoder.decode(source, ResolvableType.forClass(String.class), null); this.decoder.decode(source, ResolvableType.forClass(String.class), null);
TestSubscriber<String> testSubscriber = new TestSubscriber<>(); TestSubscriber<String> testSubscriber = new TestSubscriber<>();

8
spring-web-reactive/src/test/java/org/springframework/core/codec/support/StringEncoderTests.java

@ -27,6 +27,7 @@ import reactor.core.test.TestSubscriber;
import org.springframework.core.ResolvableType; import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.AbstractDataBufferAllocatingTestCase; import org.springframework.core.io.buffer.AbstractDataBufferAllocatingTestCase;
import org.springframework.core.io.buffer.support.DataBufferUtils;
import org.springframework.http.MediaType; import org.springframework.http.MediaType;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
@ -60,9 +61,10 @@ public class StringEncoderTests extends AbstractDataBufferAllocatingTestCase {
Flux<String> output = Flux.from( Flux<String> output = Flux.from(
this.encoder.encode(Flux.just("foo"), this.allocator, null, null)) this.encoder.encode(Flux.just("foo"), this.allocator, null, null))
.map(chunk -> { .map(chunk -> {
byte[] b = new byte[chunk.readableByteCount()]; byte[] b = new byte[chunk.readableByteCount()];
chunk.read(b); chunk.read(b);
return new String(b, StandardCharsets.UTF_8); DataBufferUtils.release(chunk);
return new String(b, StandardCharsets.UTF_8);
}); });
TestSubscriber<String> testSubscriber = new TestSubscriber<>(); TestSubscriber<String> testSubscriber = new TestSubscriber<>();
testSubscriber.bindTo(output).assertValues("foo"); testSubscriber.bindTo(output).assertValues("foo");

Loading…
Cancel
Save