Browse Source

Replace close method with Undertow exchange listener

The Undertow HttpServerExchange has a complete listener which we can
use instead of the close() method UndertowServerHttpRequest.
pull/1267/merge
Rossen Stoyanchev 8 years ago
parent
commit
d3e05296e1
  1. 2
      spring-web/src/main/java/org/springframework/http/server/reactive/UndertowHttpHandlerAdapter.java
  2. 11
      spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java

2
spring-web/src/main/java/org/springframework/http/server/reactive/UndertowHttpHandlerAdapter.java

@ -76,13 +76,11 @@ public class UndertowHttpHandlerAdapter extends HttpHandlerAdapterSupport
if (!exchange.isResponseStarted() && exchange.getStatusCode() <= 500) { if (!exchange.isResponseStarted() && exchange.getStatusCode() <= 500) {
exchange.setStatusCode(500); exchange.setStatusCode(500);
} }
request.close();
exchange.endExchange(); exchange.endExchange();
} }
@Override @Override
public void onComplete() { public void onComplete() {
logger.debug("Successfully completed request"); logger.debug("Successfully completed request");
request.close();
exchange.endExchange(); exchange.endExchange();
} }
}); });

11
spring-web/src/main/java/org/springframework/http/server/reactive/UndertowServerHttpRequest.java

@ -58,7 +58,7 @@ public class UndertowServerHttpRequest extends AbstractServerHttpRequest {
super(initUri(exchange), initHeaders(exchange)); super(initUri(exchange), initHeaders(exchange));
this.exchange = exchange; this.exchange = exchange;
this.body = new RequestBodyPublisher(exchange, dataBufferFactory); this.body = new RequestBodyPublisher(exchange, dataBufferFactory);
this.body.registerListener(); this.body.registerListener(exchange);
} }
private static URI initUri(HttpServerExchange exchange) { private static URI initUri(HttpServerExchange exchange) {
@ -107,9 +107,6 @@ public class UndertowServerHttpRequest extends AbstractServerHttpRequest {
return Flux.from(this.body); return Flux.from(this.body);
} }
void close() {
this.body.onAllDataRead();
}
private static class RequestBodyPublisher extends AbstractListenerReadPublisher<DataBuffer> { private static class RequestBodyPublisher extends AbstractListenerReadPublisher<DataBuffer> {
@ -134,7 +131,11 @@ public class UndertowServerHttpRequest extends AbstractServerHttpRequest {
this.dataBufferFactory = dataBufferFactory; this.dataBufferFactory = dataBufferFactory;
} }
private void registerListener() { private void registerListener(HttpServerExchange exchange) {
exchange.addExchangeCompleteListener((ex, next) -> {
onAllDataRead();
next.proceed();
});
this.requestChannel.getReadSetter().set(this.readListener); this.requestChannel.getReadSetter().set(this.readListener);
this.requestChannel.getCloseSetter().set(this.closeListener); this.requestChannel.getCloseSetter().set(this.closeListener);
this.requestChannel.resumeReads(); this.requestChannel.resumeReads();

Loading…
Cancel
Save