From 3ebbfa2191376d9920c57b545fbd3c07167b4c1e Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Wed, 10 Apr 2019 15:33:19 -0400 Subject: [PATCH 1/2] Fix refCount issue in LeakAwareDataBuffer LeakAwareDataBuffer was keeping its own refCount rather than checking through the delegate. This leads to false leak reports in a sequence where an allocated buffer is retained and then sliced since it is not aware of the changes to the refCount through the slice. --- .../core/io/buffer/LeakAwareDataBuffer.java | 15 +++++++++------ .../io/buffer/LeakAwareDataBufferFactory.java | 6 +++--- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/spring-core/src/test/java/org/springframework/core/io/buffer/LeakAwareDataBuffer.java b/spring-core/src/test/java/org/springframework/core/io/buffer/LeakAwareDataBuffer.java index 5c24dc2ac8..a41a99aa69 100644 --- a/spring-core/src/test/java/org/springframework/core/io/buffer/LeakAwareDataBuffer.java +++ b/spring-core/src/test/java/org/springframework/core/io/buffer/LeakAwareDataBuffer.java @@ -37,8 +37,6 @@ class LeakAwareDataBuffer implements PooledDataBuffer { private final LeakAwareDataBufferFactory dataBufferFactory; - private int refCount = 1; - LeakAwareDataBuffer(DataBuffer delegate, LeakAwareDataBufferFactory dataBufferFactory) { Assert.notNull(delegate, "Delegate must not be null"); @@ -67,19 +65,24 @@ class LeakAwareDataBuffer implements PooledDataBuffer { @Override public boolean isAllocated() { - return this.refCount > 0; + return this.delegate instanceof PooledDataBuffer && + ((PooledDataBuffer) this.delegate).isAllocated(); } @Override public PooledDataBuffer retain() { - this.refCount++; + if (this.delegate instanceof PooledDataBuffer) { + ((PooledDataBuffer) this.delegate).retain(); + } return this; } @Override public boolean release() { - this.refCount--; - return this.refCount == 0; + if (this.delegate instanceof PooledDataBuffer) { + ((PooledDataBuffer) this.delegate).release(); + } + return isAllocated(); } // delegation diff --git a/spring-core/src/test/java/org/springframework/core/io/buffer/LeakAwareDataBufferFactory.java b/spring-core/src/test/java/org/springframework/core/io/buffer/LeakAwareDataBufferFactory.java index e79da10275..62900ad9f5 100644 --- a/spring-core/src/test/java/org/springframework/core/io/buffer/LeakAwareDataBufferFactory.java +++ b/spring-core/src/test/java/org/springframework/core/io/buffer/LeakAwareDataBufferFactory.java @@ -99,16 +99,16 @@ public class LeakAwareDataBufferFactory implements DataBufferFactory { @Override public DataBuffer allocateBuffer() { - return allocateBufferInternal(this.delegate.allocateBuffer()); + return createLeakAwareDataBuffer(this.delegate.allocateBuffer()); } @Override public DataBuffer allocateBuffer(int initialCapacity) { - return allocateBufferInternal(this.delegate.allocateBuffer(initialCapacity)); + return createLeakAwareDataBuffer(this.delegate.allocateBuffer(initialCapacity)); } @NotNull - private DataBuffer allocateBufferInternal(DataBuffer delegateBuffer) { + private DataBuffer createLeakAwareDataBuffer(DataBuffer delegateBuffer) { LeakAwareDataBuffer dataBuffer = new LeakAwareDataBuffer(delegateBuffer, this); this.created.add(dataBuffer); return dataBuffer; From bd2c213b471c1a42b17620ff97da1f2d7665dee3 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Wed, 10 Apr 2019 15:36:36 -0400 Subject: [PATCH 2/2] Remove buffer release used as workaround The extra buffer release was used as a workaround for a reactor core issue and should have already been removed as part of b3bc2d9253efc4cfb5d81b487ecaa620e4b88879. --- .../core/io/buffer/DataBufferUtils.java | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java index 40198ba9fa..b12f9434b2 100644 --- a/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java +++ b/spring-core/src/main/java/org/springframework/core/io/buffer/DataBufferUtils.java @@ -524,13 +524,9 @@ public abstract class DataBufferUtils { long pos = this.position.addAndGet(read); dataBuffer.writePosition(read); this.sink.next(dataBuffer); - // It's possible for cancellation to happen right before the push into the sink + // onNext may have led to onCancel (e.g. downstream takeUntil) if (this.disposed.get()) { - // TODO: - // This is not ideal since we already passed the buffer into the sink and - // releasing may cause something reading to fail. Maybe we won't have to - // do this after https://github.com/reactor/reactor-core/issues/1634 - complete(dataBuffer); + complete(); } else { DataBuffer newDataBuffer = this.dataBufferFactory.allocateBuffer(this.bufferSize); @@ -539,12 +535,12 @@ public abstract class DataBufferUtils { } } else { - complete(dataBuffer); + release(dataBuffer); + complete(); } } - private void complete(DataBuffer dataBuffer) { - release(dataBuffer); + private void complete() { this.sink.complete(); closeChannel(this.channel); }