diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java index 359cf189c4..cc7a70916f 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java @@ -532,13 +532,9 @@ public abstract class DataBufferUtils { long pos = this.position.addAndGet(read); dataBuffer.writePosition(read); this.sink.next(dataBuffer); - // It's possible for cancellation to happen right before the push into the sink + // onNext may have led to onCancel (e.g. downstream takeUntil) if (this.disposed.get()) { - // TODO: - // This is not ideal since we already passed the buffer into the sink and - // releasing may cause something reading to fail. Maybe we won't have to - // do this after https://github.com/reactor/reactor-core/issues/1634 - complete(dataBuffer); + complete(); } else { DataBuffer newDataBuffer = this.dataBufferFactory.allocateBuffer(this.bufferSize); @@ -547,12 +543,12 @@ public abstract class DataBufferUtils { } } else { - complete(dataBuffer); + release(dataBuffer); + complete(); } } - private void complete(DataBuffer dataBuffer) { - release(dataBuffer); + private void complete() { this.sink.complete(); closeChannel(this.channel); } diff --git a/spring-core/src/test/java/org/springframework/core/io/buffer/LeakAwareDataBuffer.java b/spring-core/src/test/java/org/springframework/core/io/buffer/LeakAwareDataBuffer.java index 5c24dc2ac8..a41a99aa69 100644 --- a/spring-core/src/test/java/org/springframework/core/io/buffer/LeakAwareDataBuffer.java +++ b/spring-core/src/test/java/org/springframework/core/io/buffer/LeakAwareDataBuffer.java @@ -37,8 +37,6 @@ class LeakAwareDataBuffer implements PooledDataBuffer { private final LeakAwareDataBufferFactory dataBufferFactory; - private int refCount = 1; - LeakAwareDataBuffer(DataBuffer delegate, LeakAwareDataBufferFactory dataBufferFactory) { Assert.notNull(delegate, "Delegate must not be null"); @@ -67,19 +65,24 @@ class LeakAwareDataBuffer implements PooledDataBuffer { @Override public boolean isAllocated() { - return this.refCount > 0; + return this.delegate instanceof PooledDataBuffer && + ((PooledDataBuffer) this.delegate).isAllocated(); } @Override public PooledDataBuffer retain() { - this.refCount++; + if (this.delegate instanceof PooledDataBuffer) { + ((PooledDataBuffer) this.delegate).retain(); + } return this; } @Override public boolean release() { - this.refCount--; - return this.refCount == 0; + if (this.delegate instanceof PooledDataBuffer) { + ((PooledDataBuffer) this.delegate).release(); + } + return isAllocated(); } // delegation diff --git a/spring-core/src/test/java/org/springframework/core/io/buffer/LeakAwareDataBufferFactory.java b/spring-core/src/test/java/org/springframework/core/io/buffer/LeakAwareDataBufferFactory.java index e79da10275..62900ad9f5 100644 --- a/spring-core/src/test/java/org/springframework/core/io/buffer/LeakAwareDataBufferFactory.java +++ b/spring-core/src/test/java/org/springframework/core/io/buffer/LeakAwareDataBufferFactory.java @@ -99,16 +99,16 @@ public class LeakAwareDataBufferFactory implements DataBufferFactory { @Override public DataBuffer allocateBuffer() { - return allocateBufferInternal(this.delegate.allocateBuffer()); + return createLeakAwareDataBuffer(this.delegate.allocateBuffer()); } @Override public DataBuffer allocateBuffer(int initialCapacity) { - return allocateBufferInternal(this.delegate.allocateBuffer(initialCapacity)); + return createLeakAwareDataBuffer(this.delegate.allocateBuffer(initialCapacity)); } @NotNull - private DataBuffer allocateBufferInternal(DataBuffer delegateBuffer) { + private DataBuffer createLeakAwareDataBuffer(DataBuffer delegateBuffer) { LeakAwareDataBuffer dataBuffer = new LeakAwareDataBuffer(delegateBuffer, this); this.created.add(dataBuffer); return dataBuffer;