From a30a134c2397a7482bfd8c895f7ec1dc633e2067 Mon Sep 17 00:00:00 2001 From: Arjen Poutsma Date: Wed, 15 May 2019 16:07:05 +0200 Subject: [PATCH] Make StringDecoder use DataBufferUtils.split * Added DataBufferUtils.split variant that takes multiple delimiters as argument (instead of 1). * Use this new split() variant from within StringDecoder, replacing its inefficient algorithm with the Knuth-Morris-Pratt algorithm. --- .../core/codec/StringDecoder.java | 115 +------ .../core/io/buffer/DataBufferUtils.java | 283 ++++++++++++++++-- .../core/io/buffer/DataBufferUtilsTests.java | 19 ++ 3 files changed, 285 insertions(+), 132 deletions(-) diff --git a/spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java b/spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java index 9001de29c7..1bd9f3d8be 100644 --- a/spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java +++ b/spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java @@ -32,8 +32,6 @@ import reactor.core.publisher.Flux; import org.springframework.core.ResolvableType; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferUtils; -import org.springframework.core.io.buffer.DefaultDataBufferFactory; -import org.springframework.core.io.buffer.PooledDataBuffer; import org.springframework.core.log.LogFormatUtils; import org.springframework.lang.Nullable; import org.springframework.util.Assert; @@ -57,8 +55,6 @@ import org.springframework.util.MimeTypeUtils; */ public final class StringDecoder extends AbstractDataBufferDecoder { - private static final DataBuffer END_FRAME = new DefaultDataBufferFactory().wrap(new byte[0]); - /** The default charset to use, i.e. "UTF-8". */ public static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8; @@ -70,7 +66,7 @@ public final class StringDecoder extends AbstractDataBufferDecoder { private final boolean stripDelimiter; - private final ConcurrentMap> delimitersCache = new ConcurrentHashMap<>(); + private final ConcurrentMap delimitersCache = new ConcurrentHashMap<>(); private StringDecoder(List delimiters, boolean stripDelimiter, MimeType... mimeTypes) { @@ -90,117 +86,24 @@ public final class StringDecoder extends AbstractDataBufferDecoder { public Flux decode(Publisher input, ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map hints) { - List delimiterBytes = getDelimiterBytes(mimeType); + byte[][] delimiterBytes = getDelimiterBytes(mimeType); - Flux inputFlux = Flux.from(input) - .flatMapIterable(buffer -> splitOnDelimiter(buffer, delimiterBytes)) - .bufferUntil(buffer -> buffer == END_FRAME) - .map(StringDecoder::joinUntilEndFrame) - .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release); + Flux inputFlux = + DataBufferUtils.split(input, delimiterBytes, this.stripDelimiter); return super.decode(inputFlux, elementType, mimeType, hints); } - private List getDelimiterBytes(@Nullable MimeType mimeType) { + private byte[][] getDelimiterBytes(@Nullable MimeType mimeType) { return this.delimitersCache.computeIfAbsent(getCharset(mimeType), charset -> { - List list = new ArrayList<>(); - for (String delimiter : this.delimiters) { - byte[] bytes = delimiter.getBytes(charset); - list.add(bytes); + byte[][] result = new byte[this.delimiters.size()][]; + for (int i = 0; i < this.delimiters.size(); i++) { + result[i] = this.delimiters.get(i).getBytes(charset); } - return list; + return result; }); } - /** - * Split the given data buffer on delimiter boundaries. - * The returned Flux contains an {@link #END_FRAME} buffer after each delimiter. - */ - private List splitOnDelimiter(DataBuffer buffer, List delimiterBytes) { - List frames = new ArrayList<>(); - try { - do { - int length = Integer.MAX_VALUE; - byte[] matchingDelimiter = null; - for (byte[] delimiter : delimiterBytes) { - int index = indexOf(buffer, delimiter); - if (index >= 0 && index < length) { - length = index; - matchingDelimiter = delimiter; - } - } - DataBuffer frame; - int readPosition = buffer.readPosition(); - if (matchingDelimiter != null) { - frame = this.stripDelimiter ? - buffer.slice(readPosition, length) : - buffer.slice(readPosition, length + matchingDelimiter.length); - buffer.readPosition(readPosition + length + matchingDelimiter.length); - frames.add(DataBufferUtils.retain(frame)); - frames.add(END_FRAME); - } - else { - frame = buffer.slice(readPosition, buffer.readableByteCount()); - buffer.readPosition(readPosition + buffer.readableByteCount()); - frames.add(DataBufferUtils.retain(frame)); - } - } - while (buffer.readableByteCount() > 0); - } - catch (Throwable ex) { - for (DataBuffer frame : frames) { - DataBufferUtils.release(frame); - } - throw ex; - } - finally { - DataBufferUtils.release(buffer); - } - return frames; - } - - /** - * Find the given delimiter in the given data buffer. - * @return the index of the delimiter, or -1 if not found. - */ - private static int indexOf(DataBuffer buffer, byte[] delimiter) { - for (int i = buffer.readPosition(); i < buffer.writePosition(); i++) { - int bufferPos = i; - int delimiterPos = 0; - while (delimiterPos < delimiter.length) { - if (buffer.getByte(bufferPos) != delimiter[delimiterPos]) { - break; - } - else { - bufferPos++; - boolean endOfBuffer = bufferPos == buffer.writePosition(); - boolean endOfDelimiter = delimiterPos == delimiter.length - 1; - if (endOfBuffer && !endOfDelimiter) { - return -1; - } - } - delimiterPos++; - } - if (delimiterPos == delimiter.length) { - return i - buffer.readPosition(); - } - } - return -1; - } - - /** - * Join the given list of buffers into a single buffer. - */ - private static DataBuffer joinUntilEndFrame(List dataBuffers) { - if (!dataBuffers.isEmpty()) { - int lastIdx = dataBuffers.size() - 1; - if (dataBuffers.get(lastIdx) == END_FRAME) { - dataBuffers.remove(lastIdx); - } - } - return dataBuffers.get(0).factory().join(dataBuffers); - } - @Override public String decode(DataBuffer dataBuffer, ResolvableType elementType, @Nullable MimeType mimeType, @Nullable Map hints) { diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java index 9e76321bbb..8eb5dfb210 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java @@ -27,6 +27,7 @@ import java.nio.channels.Channels; import java.nio.channels.CompletionHandler; import java.nio.channels.ReadableByteChannel; import java.nio.channels.WritableByteChannel; +import java.nio.charset.Charset; import java.nio.file.StandardOpenOption; import java.util.ArrayList; import java.util.Arrays; @@ -36,6 +37,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +import java.util.function.IntPredicate; import org.reactivestreams.Publisher; import org.reactivestreams.Subscription; @@ -60,8 +62,6 @@ public abstract class DataBufferUtils { private static final Consumer RELEASE_CONSUMER = DataBufferUtils::release; - private static final DataBuffer END_FRAME = new DefaultDataBufferFactory().wrap(new byte[0]); - //--------------------------------------------------------------------- // Reading @@ -486,10 +486,9 @@ public abstract class DataBufferUtils { /** * Splits the given stream of data buffers around the given delimiter. * The returned flux contains data buffers that are terminated by the given delimiter, - * which is included when {@code stripDelimiter} is {@code true}, or stripped off when - * {@code false}. + * which is included when {@code stripDelimiter} is {@code false}. * @param dataBuffers the input stream of data buffers - * @param delimiter the delimiting byte array + * @param delimiter the delimiter bytes * @param stripDelimiter whether to include the delimiter at the end of each resulting buffer * @return the flux of data buffers created by splitting the given data buffers around the * given delimiter @@ -497,29 +496,78 @@ public abstract class DataBufferUtils { */ public static Flux split(Publisher dataBuffers, byte[] delimiter, boolean stripDelimiter) { + + return split(dataBuffers, new byte[][]{delimiter}, stripDelimiter); + } + + /** + * Splits the given stream of data buffers around the given delimiters. + * The returned flux contains data buffers that are terminated by any of the given delimiters, + * which are included when {@code stripDelimiter} is {@code false}. + * @param dataBuffers the input stream of data buffers + * @param delimiters the delimiters, one per element + * @param stripDelimiter whether to include the delimiters at the end of each resulting buffer + * @return the flux of data buffers created by splitting the given data buffers around the + * given delimiters + * @since 5.2 + */ + public static Flux split(Publisher dataBuffers, byte[][] delimiters, + boolean stripDelimiter) { Assert.notNull(dataBuffers, "DataBuffers must not be null"); - Assert.isTrue(delimiter.length > 0, "Delimiter must not be empty"); + Assert.isTrue(delimiters.length > 0, "Delimiter must not be empty"); + + Matcher[] matchers = matchers(delimiters); - Matcher matcher = matcher(delimiter); return Flux.from(dataBuffers) - .flatMap(buffer -> endFrameOnDelimiter(buffer, matcher)) - .bufferUntil(buffer -> buffer == END_FRAME) - .map(buffers -> joinAndStrip(buffers, delimiter, stripDelimiter)) + .flatMap(buffer -> endFrameAfterDelimiter(buffer, matchers)) + .bufferUntil(buffer -> buffer instanceof EndFrameBuffer) + .map(buffers -> joinAndStrip(buffers, stripDelimiter)) .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release); } - // Return Flux, because returning List (w/ flatMapIterable) results in memory leaks because - // of pre-fetching. - private static Flux endFrameOnDelimiter(DataBuffer dataBuffer, Matcher matcher) { + private static Matcher[] matchers(byte[][] delimiters) { + Assert.isTrue(delimiters.length > 0, "Delimiters must not be empty"); + Matcher[] result = new Matcher[delimiters.length]; + for (int i = 0; i < delimiters.length; i++) { + result[i] = matcher(delimiters[i]); + } + return result; + } + + /** + * Finds the {@link Matcher} with the first match and longest delimiter, and inserts a + * {@link EndFrameBuffer} just after its match. + * + * @param dataBuffer the buffer to find delimiters in + * @param matchers used to find the first delimiters + * @return a flux of buffers, containing {@link EndFrameBuffer} after each delimiter that was + * found in {@code dataBuffer}. Returns Flux, because returning List (w/ flatMapIterable) + * results in memory leaks due to pre-fetching. + */ + private static Flux endFrameAfterDelimiter(DataBuffer dataBuffer, Matcher[] matchers) { List result = new ArrayList<>(); do { - int endIdx = matcher.match(dataBuffer); - int readPosition = dataBuffer.readPosition(); - if (endIdx != -1) { - int length = endIdx + 1 - readPosition ; + int matchedEndIdx = Integer.MAX_VALUE; + byte[] matchedDelimiter = new byte[0]; + for (Matcher matcher : matchers) { + int endIdx = matcher.match(dataBuffer); + if (endIdx != -1 && + endIdx <= matchedEndIdx && + matcher.delimiter().length > matchedDelimiter.length) { + matchedEndIdx = endIdx; + matchedDelimiter = matcher.delimiter(); + } + } + if (matchedDelimiter.length > 0) { + int readPosition = dataBuffer.readPosition(); + int length = matchedEndIdx + 1 - readPosition ; result.add(dataBuffer.retainedSlice(readPosition, length)); - result.add(END_FRAME); - dataBuffer.readPosition(endIdx + 1); + result.add(new EndFrameBuffer(matchedDelimiter)); + dataBuffer.readPosition(matchedEndIdx + 1); + + for (Matcher matcher : matchers) { + matcher.reset(); + } } else { result.add(retain(dataBuffer)); @@ -532,21 +580,32 @@ public abstract class DataBufferUtils { return Flux.fromIterable(result); } - private static DataBuffer joinAndStrip(List dataBuffers, byte[] delimiter, + /** + * Joins the given list of buffers. If the list ends with a {@link EndFrameBuffer}, it is + * removed. If {@code stripDelimiter} is {@code true} and the resulting buffer ends with + * a delimiter, it is removed. + * @param dataBuffers the data buffers to join + * @param stripDelimiter whether to strip the delimiter + * @return + */ + private static DataBuffer joinAndStrip(List dataBuffers, boolean stripDelimiter) { Assert.state(!dataBuffers.isEmpty(), "DataBuffers should not be empty"); - boolean endFrameFound = false; + byte[] matchingDelimiter = null; + int lastIdx = dataBuffers.size() - 1; - if (dataBuffers.get(lastIdx) == END_FRAME) { - endFrameFound = true; + DataBuffer lastBuffer = dataBuffers.get(lastIdx); + if (lastBuffer instanceof EndFrameBuffer) { + matchingDelimiter = ((EndFrameBuffer) lastBuffer).delimiter(); dataBuffers.remove(lastIdx); } DataBuffer result = dataBuffers.get(0).factory().join(dataBuffers); - if (stripDelimiter && endFrameFound) { - result.writePosition(result.writePosition() - delimiter.length); + + if (stripDelimiter && matchingDelimiter != null) { + result.writePosition(result.writePosition() - matchingDelimiter.length); } return result; } @@ -573,6 +632,11 @@ public abstract class DataBufferUtils { */ byte[] delimiter(); + /** + * Resets the state of this matcher. + */ + void reset(); + } @@ -865,7 +929,7 @@ public abstract class DataBufferUtils { if (b == this.delimiter[this.matches]) { this.matches++; if (this.matches == this.delimiter.length) { - this.matches = 0; + reset(); return i; } } @@ -877,6 +941,173 @@ public abstract class DataBufferUtils { public byte[] delimiter() { return Arrays.copyOf(this.delimiter, this.delimiter.length); } + + @Override + public void reset() { + this.matches = 0; + } + } + + + private static class EndFrameBuffer implements DataBuffer { + + private static final DataBuffer BUFFER = new DefaultDataBufferFactory().wrap(new byte[0]); + + private byte[] delimiter; + + + public EndFrameBuffer(byte[] delimiter) { + this.delimiter = delimiter; + } + + public byte[] delimiter() { + return this.delimiter; + } + + @Override + public DataBufferFactory factory() { + return BUFFER.factory(); + } + + @Override + public int indexOf(IntPredicate predicate, int fromIndex) { + return BUFFER.indexOf(predicate, fromIndex); + } + + @Override + public int lastIndexOf(IntPredicate predicate, int fromIndex) { + return BUFFER.lastIndexOf(predicate, fromIndex); + } + + @Override + public int readableByteCount() { + return BUFFER.readableByteCount(); + } + + @Override + public int writableByteCount() { + return BUFFER.writableByteCount(); + } + + @Override + public int capacity() { + return BUFFER.capacity(); + } + + @Override + public DataBuffer capacity(int capacity) { + return BUFFER.capacity(capacity); + } + + @Override + public DataBuffer ensureCapacity(int capacity) { + return BUFFER.ensureCapacity(capacity); + } + + @Override + public int readPosition() { + return BUFFER.readPosition(); + } + + @Override + public DataBuffer readPosition(int readPosition) { + return BUFFER.readPosition(readPosition); + } + + @Override + public int writePosition() { + return BUFFER.writePosition(); + } + + @Override + public DataBuffer writePosition(int writePosition) { + return BUFFER.writePosition(writePosition); + } + + @Override + public byte getByte(int index) { + return BUFFER.getByte(index); + } + + @Override + public byte read() { + return BUFFER.read(); + } + + @Override + public DataBuffer read(byte[] destination) { + return BUFFER.read(destination); + } + + @Override + public DataBuffer read(byte[] destination, int offset, int length) { + return BUFFER.read(destination, offset, length); + } + + @Override + public DataBuffer write(byte b) { + return BUFFER.write(b); + } + + @Override + public DataBuffer write(byte[] source) { + return BUFFER.write(source); + } + + @Override + public DataBuffer write(byte[] source, int offset, int length) { + return BUFFER.write(source, offset, length); + } + + @Override + public DataBuffer write(DataBuffer... buffers) { + return BUFFER.write(buffers); + } + + @Override + public DataBuffer write(ByteBuffer... buffers) { + return BUFFER.write(buffers); + } + + @Override + public DataBuffer write(CharSequence charSequence, Charset charset) { + return BUFFER.write(charSequence, charset); + } + + @Override + public DataBuffer slice(int index, int length) { + return BUFFER.slice(index, length); + } + + @Override + public DataBuffer retainedSlice(int index, int length) { + return BUFFER.retainedSlice(index, length); + } + + @Override + public ByteBuffer asByteBuffer() { + return BUFFER.asByteBuffer(); + } + + @Override + public ByteBuffer asByteBuffer(int index, int length) { + return BUFFER.asByteBuffer(index, length); + } + + @Override + public InputStream asInputStream() { + return BUFFER.asInputStream(); + } + + @Override + public InputStream asInputStream(boolean releaseOnClose) { + return BUFFER.asInputStream(releaseOnClose); + } + + @Override + public OutputStream asOutputStream() { + return BUFFER.asOutputStream(); + } } } diff --git a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java index f600fe3075..71fe5a9cc6 100644 --- a/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java +++ b/spring-core/src/test/java/org/springframework/core/io/buffer/DataBufferUtilsTests.java @@ -820,6 +820,25 @@ public class DataBufferUtilsTests extends AbstractDataBufferAllocatingTestCase { .verifyComplete(); } + @Test + public void splitMultipleDelimiters() { + Mono source = + deferStringBuffer("foo␤bar␍␤baz␤"); + + byte[][] delimiters = new byte[][]{ + "␤".getBytes(StandardCharsets.UTF_8), + "␍␤".getBytes(StandardCharsets.UTF_8) + }; + + Flux result = DataBufferUtils.split(source, delimiters, false); + + StepVerifier.create(result) + .consumeNextWith(stringConsumer("foo␤")) + .consumeNextWith(stringConsumer("bar␍␤")) + .consumeNextWith(stringConsumer("baz␤")) + .verifyComplete(); + } + @Test public void splitErrors() { Flux source = Flux.concat(