Markus Heiden
2 years ago
committed by
Arjen Poutsma
3 changed files with 528 additions and 8 deletions
@ -0,0 +1,245 @@
@@ -0,0 +1,245 @@
|
||||
/* |
||||
* Copyright 2002-2022 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.core.codec; |
||||
|
||||
import java.nio.CharBuffer; |
||||
import java.nio.charset.Charset; |
||||
import java.nio.charset.StandardCharsets; |
||||
import java.util.ArrayList; |
||||
import java.util.Collection; |
||||
import java.util.Collections; |
||||
import java.util.List; |
||||
import java.util.Map; |
||||
import java.util.concurrent.ConcurrentHashMap; |
||||
import java.util.concurrent.ConcurrentMap; |
||||
|
||||
import org.reactivestreams.Publisher; |
||||
import reactor.core.publisher.Flux; |
||||
import reactor.core.publisher.Mono; |
||||
|
||||
import org.springframework.core.ResolvableType; |
||||
import org.springframework.core.io.buffer.DataBuffer; |
||||
import org.springframework.core.io.buffer.DataBufferUtils; |
||||
import org.springframework.core.io.buffer.LimitedDataBufferList; |
||||
import org.springframework.core.log.LogFormatUtils; |
||||
import org.springframework.lang.Nullable; |
||||
import org.springframework.util.Assert; |
||||
import org.springframework.util.MimeType; |
||||
import org.springframework.util.MimeTypeUtils; |
||||
|
||||
/** |
||||
* Decode from a data buffer stream to a {@code CharBuffer} stream, either splitting |
||||
* or aggregating incoming data chunks to realign along newlines delimiters |
||||
* and produce a stream of char buffers. This is useful for streaming but is also |
||||
* necessary to ensure that multi-byte characters can be decoded correctly, |
||||
* avoiding split-character issues. The default delimiters used by default are |
||||
* {@code \n} and {@code \r\n} but that can be customized. |
||||
* |
||||
* @author Sebastien Deleuze |
||||
* @author Brian Clozel |
||||
* @author Arjen Poutsma |
||||
* @author Mark Paluch |
||||
* @see CharSequenceEncoder |
||||
*/ |
||||
public final class CharBufferDecoder extends AbstractDataBufferDecoder<CharBuffer> { |
||||
|
||||
/** |
||||
* The default charset "UTF-8". |
||||
*/ |
||||
public static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8; |
||||
|
||||
/** |
||||
* The default delimiter strings to use, i.e. {@code \r\n} and {@code \n}. |
||||
*/ |
||||
public static final List<String> DEFAULT_DELIMITERS = List.of("\r\n", "\n"); |
||||
|
||||
|
||||
private final List<String> delimiters; |
||||
|
||||
private final boolean stripDelimiter; |
||||
|
||||
private Charset defaultCharset = DEFAULT_CHARSET; |
||||
|
||||
private final ConcurrentMap<Charset, byte[][]> delimitersCache = new ConcurrentHashMap<>(); |
||||
|
||||
|
||||
private CharBufferDecoder(List<String> delimiters, boolean stripDelimiter, MimeType... mimeTypes) { |
||||
super(mimeTypes); |
||||
Assert.notEmpty(delimiters, "'delimiters' must not be empty"); |
||||
this.delimiters = new ArrayList<>(delimiters); |
||||
this.stripDelimiter = stripDelimiter; |
||||
} |
||||
|
||||
/** |
||||
* Set the default character set to fall back on if the MimeType does not specify any. |
||||
* <p>By default this is {@code UTF-8}. |
||||
* @param defaultCharset the charset to fall back on |
||||
*/ |
||||
public void setDefaultCharset(Charset defaultCharset) { |
||||
this.defaultCharset = defaultCharset; |
||||
} |
||||
|
||||
/** |
||||
* Return the configured {@link #setDefaultCharset(Charset) defaultCharset}. |
||||
*/ |
||||
public Charset getDefaultCharset() { |
||||
return this.defaultCharset; |
||||
} |
||||
|
||||
@Override |
||||
public boolean canDecode(ResolvableType elementType, @Nullable MimeType mimeType) { |
||||
return elementType.resolve() == CharBuffer.class && super.canDecode(elementType, mimeType); |
||||
} |
||||
|
||||
@Override |
||||
public Flux<CharBuffer> decode(Publisher<DataBuffer> input, ResolvableType elementType, |
||||
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) { |
||||
|
||||
byte[][] delimiterBytes = getDelimiterBytes(mimeType); |
||||
|
||||
LimitedDataBufferList chunks = new LimitedDataBufferList(getMaxInMemorySize()); |
||||
DataBufferUtils.Matcher matcher = DataBufferUtils.matcher(delimiterBytes); |
||||
|
||||
return Flux.from(input) |
||||
.concatMapIterable(buffer -> processDataBuffer(buffer, matcher, chunks)) |
||||
.concatWith(Mono.defer(() -> { |
||||
if (chunks.isEmpty()) { |
||||
return Mono.empty(); |
||||
} |
||||
DataBuffer lastBuffer = chunks.get(0).factory().join(chunks); |
||||
chunks.clear(); |
||||
return Mono.just(lastBuffer); |
||||
})) |
||||
.doOnTerminate(chunks::releaseAndClear) |
||||
.doOnDiscard(DataBuffer.class, DataBufferUtils::release) |
||||
.map(buffer -> decode(buffer, elementType, mimeType, hints)); |
||||
} |
||||
|
||||
private byte[][] getDelimiterBytes(@Nullable MimeType mimeType) { |
||||
return this.delimitersCache.computeIfAbsent(getCharset(mimeType), charset -> { |
||||
byte[][] result = new byte[this.delimiters.size()][]; |
||||
for (int i = 0; i < this.delimiters.size(); i++) { |
||||
result[i] = this.delimiters.get(i).getBytes(charset); |
||||
} |
||||
return result; |
||||
}); |
||||
} |
||||
|
||||
private Collection<DataBuffer> processDataBuffer( |
||||
DataBuffer buffer, DataBufferUtils.Matcher matcher, LimitedDataBufferList chunks) { |
||||
|
||||
boolean release = true; |
||||
try { |
||||
List<DataBuffer> result = null; |
||||
do { |
||||
int endIndex = matcher.match(buffer); |
||||
if (endIndex == -1) { |
||||
chunks.add(buffer); |
||||
release = false; |
||||
break; |
||||
} |
||||
DataBuffer split = buffer.split(endIndex + 1); |
||||
if (result == null) { |
||||
result = new ArrayList<>(); |
||||
} |
||||
int delimiterLength = matcher.delimiter().length; |
||||
if (chunks.isEmpty()) { |
||||
if (this.stripDelimiter) { |
||||
split.writePosition(split.writePosition() - delimiterLength); |
||||
} |
||||
result.add(split); |
||||
} |
||||
else { |
||||
chunks.add(split); |
||||
DataBuffer joined = buffer.factory().join(chunks); |
||||
if (this.stripDelimiter) { |
||||
joined.writePosition(joined.writePosition() - delimiterLength); |
||||
} |
||||
result.add(joined); |
||||
chunks.clear(); |
||||
} |
||||
} |
||||
while (buffer.readableByteCount() > 0); |
||||
return (result != null ? result : Collections.emptyList()); |
||||
} |
||||
finally { |
||||
if (release) { |
||||
DataBufferUtils.release(buffer); |
||||
} |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public CharBuffer decode(DataBuffer dataBuffer, ResolvableType elementType, |
||||
@Nullable MimeType mimeType, @Nullable Map<String, Object> hints) { |
||||
|
||||
Charset charset = getCharset(mimeType); |
||||
CharBuffer charBuffer = charset.decode(dataBuffer.toByteBuffer()); |
||||
DataBufferUtils.release(dataBuffer); |
||||
LogFormatUtils.traceDebug(logger, traceOn -> { |
||||
String formatted = LogFormatUtils.formatValue(charBuffer, !traceOn); |
||||
return Hints.getLogPrefix(hints) + "Decoded " + formatted; |
||||
}); |
||||
return charBuffer; |
||||
} |
||||
|
||||
private Charset getCharset(@Nullable MimeType mimeType) { |
||||
if (mimeType != null && mimeType.getCharset() != null) { |
||||
return mimeType.getCharset(); |
||||
} |
||||
else { |
||||
return getDefaultCharset(); |
||||
} |
||||
} |
||||
|
||||
/** |
||||
* Create a {@code CharBufferDecoder} for {@code "text/plain"}. |
||||
*/ |
||||
public static CharBufferDecoder textPlainOnly() { |
||||
return textPlainOnly(DEFAULT_DELIMITERS, true); |
||||
} |
||||
|
||||
/** |
||||
* Create a {@code CharBufferDecoder} for {@code "text/plain"}. |
||||
* @param delimiters delimiter strings to use to split the input stream |
||||
* @param stripDelimiter whether to remove delimiters from the resulting |
||||
* input strings |
||||
*/ |
||||
public static CharBufferDecoder textPlainOnly(List<String> delimiters, boolean stripDelimiter) { |
||||
var textPlain = new MimeType("text", "plain", DEFAULT_CHARSET); |
||||
return new CharBufferDecoder(delimiters, stripDelimiter, textPlain); |
||||
} |
||||
|
||||
/** |
||||
* Create a {@code CharBufferDecoder} that supports all MIME types. |
||||
*/ |
||||
public static CharBufferDecoder allMimeTypes() { |
||||
return allMimeTypes(DEFAULT_DELIMITERS, true); |
||||
} |
||||
|
||||
/** |
||||
* Create a {@code CharBufferDecoder} that supports all MIME types. |
||||
* @param delimiters delimiter strings to use to split the input stream |
||||
* @param stripDelimiter whether to remove delimiters from the resulting |
||||
* input strings |
||||
*/ |
||||
public static CharBufferDecoder allMimeTypes(List<String> delimiters, boolean stripDelimiter) { |
||||
var textPlain = new MimeType("text", "plain", DEFAULT_CHARSET); |
||||
return new CharBufferDecoder(delimiters, stripDelimiter, textPlain, MimeTypeUtils.ALL); |
||||
} |
||||
|
||||
} |
@ -0,0 +1,276 @@
@@ -0,0 +1,276 @@
|
||||
/* |
||||
* Copyright 2002-2020 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.core.codec; |
||||
|
||||
import java.nio.CharBuffer; |
||||
import java.nio.charset.Charset; |
||||
import java.util.ArrayList; |
||||
import java.util.Arrays; |
||||
import java.util.Collections; |
||||
import java.util.List; |
||||
|
||||
import org.junit.jupiter.api.Test; |
||||
import reactor.core.publisher.Flux; |
||||
import reactor.test.StepVerifier; |
||||
|
||||
import org.springframework.core.ResolvableType; |
||||
import org.springframework.core.io.buffer.DataBuffer; |
||||
import org.springframework.core.io.buffer.DataBufferLimitException; |
||||
import org.springframework.core.testfixture.codec.AbstractDecoderTests; |
||||
import org.springframework.util.MimeType; |
||||
import org.springframework.util.MimeTypeUtils; |
||||
|
||||
import static java.nio.charset.StandardCharsets.UTF_16BE; |
||||
import static java.nio.charset.StandardCharsets.UTF_8; |
||||
import static org.assertj.core.api.Assertions.assertThat; |
||||
|
||||
/** |
||||
* Unit tests for {@link CharBufferDecoder}. |
||||
* |
||||
* @author Sebastien Deleuze |
||||
* @author Brian Clozel |
||||
* @author Mark Paluch |
||||
*/ |
||||
class CharBufferDecoderTests extends AbstractDecoderTests<CharBufferDecoder> { |
||||
|
||||
private static final ResolvableType TYPE = ResolvableType.forClass(CharBuffer.class); |
||||
|
||||
CharBufferDecoderTests() { |
||||
super(CharBufferDecoder.allMimeTypes()); |
||||
} |
||||
|
||||
@Override |
||||
@Test |
||||
public void canDecode() { |
||||
assertThat(this.decoder.canDecode(TYPE, MimeTypeUtils.TEXT_PLAIN)).isTrue(); |
||||
assertThat(this.decoder.canDecode(TYPE, MimeTypeUtils.TEXT_HTML)).isTrue(); |
||||
assertThat(this.decoder.canDecode(TYPE, MimeTypeUtils.APPLICATION_JSON)).isTrue(); |
||||
assertThat(this.decoder.canDecode(TYPE, MimeTypeUtils.parseMimeType("text/plain;charset=utf-8"))).isTrue(); |
||||
assertThat(this.decoder.canDecode(ResolvableType.forClass(Integer.class), MimeTypeUtils.TEXT_PLAIN)).isFalse(); |
||||
assertThat(this.decoder.canDecode(ResolvableType.forClass(Object.class), MimeTypeUtils.APPLICATION_JSON)).isFalse(); |
||||
} |
||||
|
||||
@Override |
||||
@Test |
||||
public void decode() { |
||||
CharBuffer u = charBuffer("ü"); |
||||
CharBuffer e = charBuffer("é"); |
||||
CharBuffer o = charBuffer("ø"); |
||||
String s = String.format("%s\n%s\n%s", u, e, o); |
||||
Flux<DataBuffer> input = buffers(s, 1, UTF_8); |
||||
|
||||
// TODO: temporarily replace testDecodeAll with explicit decode/cancel/empty
|
||||
// see https://github.com/reactor/reactor-core/issues/2041
|
||||
|
||||
// testDecode(input, TYPE, step -> step.expectNext(u, e, o).verifyComplete(), null, null);
|
||||
// testDecodeCancel(input, TYPE, null, null);
|
||||
// testDecodeEmpty(TYPE, null, null);
|
||||
|
||||
testDecodeAll(input, TYPE, step -> step.expectNext(u, e, o).verifyComplete(), null, null); |
||||
} |
||||
|
||||
@Test |
||||
void decodeMultibyteCharacterUtf16() { |
||||
CharBuffer u = charBuffer("ü"); |
||||
CharBuffer e = charBuffer("é"); |
||||
CharBuffer o = charBuffer("ø"); |
||||
String s = String.format("%s\n%s\n%s", u, e, o); |
||||
Flux<DataBuffer> source = buffers(s, 2, UTF_16BE); |
||||
MimeType mimeType = MimeTypeUtils.parseMimeType("text/plain;charset=utf-16be"); |
||||
|
||||
testDecode(source, TYPE, step -> step.expectNext(u, e, o).verifyComplete(), mimeType, null); |
||||
} |
||||
|
||||
private Flux<DataBuffer> buffers(String s, int length, Charset charset) { |
||||
byte[] bytes = s.getBytes(charset); |
||||
List<byte[]> chunks = new ArrayList<>(); |
||||
for (int i = 0; i < bytes.length; i += length) { |
||||
chunks.add(Arrays.copyOfRange(bytes, i, i + length)); |
||||
} |
||||
return Flux.fromIterable(chunks) |
||||
.map(this::buffer); |
||||
} |
||||
|
||||
@Test |
||||
void decodeNewLine() { |
||||
Flux<DataBuffer> input = buffers( |
||||
"\r\nabc\n", |
||||
"def", |
||||
"ghi\r\n\n", |
||||
"jkl", |
||||
"mno\npqr\n", |
||||
"stu", |
||||
"vw", |
||||
"xyz"); |
||||
|
||||
testDecode(input, CharBuffer.class, step -> step |
||||
.expectNext(charBuffer("")).as("1st") |
||||
.expectNext(charBuffer("abc")) |
||||
.expectNext(charBuffer("defghi")) |
||||
.expectNext(charBuffer("")).as("2nd") |
||||
.expectNext(charBuffer("jklmno")) |
||||
.expectNext(charBuffer("pqr")) |
||||
.expectNext(charBuffer("stuvwxyz")) |
||||
.expectComplete() |
||||
.verify()); |
||||
} |
||||
|
||||
@Test |
||||
void decodeNewlinesAcrossBuffers() { |
||||
Flux<DataBuffer> input = buffers( |
||||
"\r", |
||||
"\n", |
||||
"xyz"); |
||||
|
||||
testDecode(input, CharBuffer.class, step -> step |
||||
.expectNext(charBuffer("")) |
||||
.expectNext(charBuffer("xyz")) |
||||
.expectComplete() |
||||
.verify()); |
||||
} |
||||
|
||||
@Test |
||||
void maxInMemoryLimit() { |
||||
Flux<DataBuffer> input = buffers( |
||||
"abc\n", "defg\n", |
||||
"hi", "jkl", "mnop"); |
||||
|
||||
this.decoder.setMaxInMemorySize(5); |
||||
|
||||
testDecode(input, CharBuffer.class, step -> step |
||||
.expectNext(charBuffer("abc")) |
||||
.expectNext(charBuffer("defg")) |
||||
.verifyError(DataBufferLimitException.class)); |
||||
} |
||||
|
||||
@Test |
||||
void maxInMemoryLimitDoesNotApplyToParsedItemsThatDontRequireBuffering() { |
||||
Flux<DataBuffer> input = buffers( |
||||
"TOO MUCH DATA\nanother line\n\nand another\n"); |
||||
|
||||
this.decoder.setMaxInMemorySize(5); |
||||
|
||||
testDecode(input, CharBuffer.class, step -> step |
||||
.expectNext(charBuffer("TOO MUCH DATA")) |
||||
.expectNext(charBuffer("another line")) |
||||
.expectNext(charBuffer("")) |
||||
.expectNext(charBuffer("and another")) |
||||
.expectComplete() |
||||
.verify()); |
||||
} |
||||
|
||||
@Test |
||||
// gh-24339
|
||||
void maxInMemoryLimitReleaseUnprocessedLinesWhenUnlimited() { |
||||
Flux<DataBuffer> input = buffers("Line 1\nLine 2\nLine 3\n"); |
||||
|
||||
this.decoder.setMaxInMemorySize(-1); |
||||
|
||||
testDecodeCancel(input, ResolvableType.forClass(String.class), null, Collections.emptyMap()); |
||||
} |
||||
|
||||
@Test |
||||
void decodeNewLineIncludeDelimiters() { |
||||
this.decoder = CharBufferDecoder.allMimeTypes(CharBufferDecoder.DEFAULT_DELIMITERS, false); |
||||
|
||||
Flux<DataBuffer> input = buffers( |
||||
"\r\nabc\n", |
||||
"def", |
||||
"ghi\r\n\n", |
||||
"jkl", |
||||
"mno\npqr\n", |
||||
"stu", |
||||
"vw", |
||||
"xyz"); |
||||
|
||||
testDecode(input, CharBuffer.class, step -> step |
||||
.expectNext(charBuffer("\r\n")) |
||||
.expectNext(charBuffer("abc\n")) |
||||
.expectNext(charBuffer("defghi\r\n")) |
||||
.expectNext(charBuffer("\n")) |
||||
.expectNext(charBuffer("jklmno\n")) |
||||
.expectNext(charBuffer("pqr\n")) |
||||
.expectNext(charBuffer("stuvwxyz")) |
||||
.expectComplete() |
||||
.verify()); |
||||
} |
||||
|
||||
@Test |
||||
void decodeEmptyFlux() { |
||||
Flux<DataBuffer> input = Flux.empty(); |
||||
|
||||
testDecode(input, String.class, step -> step |
||||
.expectComplete() |
||||
.verify()); |
||||
} |
||||
|
||||
@Test |
||||
void decodeEmptyDataBuffer() { |
||||
Flux<DataBuffer> input = buffers(""); |
||||
|
||||
Flux<CharBuffer> output = this.decoder.decode(input, TYPE, null, Collections.emptyMap()); |
||||
|
||||
StepVerifier.create(output) |
||||
.expectNext(charBuffer("")) |
||||
.expectComplete().verify(); |
||||
} |
||||
|
||||
@Override |
||||
@Test |
||||
public void decodeToMono() { |
||||
Flux<DataBuffer> input = buffers( |
||||
"foo", |
||||
"bar", |
||||
"baz"); |
||||
|
||||
testDecodeToMonoAll(input, CharBuffer.class, step -> step |
||||
.expectNext(charBuffer("foobarbaz")) |
||||
.expectComplete() |
||||
.verify()); |
||||
} |
||||
|
||||
@Test |
||||
void decodeToMonoWithEmptyFlux() { |
||||
Flux<DataBuffer> input = buffers(); |
||||
|
||||
testDecodeToMono(input, String.class, step -> step |
||||
.expectComplete() |
||||
.verify()); |
||||
} |
||||
|
||||
private Flux<DataBuffer> buffers(String... value) { |
||||
return Flux.just(value).map(this::buffer); |
||||
} |
||||
|
||||
private DataBuffer buffer(String value) { |
||||
return buffer(value.getBytes(UTF_8)); |
||||
} |
||||
|
||||
private DataBuffer buffer(byte[] value) { |
||||
DataBuffer buffer = this.bufferFactory.allocateBuffer(value.length); |
||||
buffer.write(value); |
||||
return buffer; |
||||
} |
||||
|
||||
private CharBuffer charBuffer(String value) { |
||||
return CharBuffer |
||||
.allocate(value.length()) |
||||
.put(value) |
||||
.flip(); |
||||
} |
||||
|
||||
} |
Loading…
Reference in new issue