From e73798003385a75fdbe10839f3d7480016848316 Mon Sep 17 00:00:00 2001 From: rstoyanchev Date: Tue, 9 May 2023 19:58:36 +0100 Subject: [PATCH] Ensure chunks released on cancel in StringDecoder The current test were not catching the issue because they request 1 via StepVerifier, wait for it, and then cancel. In the case of StringDecoder it means all chunks are used up to produce that first String and so the cancel doesn't catch any cached chunks. Closes gh-30299 --- .../core/codec/StringDecoder.java | 2 +- .../core/codec/StringDecoderTests.java | 27 ++++++++++++++----- 2 files changed, 21 insertions(+), 8 deletions(-) diff --git a/spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java b/spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java index 5e771ae201..602734f43b 100644 --- a/spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java +++ b/spring-core/src/main/java/org/springframework/core/codec/StringDecoder.java @@ -124,7 +124,7 @@ public final class StringDecoder extends AbstractDataBufferDecoder { chunks.clear(); return Mono.just(lastBuffer); })) - .doOnTerminate(chunks::releaseAndClear) + .doFinally(signalType -> chunks.releaseAndClear()) .doOnDiscard(DataBuffer.class, DataBufferUtils::release) .map(buffer -> decode(buffer, elementType, mimeType, hints)); } diff --git a/spring-core/src/test/java/org/springframework/core/codec/StringDecoderTests.java b/spring-core/src/test/java/org/springframework/core/codec/StringDecoderTests.java index ade19bdfd4..e5593a7be8 100644 --- a/spring-core/src/test/java/org/springframework/core/codec/StringDecoderTests.java +++ b/spring-core/src/test/java/org/springframework/core/codec/StringDecoderTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2020 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,12 +18,15 @@ package org.springframework.core.codec; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; import org.junit.jupiter.api.Test; +import org.reactivestreams.Subscription; +import reactor.core.publisher.BaseSubscriber; import reactor.core.publisher.Flux; import reactor.test.StepVerifier; @@ -75,14 +78,15 @@ class StringDecoderTests extends AbstractDecoderTests { String s = String.format("%s\n%s\n%s", u, e, o); Flux input = toDataBuffers(s, 1, UTF_8); - // TODO: temporarily replace testDecodeAll with explicit decode/cancel/empty - // see https://github.com/reactor/reactor-core/issues/2041 + testDecodeAll(input, TYPE, step -> step.expectNext(u, e, o).verifyComplete(), null, null); + } -// testDecode(input, TYPE, step -> step.expectNext(u, e, o).verifyComplete(), null, null); -// testDecodeCancel(input, TYPE, null, null); -// testDecodeEmpty(TYPE, null, null); + @Test // gh-30299 + public void decodeAndCancelWithPendingChunks() { + Flux input = toDataBuffers("abc", 1, UTF_8).concatWith(Flux.never()); + Flux result = this.decoder.decode(input, TYPE, null, null); - testDecodeAll(input, TYPE, step -> step.expectNext(u, e, o).verifyComplete(), null, null); + StepVerifier.create(result).thenAwait(Duration.ofMillis(100)).thenCancel().verify(); } @Test @@ -264,4 +268,13 @@ class StringDecoderTests extends AbstractDecoderTests { return buffer; } + + private static class SingleRequestSubscriber extends BaseSubscriber { + + @Override + protected void hookOnSubscribe(Subscription subscription) { + subscription.request(1); + } + } + }