Browse Source

Add reference counting for UndertowDataBuffer

This commit adds proper reference counting to the UndertowDataBuffer.
pull/23050/head
Arjen Poutsma 6 years ago
parent
commit
cdd346222c
  1. 67
      spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java

67
spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java

@ -24,6 +24,7 @@ import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntPredicate; import java.util.function.IntPredicate;
import javax.net.ssl.SSLSession; import javax.net.ssl.SSLSession;
@ -211,31 +212,45 @@ class UndertowServerHttpRequest extends AbstractServerHttpRequest {
private final PooledByteBuffer pooledByteBuffer; private final PooledByteBuffer pooledByteBuffer;
private final AtomicInteger refCount;
public UndertowDataBuffer(DataBuffer dataBuffer, PooledByteBuffer pooledByteBuffer) { public UndertowDataBuffer(DataBuffer dataBuffer, PooledByteBuffer pooledByteBuffer) {
this.dataBuffer = dataBuffer; this.dataBuffer = dataBuffer;
this.pooledByteBuffer = pooledByteBuffer; this.pooledByteBuffer = pooledByteBuffer;
this.refCount = new AtomicInteger(1);
}
private UndertowDataBuffer(DataBuffer dataBuffer, PooledByteBuffer pooledByteBuffer,
AtomicInteger refCount) {
this.refCount = refCount;
this.dataBuffer = dataBuffer;
this.pooledByteBuffer = pooledByteBuffer;
} }
@Override @Override
public boolean isAllocated() { public boolean isAllocated() {
return this.pooledByteBuffer.isOpen(); return this.refCount.get() > 0;
} }
@Override @Override
public PooledDataBuffer retain() { public PooledDataBuffer retain() {
this.refCount.incrementAndGet();
DataBufferUtils.retain(this.dataBuffer);
return this; return this;
} }
@Override @Override
public boolean release() { public boolean release() {
boolean result; int refCount = this.refCount.decrementAndGet();
try { if (refCount == 0) {
result = DataBufferUtils.release(this.dataBuffer); try {
} return DataBufferUtils.release(this.dataBuffer);
finally { }
this.pooledByteBuffer.close(); finally {
this.pooledByteBuffer.close();
}
} }
return result && this.pooledByteBuffer.isOpen(); return false;
} }
@Override @Override
@ -280,7 +295,8 @@ class UndertowServerHttpRequest extends AbstractServerHttpRequest {
@Override @Override
public DataBuffer writePosition(int writePosition) { public DataBuffer writePosition(int writePosition) {
return this.dataBuffer.writePosition(writePosition); this.dataBuffer.writePosition(writePosition);
return this;
} }
@Override @Override
@ -290,12 +306,14 @@ class UndertowServerHttpRequest extends AbstractServerHttpRequest {
@Override @Override
public DataBuffer capacity(int newCapacity) { public DataBuffer capacity(int newCapacity) {
return this.dataBuffer.capacity(newCapacity); this.dataBuffer.capacity(newCapacity);
return this;
} }
@Override @Override
public DataBuffer ensureCapacity(int capacity) { public DataBuffer ensureCapacity(int capacity) {
return this.dataBuffer.ensureCapacity(capacity); this.dataBuffer.ensureCapacity(capacity);
return this;
} }
@Override @Override
@ -310,47 +328,56 @@ class UndertowServerHttpRequest extends AbstractServerHttpRequest {
@Override @Override
public DataBuffer read(byte[] destination) { public DataBuffer read(byte[] destination) {
return this.dataBuffer.read(destination); this.dataBuffer.read(destination);
return this;
} }
@Override @Override
public DataBuffer read(byte[] destination, int offset, int length) { public DataBuffer read(byte[] destination, int offset, int length) {
return this.dataBuffer.read(destination, offset, length); this.dataBuffer.read(destination, offset, length);
return this;
} }
@Override @Override
public DataBuffer write(byte b) { public DataBuffer write(byte b) {
return this.dataBuffer.write(b); this.dataBuffer.write(b);
return this;
} }
@Override @Override
public DataBuffer write(byte[] source) { public DataBuffer write(byte[] source) {
return this.dataBuffer.write(source); this.dataBuffer.write(source);
return this;
} }
@Override @Override
public DataBuffer write(byte[] source, int offset, int length) { public DataBuffer write(byte[] source, int offset, int length) {
return this.dataBuffer.write(source, offset, length); this.dataBuffer.write(source, offset, length);
return this;
} }
@Override @Override
public DataBuffer write(DataBuffer... buffers) { public DataBuffer write(DataBuffer... buffers) {
return this.dataBuffer.write(buffers); this.dataBuffer.write(buffers);
return this;
} }
@Override @Override
public DataBuffer write(ByteBuffer... byteBuffers) { public DataBuffer write(ByteBuffer... byteBuffers) {
return this.dataBuffer.write(byteBuffers); this.dataBuffer.write(byteBuffers);
return this;
} }
@Override @Override
public DataBuffer write(CharSequence charSequence, Charset charset) { public DataBuffer write(CharSequence charSequence, Charset charset) {
return this.dataBuffer.write(charSequence, charset); this.dataBuffer.write(charSequence, charset);
return this;
} }
@Override @Override
public DataBuffer slice(int index, int length) { public DataBuffer slice(int index, int length) {
return this.dataBuffer.slice(index, length); DataBuffer slice = this.dataBuffer.slice(index, length);
return new UndertowDataBuffer(slice, this.pooledByteBuffer, this.refCount);
} }
@Override @Override

Loading…
Cancel
Save