Browse Source

Refactor the usage of Undertow ByteBufferPool

- lazy allocate the PooledByteBuffer, only if there is a request body
  for reading
- close the PooledByteBuffer once the request finishes
pull/1267/merge
Violeta Georgieva 8 years ago committed by Rossen Stoyanchev
parent
commit
8d786e8bba
  1. 2
      spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java
  2. 4
      spring-web/src/main/java/org/springframework/http/server/reactive/UndertowHttpHandlerAdapter.java
  3. 23
      spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java

2
spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java

@ -78,7 +78,7 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> { @@ -78,7 +78,7 @@ public abstract class AbstractListenerReadPublisher<T> implements Publisher<T> {
* @see ReadListener#onAllDataRead()
* @see org.xnio.ChannelListener#handleEvent(Channel)
*/
public final void onAllDataRead() {
public void onAllDataRead() {
if (this.logger.isTraceEnabled()) {
this.logger.trace(this.state + " onAllDataRead");
}

4
spring-web/src/main/java/org/springframework/http/server/reactive/UndertowHttpHandlerAdapter.java

@ -58,7 +58,7 @@ public class UndertowHttpHandlerAdapter extends HttpHandlerAdapterSupport @@ -58,7 +58,7 @@ public class UndertowHttpHandlerAdapter extends HttpHandlerAdapterSupport
@Override
public void handleRequest(HttpServerExchange exchange) throws Exception {
ServerHttpRequest request = new UndertowServerHttpRequest(exchange, this.dataBufferFactory);
UndertowServerHttpRequest request = new UndertowServerHttpRequest(exchange, this.dataBufferFactory);
ServerHttpResponse response = new UndertowServerHttpResponse(exchange, this.dataBufferFactory);
getHttpHandler().handle(request, response).subscribe(new Subscriber<Void>() {
@ -76,11 +76,13 @@ public class UndertowHttpHandlerAdapter extends HttpHandlerAdapterSupport @@ -76,11 +76,13 @@ public class UndertowHttpHandlerAdapter extends HttpHandlerAdapterSupport
if (!exchange.isResponseStarted() && exchange.getStatusCode() <= 500) {
exchange.setStatusCode(500);
}
request.close();
exchange.endExchange();
}
@Override
public void onComplete() {
logger.debug("Successfully completed request");
request.close();
exchange.endExchange();
}
});

23
spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java

@ -21,6 +21,7 @@ import java.net.URI; @@ -21,6 +21,7 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import io.undertow.connector.ByteBufferPool;
import io.undertow.connector.PooledByteBuffer;
import io.undertow.server.HttpServerExchange;
import io.undertow.server.handlers.Cookie;
@ -106,6 +107,10 @@ public class UndertowServerHttpRequest extends AbstractServerHttpRequest { @@ -106,6 +107,10 @@ public class UndertowServerHttpRequest extends AbstractServerHttpRequest {
return Flux.from(this.body);
}
void close() {
this.body.onAllDataRead();
}
private static class RequestBodyPublisher extends AbstractListenerReadPublisher<DataBuffer> {
private final ChannelListener<StreamSourceChannel> readListener =
@ -118,13 +123,14 @@ public class UndertowServerHttpRequest extends AbstractServerHttpRequest { @@ -118,13 +123,14 @@ public class UndertowServerHttpRequest extends AbstractServerHttpRequest {
private final DataBufferFactory dataBufferFactory;
private final PooledByteBuffer pooledByteBuffer;
private final ByteBufferPool byteBufferPool;
private PooledByteBuffer pooledByteBuffer;
public RequestBodyPublisher(HttpServerExchange exchange,
DataBufferFactory dataBufferFactory) {
this.requestChannel = exchange.getRequestChannel();
this.pooledByteBuffer =
exchange.getConnection().getByteBufferPool().allocate();
this.byteBufferPool = exchange.getConnection().getByteBufferPool();
this.dataBufferFactory = dataBufferFactory;
}
@ -141,6 +147,9 @@ public class UndertowServerHttpRequest extends AbstractServerHttpRequest { @@ -141,6 +147,9 @@ public class UndertowServerHttpRequest extends AbstractServerHttpRequest {
@Override
protected DataBuffer read() throws IOException {
if (this.pooledByteBuffer == null) {
this.pooledByteBuffer = this.byteBufferPool.allocate();
}
ByteBuffer byteBuffer = this.pooledByteBuffer.getBuffer();
int read = this.requestChannel.read(byteBuffer);
if (logger.isTraceEnabled()) {
@ -157,6 +166,14 @@ public class UndertowServerHttpRequest extends AbstractServerHttpRequest { @@ -157,6 +166,14 @@ public class UndertowServerHttpRequest extends AbstractServerHttpRequest {
return null;
}
@Override
public void onAllDataRead() {
if (this.pooledByteBuffer != null && this.pooledByteBuffer.isOpen()) {
this.pooledByteBuffer.close();
}
super.onAllDataRead();
}
private class ReadListener implements ChannelListener<StreamSourceChannel> {
@Override

Loading…
Cancel
Save