From e625ace8c8c919b3db16cf02ba1ddee9ce9db452 Mon Sep 17 00:00:00 2001 From: Arjen Poutsma Date: Wed, 8 May 2019 11:50:42 +0200 Subject: [PATCH] Add reference counting for UndertowDataBuffer This commit adds proper reference counting to the UndertowDataBuffer. --- .../reactive/UndertowServerHttpRequest.java | 67 +++++++++++++------ 1 file changed, 47 insertions(+), 20 deletions(-) diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java index 5a6771fd0f..bd506e219d 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java @@ -24,6 +24,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.nio.ByteBuffer; import java.nio.charset.Charset; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.IntPredicate; import javax.net.ssl.SSLSession; @@ -211,31 +212,45 @@ class UndertowServerHttpRequest extends AbstractServerHttpRequest { private final PooledByteBuffer pooledByteBuffer; + private final AtomicInteger refCount; + public UndertowDataBuffer(DataBuffer dataBuffer, PooledByteBuffer pooledByteBuffer) { this.dataBuffer = dataBuffer; this.pooledByteBuffer = pooledByteBuffer; + this.refCount = new AtomicInteger(1); + } + + private UndertowDataBuffer(DataBuffer dataBuffer, PooledByteBuffer pooledByteBuffer, + AtomicInteger refCount) { + this.refCount = refCount; + this.dataBuffer = dataBuffer; + this.pooledByteBuffer = pooledByteBuffer; } @Override public boolean isAllocated() { - return this.pooledByteBuffer.isOpen(); + return this.refCount.get() > 0; } @Override public PooledDataBuffer retain() { + this.refCount.incrementAndGet(); + DataBufferUtils.retain(this.dataBuffer); return this; } @Override public boolean release() { - boolean result; - try { - result = DataBufferUtils.release(this.dataBuffer); - } - finally { - this.pooledByteBuffer.close(); + int refCount = this.refCount.decrementAndGet(); + if (refCount == 0) { + try { + return DataBufferUtils.release(this.dataBuffer); + } + finally { + this.pooledByteBuffer.close(); + } } - return result && this.pooledByteBuffer.isOpen(); + return false; } @Override @@ -280,7 +295,8 @@ class UndertowServerHttpRequest extends AbstractServerHttpRequest { @Override public DataBuffer writePosition(int writePosition) { - return this.dataBuffer.writePosition(writePosition); + this.dataBuffer.writePosition(writePosition); + return this; } @Override @@ -290,12 +306,14 @@ class UndertowServerHttpRequest extends AbstractServerHttpRequest { @Override public DataBuffer capacity(int newCapacity) { - return this.dataBuffer.capacity(newCapacity); + this.dataBuffer.capacity(newCapacity); + return this; } @Override public DataBuffer ensureCapacity(int capacity) { - return this.dataBuffer.ensureCapacity(capacity); + this.dataBuffer.ensureCapacity(capacity); + return this; } @Override @@ -310,47 +328,56 @@ class UndertowServerHttpRequest extends AbstractServerHttpRequest { @Override public DataBuffer read(byte[] destination) { - return this.dataBuffer.read(destination); + this.dataBuffer.read(destination); + return this; } @Override public DataBuffer read(byte[] destination, int offset, int length) { - return this.dataBuffer.read(destination, offset, length); + this.dataBuffer.read(destination, offset, length); + return this; } @Override public DataBuffer write(byte b) { - return this.dataBuffer.write(b); + this.dataBuffer.write(b); + return this; } @Override public DataBuffer write(byte[] source) { - return this.dataBuffer.write(source); + this.dataBuffer.write(source); + return this; } @Override public DataBuffer write(byte[] source, int offset, int length) { - return this.dataBuffer.write(source, offset, length); + this.dataBuffer.write(source, offset, length); + return this; } @Override public DataBuffer write(DataBuffer... buffers) { - return this.dataBuffer.write(buffers); + this.dataBuffer.write(buffers); + return this; } @Override public DataBuffer write(ByteBuffer... byteBuffers) { - return this.dataBuffer.write(byteBuffers); + this.dataBuffer.write(byteBuffers); + return this; } @Override public DataBuffer write(CharSequence charSequence, Charset charset) { - return this.dataBuffer.write(charSequence, charset); + this.dataBuffer.write(charSequence, charset); + return this; } @Override public DataBuffer slice(int index, int length) { - return this.dataBuffer.slice(index, length); + DataBuffer slice = this.dataBuffer.slice(index, length); + return new UndertowDataBuffer(slice, this.pooledByteBuffer, this.refCount); } @Override