diff --git a/spring-core/src/main/java/org/springframework/core/codec/CharBufferDecoder.java b/spring-core/src/main/java/org/springframework/core/codec/CharBufferDecoder.java new file mode 100644 index 0000000000..93a3f965eb --- /dev/null +++ b/spring-core/src/main/java/org/springframework/core/codec/CharBufferDecoder.java @@ -0,0 +1,245 @@ +/* + * Copyright 2002-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.core.codec; + +import java.nio.CharBuffer; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import org.springframework.core.ResolvableType; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferUtils; +import org.springframework.core.io.buffer.LimitedDataBufferList; +import org.springframework.core.log.LogFormatUtils; +import org.springframework.lang.Nullable; +import org.springframework.util.Assert; +import org.springframework.util.MimeType; +import org.springframework.util.MimeTypeUtils; + +/** + * Decode from a data buffer stream to a {@code CharBuffer} stream, either splitting + * or aggregating incoming data chunks to realign along newlines delimiters + * and produce a stream of char buffers. This is useful for streaming but is also + * necessary to ensure that multi-byte characters can be decoded correctly, + * avoiding split-character issues. The default delimiters used by default are + * {@code \n} and {@code \r\n} but that can be customized. + * + * @author Sebastien Deleuze + * @author Brian Clozel + * @author Arjen Poutsma + * @author Mark Paluch + * @see CharSequenceEncoder + */ +public final class CharBufferDecoder extends AbstractDataBufferDecoder { + + /** + * The default charset "UTF-8". + */ + public static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8; + + /** + * The default delimiter strings to use, i.e. {@code \r\n} and {@code \n}. + */ + public static final List DEFAULT_DELIMITERS = List.of("\r\n", "\n"); + + + private final List delimiters; + + private final boolean stripDelimiter; + + private Charset defaultCharset = DEFAULT_CHARSET; + + private final ConcurrentMap delimitersCache = new ConcurrentHashMap<>(); + + + private CharBufferDecoder(List delimiters, boolean stripDelimiter, MimeType... mimeTypes) { + super(mimeTypes); + Assert.notEmpty(delimiters, "'delimiters' must not be empty"); + this.delimiters = new ArrayList<>(delimiters); + this.stripDelimiter = stripDelimiter; + } + + /** + * Set the default character set to fall back on if the MimeType does not specify any. + *

By default this is {@code UTF-8}. + * @param defaultCharset the charset to fall back on + */ + public void setDefaultCharset(Charset defaultCharset) { + this.defaultCharset = defaultCharset; + } + + /** + * Return the configured {@link #setDefaultCharset(Charset) defaultCharset}. + */ + public Charset getDefaultCharset() { + return this.defaultCharset; + } + + @Override + public boolean canDecode(ResolvableType elementType, @Nullable MimeType mimeType) { + return elementType.resolve() == CharBuffer.class && super.canDecode(elementType, mimeType); + } + + @Override + public Flux decode(Publisher input, ResolvableType elementType, + @Nullable MimeType mimeType, @Nullable Map hints) { + + byte[][] delimiterBytes = getDelimiterBytes(mimeType); + + LimitedDataBufferList chunks = new LimitedDataBufferList(getMaxInMemorySize()); + DataBufferUtils.Matcher matcher = DataBufferUtils.matcher(delimiterBytes); + + return Flux.from(input) + .concatMapIterable(buffer -> processDataBuffer(buffer, matcher, chunks)) + .concatWith(Mono.defer(() -> { + if (chunks.isEmpty()) { + return Mono.empty(); + } + DataBuffer lastBuffer = chunks.get(0).factory().join(chunks); + chunks.clear(); + return Mono.just(lastBuffer); + })) + .doOnTerminate(chunks::releaseAndClear) + .doOnDiscard(DataBuffer.class, DataBufferUtils::release) + .map(buffer -> decode(buffer, elementType, mimeType, hints)); + } + + private byte[][] getDelimiterBytes(@Nullable MimeType mimeType) { + return this.delimitersCache.computeIfAbsent(getCharset(mimeType), charset -> { + byte[][] result = new byte[this.delimiters.size()][]; + for (int i = 0; i < this.delimiters.size(); i++) { + result[i] = this.delimiters.get(i).getBytes(charset); + } + return result; + }); + } + + private Collection processDataBuffer( + DataBuffer buffer, DataBufferUtils.Matcher matcher, LimitedDataBufferList chunks) { + + boolean release = true; + try { + List result = null; + do { + int endIndex = matcher.match(buffer); + if (endIndex == -1) { + chunks.add(buffer); + release = false; + break; + } + DataBuffer split = buffer.split(endIndex + 1); + if (result == null) { + result = new ArrayList<>(); + } + int delimiterLength = matcher.delimiter().length; + if (chunks.isEmpty()) { + if (this.stripDelimiter) { + split.writePosition(split.writePosition() - delimiterLength); + } + result.add(split); + } + else { + chunks.add(split); + DataBuffer joined = buffer.factory().join(chunks); + if (this.stripDelimiter) { + joined.writePosition(joined.writePosition() - delimiterLength); + } + result.add(joined); + chunks.clear(); + } + } + while (buffer.readableByteCount() > 0); + return (result != null ? result : Collections.emptyList()); + } + finally { + if (release) { + DataBufferUtils.release(buffer); + } + } + } + + @Override + public CharBuffer decode(DataBuffer dataBuffer, ResolvableType elementType, + @Nullable MimeType mimeType, @Nullable Map hints) { + + Charset charset = getCharset(mimeType); + CharBuffer charBuffer = charset.decode(dataBuffer.toByteBuffer()); + DataBufferUtils.release(dataBuffer); + LogFormatUtils.traceDebug(logger, traceOn -> { + String formatted = LogFormatUtils.formatValue(charBuffer, !traceOn); + return Hints.getLogPrefix(hints) + "Decoded " + formatted; + }); + return charBuffer; + } + + private Charset getCharset(@Nullable MimeType mimeType) { + if (mimeType != null && mimeType.getCharset() != null) { + return mimeType.getCharset(); + } + else { + return getDefaultCharset(); + } + } + + /** + * Create a {@code CharBufferDecoder} for {@code "text/plain"}. + */ + public static CharBufferDecoder textPlainOnly() { + return textPlainOnly(DEFAULT_DELIMITERS, true); + } + + /** + * Create a {@code CharBufferDecoder} for {@code "text/plain"}. + * @param delimiters delimiter strings to use to split the input stream + * @param stripDelimiter whether to remove delimiters from the resulting + * input strings + */ + public static CharBufferDecoder textPlainOnly(List delimiters, boolean stripDelimiter) { + var textPlain = new MimeType("text", "plain", DEFAULT_CHARSET); + return new CharBufferDecoder(delimiters, stripDelimiter, textPlain); + } + + /** + * Create a {@code CharBufferDecoder} that supports all MIME types. + */ + public static CharBufferDecoder allMimeTypes() { + return allMimeTypes(DEFAULT_DELIMITERS, true); + } + + /** + * Create a {@code CharBufferDecoder} that supports all MIME types. + * @param delimiters delimiter strings to use to split the input stream + * @param stripDelimiter whether to remove delimiters from the resulting + * input strings + */ + public static CharBufferDecoder allMimeTypes(List delimiters, boolean stripDelimiter) { + var textPlain = new MimeType("text", "plain", DEFAULT_CHARSET); + return new CharBufferDecoder(delimiters, stripDelimiter, textPlain, MimeTypeUtils.ALL); + } + +} diff --git a/spring-core/src/test/java/org/springframework/core/codec/CharBufferDecoderTests.java b/spring-core/src/test/java/org/springframework/core/codec/CharBufferDecoderTests.java new file mode 100644 index 0000000000..f5070ab91e --- /dev/null +++ b/spring-core/src/test/java/org/springframework/core/codec/CharBufferDecoderTests.java @@ -0,0 +1,276 @@ +/* + * Copyright 2002-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.core.codec; + +import java.nio.CharBuffer; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; +import reactor.test.StepVerifier; + +import org.springframework.core.ResolvableType; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferLimitException; +import org.springframework.core.testfixture.codec.AbstractDecoderTests; +import org.springframework.util.MimeType; +import org.springframework.util.MimeTypeUtils; + +import static java.nio.charset.StandardCharsets.UTF_16BE; +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Unit tests for {@link CharBufferDecoder}. + * + * @author Sebastien Deleuze + * @author Brian Clozel + * @author Mark Paluch + */ +class CharBufferDecoderTests extends AbstractDecoderTests { + + private static final ResolvableType TYPE = ResolvableType.forClass(CharBuffer.class); + + CharBufferDecoderTests() { + super(CharBufferDecoder.allMimeTypes()); + } + + @Override + @Test + public void canDecode() { + assertThat(this.decoder.canDecode(TYPE, MimeTypeUtils.TEXT_PLAIN)).isTrue(); + assertThat(this.decoder.canDecode(TYPE, MimeTypeUtils.TEXT_HTML)).isTrue(); + assertThat(this.decoder.canDecode(TYPE, MimeTypeUtils.APPLICATION_JSON)).isTrue(); + assertThat(this.decoder.canDecode(TYPE, MimeTypeUtils.parseMimeType("text/plain;charset=utf-8"))).isTrue(); + assertThat(this.decoder.canDecode(ResolvableType.forClass(Integer.class), MimeTypeUtils.TEXT_PLAIN)).isFalse(); + assertThat(this.decoder.canDecode(ResolvableType.forClass(Object.class), MimeTypeUtils.APPLICATION_JSON)).isFalse(); + } + + @Override + @Test + public void decode() { + CharBuffer u = charBuffer("ü"); + CharBuffer e = charBuffer("é"); + CharBuffer o = charBuffer("ø"); + String s = String.format("%s\n%s\n%s", u, e, o); + Flux input = buffers(s, 1, UTF_8); + + // TODO: temporarily replace testDecodeAll with explicit decode/cancel/empty + // see https://github.com/reactor/reactor-core/issues/2041 + +// testDecode(input, TYPE, step -> step.expectNext(u, e, o).verifyComplete(), null, null); +// testDecodeCancel(input, TYPE, null, null); +// testDecodeEmpty(TYPE, null, null); + + testDecodeAll(input, TYPE, step -> step.expectNext(u, e, o).verifyComplete(), null, null); + } + + @Test + void decodeMultibyteCharacterUtf16() { + CharBuffer u = charBuffer("ü"); + CharBuffer e = charBuffer("é"); + CharBuffer o = charBuffer("ø"); + String s = String.format("%s\n%s\n%s", u, e, o); + Flux source = buffers(s, 2, UTF_16BE); + MimeType mimeType = MimeTypeUtils.parseMimeType("text/plain;charset=utf-16be"); + + testDecode(source, TYPE, step -> step.expectNext(u, e, o).verifyComplete(), mimeType, null); + } + + private Flux buffers(String s, int length, Charset charset) { + byte[] bytes = s.getBytes(charset); + List chunks = new ArrayList<>(); + for (int i = 0; i < bytes.length; i += length) { + chunks.add(Arrays.copyOfRange(bytes, i, i + length)); + } + return Flux.fromIterable(chunks) + .map(this::buffer); + } + + @Test + void decodeNewLine() { + Flux input = buffers( + "\r\nabc\n", + "def", + "ghi\r\n\n", + "jkl", + "mno\npqr\n", + "stu", + "vw", + "xyz"); + + testDecode(input, CharBuffer.class, step -> step + .expectNext(charBuffer("")).as("1st") + .expectNext(charBuffer("abc")) + .expectNext(charBuffer("defghi")) + .expectNext(charBuffer("")).as("2nd") + .expectNext(charBuffer("jklmno")) + .expectNext(charBuffer("pqr")) + .expectNext(charBuffer("stuvwxyz")) + .expectComplete() + .verify()); + } + + @Test + void decodeNewlinesAcrossBuffers() { + Flux input = buffers( + "\r", + "\n", + "xyz"); + + testDecode(input, CharBuffer.class, step -> step + .expectNext(charBuffer("")) + .expectNext(charBuffer("xyz")) + .expectComplete() + .verify()); + } + + @Test + void maxInMemoryLimit() { + Flux input = buffers( + "abc\n", "defg\n", + "hi", "jkl", "mnop"); + + this.decoder.setMaxInMemorySize(5); + + testDecode(input, CharBuffer.class, step -> step + .expectNext(charBuffer("abc")) + .expectNext(charBuffer("defg")) + .verifyError(DataBufferLimitException.class)); + } + + @Test + void maxInMemoryLimitDoesNotApplyToParsedItemsThatDontRequireBuffering() { + Flux input = buffers( + "TOO MUCH DATA\nanother line\n\nand another\n"); + + this.decoder.setMaxInMemorySize(5); + + testDecode(input, CharBuffer.class, step -> step + .expectNext(charBuffer("TOO MUCH DATA")) + .expectNext(charBuffer("another line")) + .expectNext(charBuffer("")) + .expectNext(charBuffer("and another")) + .expectComplete() + .verify()); + } + + @Test + // gh-24339 + void maxInMemoryLimitReleaseUnprocessedLinesWhenUnlimited() { + Flux input = buffers("Line 1\nLine 2\nLine 3\n"); + + this.decoder.setMaxInMemorySize(-1); + + testDecodeCancel(input, ResolvableType.forClass(String.class), null, Collections.emptyMap()); + } + + @Test + void decodeNewLineIncludeDelimiters() { + this.decoder = CharBufferDecoder.allMimeTypes(CharBufferDecoder.DEFAULT_DELIMITERS, false); + + Flux input = buffers( + "\r\nabc\n", + "def", + "ghi\r\n\n", + "jkl", + "mno\npqr\n", + "stu", + "vw", + "xyz"); + + testDecode(input, CharBuffer.class, step -> step + .expectNext(charBuffer("\r\n")) + .expectNext(charBuffer("abc\n")) + .expectNext(charBuffer("defghi\r\n")) + .expectNext(charBuffer("\n")) + .expectNext(charBuffer("jklmno\n")) + .expectNext(charBuffer("pqr\n")) + .expectNext(charBuffer("stuvwxyz")) + .expectComplete() + .verify()); + } + + @Test + void decodeEmptyFlux() { + Flux input = Flux.empty(); + + testDecode(input, String.class, step -> step + .expectComplete() + .verify()); + } + + @Test + void decodeEmptyDataBuffer() { + Flux input = buffers(""); + + Flux output = this.decoder.decode(input, TYPE, null, Collections.emptyMap()); + + StepVerifier.create(output) + .expectNext(charBuffer("")) + .expectComplete().verify(); + } + + @Override + @Test + public void decodeToMono() { + Flux input = buffers( + "foo", + "bar", + "baz"); + + testDecodeToMonoAll(input, CharBuffer.class, step -> step + .expectNext(charBuffer("foobarbaz")) + .expectComplete() + .verify()); + } + + @Test + void decodeToMonoWithEmptyFlux() { + Flux input = buffers(); + + testDecodeToMono(input, String.class, step -> step + .expectComplete() + .verify()); + } + + private Flux buffers(String... value) { + return Flux.just(value).map(this::buffer); + } + + private DataBuffer buffer(String value) { + return buffer(value.getBytes(UTF_8)); + } + + private DataBuffer buffer(byte[] value) { + DataBuffer buffer = this.bufferFactory.allocateBuffer(value.length); + buffer.write(value); + return buffer; + } + + private CharBuffer charBuffer(String value) { + return CharBuffer + .allocate(value.length()) + .put(value) + .flip(); + } + +} diff --git a/spring-web/src/main/java/org/springframework/http/codec/json/Jackson2JsonDecoder.java b/spring-web/src/main/java/org/springframework/http/codec/json/Jackson2JsonDecoder.java index 612449fe69..d5d1d29acb 100644 --- a/spring-web/src/main/java/org/springframework/http/codec/json/Jackson2JsonDecoder.java +++ b/spring-web/src/main/java/org/springframework/http/codec/json/Jackson2JsonDecoder.java @@ -16,6 +16,7 @@ package org.springframework.http.codec.json; +import java.nio.CharBuffer; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.Arrays; @@ -26,7 +27,7 @@ import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import org.springframework.core.ResolvableType; -import org.springframework.core.codec.StringDecoder; +import org.springframework.core.codec.CharBufferDecoder; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DefaultDataBufferFactory; import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder; @@ -45,9 +46,9 @@ import org.springframework.util.MimeTypeUtils; */ public class Jackson2JsonDecoder extends AbstractJackson2Decoder { - private static final StringDecoder STRING_DECODER = StringDecoder.textPlainOnly(Arrays.asList(",", "\n"), false); + private static final CharBufferDecoder CHAR_BUFFER_DECODER = CharBufferDecoder.textPlainOnly(Arrays.asList(",", "\n"), false); - private static final ResolvableType STRING_TYPE = ResolvableType.forClass(String.class); + private static final ResolvableType CHAR_BUFFER_TYPE = ResolvableType.forClass(CharBuffer.class); public Jackson2JsonDecoder() { @@ -73,12 +74,10 @@ public class Jackson2JsonDecoder extends AbstractJackson2Decoder { return flux; } - // Potentially, the memory consumption of this conversion could be improved by using CharBuffers instead - // of allocating Strings, but that would require refactoring the buffer tokenization code from StringDecoder - + // Re-encode as UTF-8. MimeType textMimeType = new MimeType(MimeTypeUtils.TEXT_PLAIN, charset); - Flux decoded = STRING_DECODER.decode(input, STRING_TYPE, textMimeType, null); - return decoded.map(s -> DefaultDataBufferFactory.sharedInstance.wrap(s.getBytes(StandardCharsets.UTF_8))); + Flux decoded = CHAR_BUFFER_DECODER.decode(input, CHAR_BUFFER_TYPE, textMimeType, null); + return decoded.map(charBuffer -> DefaultDataBufferFactory.sharedInstance.wrap(StandardCharsets.UTF_8.encode(charBuffer))); } }