diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java index 51146b4ee9..321795ed8c 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerReadPublisher.java @@ -78,7 +78,7 @@ public abstract class AbstractListenerReadPublisher implements Publisher { * @see ReadListener#onAllDataRead() * @see org.xnio.ChannelListener#handleEvent(Channel) */ - public final void onAllDataRead() { + public void onAllDataRead() { if (this.logger.isTraceEnabled()) { this.logger.trace(this.state + " onAllDataRead"); } diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowHttpHandlerAdapter.java b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowHttpHandlerAdapter.java index d7a1f750cb..273f9549f6 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowHttpHandlerAdapter.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowHttpHandlerAdapter.java @@ -58,7 +58,7 @@ public class UndertowHttpHandlerAdapter extends HttpHandlerAdapterSupport @Override public void handleRequest(HttpServerExchange exchange) throws Exception { - ServerHttpRequest request = new UndertowServerHttpRequest(exchange, this.dataBufferFactory); + UndertowServerHttpRequest request = new UndertowServerHttpRequest(exchange, this.dataBufferFactory); ServerHttpResponse response = new UndertowServerHttpResponse(exchange, this.dataBufferFactory); getHttpHandler().handle(request, response).subscribe(new Subscriber() { @@ -76,11 +76,13 @@ public class UndertowHttpHandlerAdapter extends HttpHandlerAdapterSupport if (!exchange.isResponseStarted() && exchange.getStatusCode() <= 500) { exchange.setStatusCode(500); } + request.close(); exchange.endExchange(); } @Override public void onComplete() { logger.debug("Successfully completed request"); + request.close(); exchange.endExchange(); } }); diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java index 9065bae6a3..e6161230cb 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java @@ -21,6 +21,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.nio.ByteBuffer; +import io.undertow.connector.ByteBufferPool; import io.undertow.connector.PooledByteBuffer; import io.undertow.server.HttpServerExchange; import io.undertow.server.handlers.Cookie; @@ -106,6 +107,10 @@ public class UndertowServerHttpRequest extends AbstractServerHttpRequest { return Flux.from(this.body); } + void close() { + this.body.onAllDataRead(); + } + private static class RequestBodyPublisher extends AbstractListenerReadPublisher { private final ChannelListener readListener = @@ -118,13 +123,14 @@ public class UndertowServerHttpRequest extends AbstractServerHttpRequest { private final DataBufferFactory dataBufferFactory; - private final PooledByteBuffer pooledByteBuffer; + private final ByteBufferPool byteBufferPool; + + private PooledByteBuffer pooledByteBuffer; public RequestBodyPublisher(HttpServerExchange exchange, DataBufferFactory dataBufferFactory) { this.requestChannel = exchange.getRequestChannel(); - this.pooledByteBuffer = - exchange.getConnection().getByteBufferPool().allocate(); + this.byteBufferPool = exchange.getConnection().getByteBufferPool(); this.dataBufferFactory = dataBufferFactory; } @@ -141,6 +147,9 @@ public class UndertowServerHttpRequest extends AbstractServerHttpRequest { @Override protected DataBuffer read() throws IOException { + if (this.pooledByteBuffer == null) { + this.pooledByteBuffer = this.byteBufferPool.allocate(); + } ByteBuffer byteBuffer = this.pooledByteBuffer.getBuffer(); int read = this.requestChannel.read(byteBuffer); if (logger.isTraceEnabled()) { @@ -157,6 +166,14 @@ public class UndertowServerHttpRequest extends AbstractServerHttpRequest { return null; } + @Override + public void onAllDataRead() { + if (this.pooledByteBuffer != null && this.pooledByteBuffer.isOpen()) { + this.pooledByteBuffer.close(); + } + super.onAllDataRead(); + } + private class ReadListener implements ChannelListener { @Override