Browse Source

Ensure chunks released on cancel in StringDecoder

The current test were not catching the issue because they request 1
via StepVerifier, wait for it, and then cancel. In the case of
StringDecoder it means all chunks are used up to produce that first
String and so the cancel doesn't catch any cached chunks.

Closes gh-30299
pull/30465/head
rstoyanchev 2 years ago
parent
commit
e737980033
  1. 2
      spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java
  2. 27
      spring-core/src/test/java/org/springframework/core/codec/StringDecoderTests.java

2
spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java

@ -124,7 +124,7 @@ public final class StringDecoder extends AbstractDataBufferDecoder<String> { @@ -124,7 +124,7 @@ public final class StringDecoder extends AbstractDataBufferDecoder<String> {
chunks.clear();
return Mono.just(lastBuffer);
}))
.doOnTerminate(chunks::releaseAndClear)
.doFinally(signalType -> chunks.releaseAndClear())
.doOnDiscard(DataBuffer.class, DataBufferUtils::release)
.map(buffer -> decode(buffer, elementType, mimeType, hints));
}

27
spring-core/src/test/java/org/springframework/core/codec/StringDecoderTests.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -18,12 +18,15 @@ package org.springframework.core.codec; @@ -18,12 +18,15 @@ package org.springframework.core.codec;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.junit.jupiter.api.Test;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
@ -75,14 +78,15 @@ class StringDecoderTests extends AbstractDecoderTests<StringDecoder> { @@ -75,14 +78,15 @@ class StringDecoderTests extends AbstractDecoderTests<StringDecoder> {
String s = String.format("%s\n%s\n%s", u, e, o);
Flux<DataBuffer> input = toDataBuffers(s, 1, UTF_8);
// TODO: temporarily replace testDecodeAll with explicit decode/cancel/empty
// see https://github.com/reactor/reactor-core/issues/2041
testDecodeAll(input, TYPE, step -> step.expectNext(u, e, o).verifyComplete(), null, null);
}
// testDecode(input, TYPE, step -> step.expectNext(u, e, o).verifyComplete(), null, null);
// testDecodeCancel(input, TYPE, null, null);
// testDecodeEmpty(TYPE, null, null);
@Test // gh-30299
public void decodeAndCancelWithPendingChunks() {
Flux<DataBuffer> input = toDataBuffers("abc", 1, UTF_8).concatWith(Flux.never());
Flux<String> result = this.decoder.decode(input, TYPE, null, null);
testDecodeAll(input, TYPE, step -> step.expectNext(u, e, o).verifyComplete(), null, null);
StepVerifier.create(result).thenAwait(Duration.ofMillis(100)).thenCancel().verify();
}
@Test
@ -264,4 +268,13 @@ class StringDecoderTests extends AbstractDecoderTests<StringDecoder> { @@ -264,4 +268,13 @@ class StringDecoderTests extends AbstractDecoderTests<StringDecoder> {
return buffer;
}
private static class SingleRequestSubscriber extends BaseSubscriber<String> {
@Override
protected void hookOnSubscribe(Subscription subscription) {
subscription.request(1);
}
}
}

Loading…
Cancel
Save