Browse Source

undertow support using non-blocking API’s

pull/1111/head
Marek Hawrylczak 9 years ago
parent
commit
4c84117155
  1. 2
      spring-web-reactive/build.gradle
  2. 240
      spring-web-reactive/src/main/java/org/springframework/reactive/web/http/undertow/RequestBodyPublisher.java
  3. 79
      spring-web-reactive/src/main/java/org/springframework/reactive/web/http/undertow/RequestHandlerAdapter.java
  4. 200
      spring-web-reactive/src/main/java/org/springframework/reactive/web/http/undertow/ResponseBodySubscriber.java
  5. 70
      spring-web-reactive/src/main/java/org/springframework/reactive/web/http/undertow/UndertowHttpServer.java
  6. 86
      spring-web-reactive/src/main/java/org/springframework/reactive/web/http/undertow/UndertowServerHttpRequest.java
  7. 113
      spring-web-reactive/src/main/java/org/springframework/reactive/web/http/undertow/UndertowServerHttpResponse.java
  8. 4
      spring-web-reactive/src/test/java/org/springframework/reactive/web/http/AbstractHttpHandlerIntegrationTests.java

2
spring-web-reactive/build.gradle

@ -47,6 +47,8 @@ dependencies { @@ -47,6 +47,8 @@ dependencies {
optional 'org.apache.tomcat:tomcat-util:8.0.28'
optional 'org.apache.tomcat.embed:tomcat-embed-core:8.0.28'
optional 'io.undertow:undertow-core:1.3.5.Final'
optional 'org.eclipse.jetty:jetty-server:9.3.5.v20151012'
optional 'org.eclipse.jetty:jetty-servlet:9.3.5.v20151012'

240
spring-web-reactive/src/main/java/org/springframework/reactive/web/http/undertow/RequestBodyPublisher.java

@ -0,0 +1,240 @@ @@ -0,0 +1,240 @@
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.reactive.web.http.undertow;
import static org.xnio.IoUtils.safeClose;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.springframework.util.Assert;
import io.undertow.connector.PooledByteBuffer;
import io.undertow.server.HttpServerExchange;
import io.undertow.util.SameThreadExecutor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.xnio.ChannelListener;
import org.xnio.channels.StreamSourceChannel;
import reactor.core.error.SpecificationExceptions;
import reactor.core.support.BackpressureUtils;
/**
* @author Marek Hawrylczak
*/
class RequestBodyPublisher implements Publisher<ByteBuffer> {
private static final AtomicLongFieldUpdater<RequestBodySubscription> DEMAND =
AtomicLongFieldUpdater.newUpdater(RequestBodySubscription.class, "demand");
private final HttpServerExchange exchange;
private Subscriber<? super ByteBuffer> subscriber;
public RequestBodyPublisher(HttpServerExchange exchange) {
Assert.notNull(exchange, "'exchange' is required.");
this.exchange = exchange;
}
@Override
public void subscribe(Subscriber<? super ByteBuffer> s) {
if (s == null) {
throw SpecificationExceptions.spec_2_13_exception();
}
if (subscriber != null) {
s.onError(new IllegalStateException("Only one subscriber allowed"));
}
subscriber = s;
subscriber.onSubscribe(new RequestBodySubscription());
}
private class RequestBodySubscription
implements Subscription, Runnable, ChannelListener<StreamSourceChannel> {
volatile long demand;
private PooledByteBuffer pooledBuffer;
private StreamSourceChannel channel;
private boolean subscriptionClosed;
private boolean draining;
@Override
public void cancel() {
subscriptionClosed = true;
close();
}
@Override
public void request(long n) {
BackpressureUtils.checkRequest(n, subscriber);
if (subscriptionClosed) {
return;
}
BackpressureUtils.getAndAdd(DEMAND, this, n);
scheduleNextMessage();
}
private void scheduleNextMessage() {
exchange.dispatch(exchange.isInIoThread() ?
SameThreadExecutor.INSTANCE : exchange.getIoThread(), this);
}
private void doOnNext(ByteBuffer buffer) {
draining = false;
buffer.flip();
subscriber.onNext(buffer);
}
private void doOnComplete() {
subscriptionClosed = true;
try {
subscriber.onComplete();
}
finally {
close();
}
}
private void doOnError(Throwable t) {
subscriptionClosed = true;
try {
subscriber.onError(t);
}
finally {
close();
}
}
private void close() {
if (pooledBuffer != null) {
safeClose(pooledBuffer);
pooledBuffer = null;
}
if (channel != null) {
safeClose(channel);
channel = null;
}
}
@Override
public void run() {
if (subscriptionClosed || draining) {
return;
}
if (0 == BackpressureUtils.getAndSub(DEMAND, this, 1)) {
return;
}
draining = true;
if (channel == null) {
channel = exchange.getRequestChannel();
if (channel == null) {
if (exchange.isRequestComplete()) {
return;
}
else {
throw new IllegalStateException(
"Another party already acquired the channel!");
}
}
}
if (pooledBuffer == null) {
pooledBuffer = exchange.getConnection().getByteBufferPool().allocate();
}
else {
pooledBuffer.getBuffer().clear();
}
try {
ByteBuffer buffer = pooledBuffer.getBuffer();
int count;
do {
count = channel.read(buffer);
if (count == 0) {
channel.getReadSetter().set(this);
channel.resumeReads();
}
else if (count == -1) {
if (buffer.position() > 0) {
doOnNext(buffer);
}
doOnComplete();
}
else {
if (buffer.remaining() == 0) {
if (demand == 0) {
channel.suspendReads();
}
doOnNext(buffer);
if (demand > 0) {
scheduleNextMessage();
}
break;
}
}
} while (count > 0);
}
catch (IOException e) {
doOnError(e);
}
}
@Override
public void handleEvent(StreamSourceChannel channel) {
if (subscriptionClosed) {
return;
}
try {
ByteBuffer buffer = pooledBuffer.getBuffer();
int count;
do {
count = channel.read(buffer);
if (count == 0) {
return;
}
else if (count == -1) {
if (buffer.position() > 0) {
doOnNext(buffer);
}
doOnComplete();
}
else {
if (buffer.remaining() == 0) {
if (demand == 0) {
channel.suspendReads();
}
doOnNext(buffer);
if (demand > 0) {
scheduleNextMessage();
}
break;
}
}
} while (count > 0);
}
catch (IOException e) {
doOnError(e);
}
}
}
}

79
spring-web-reactive/src/main/java/org/springframework/reactive/web/http/undertow/RequestHandlerAdapter.java

@ -0,0 +1,79 @@ @@ -0,0 +1,79 @@
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.reactive.web.http.undertow;
import static org.springframework.http.HttpStatus.INTERNAL_SERVER_ERROR;
import org.springframework.http.server.ReactiveServerHttpRequest;
import org.springframework.http.server.ReactiveServerHttpResponse;
import org.springframework.reactive.web.http.HttpHandler;
import org.springframework.util.Assert;
import io.undertow.server.HttpServerExchange;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
/**
* @author Marek Hawrylczak
*/
class RequestHandlerAdapter implements io.undertow.server.HttpHandler {
private final HttpHandler httpHandler;
public RequestHandlerAdapter(HttpHandler httpHandler) {
Assert.notNull(httpHandler, "'httpHandler' is required.");
this.httpHandler = httpHandler;
}
@Override
public void handleRequest(HttpServerExchange exchange) throws Exception {
RequestBodyPublisher requestBodyPublisher = new RequestBodyPublisher(exchange);
ReactiveServerHttpRequest request =
new UndertowServerHttpRequest(exchange, requestBodyPublisher);
ResponseBodySubscriber responseBodySubscriber = new ResponseBodySubscriber(exchange);
ReactiveServerHttpResponse response =
new UndertowServerHttpResponse(exchange, responseBodySubscriber);
exchange.dispatch();
httpHandler.handle(request, response).subscribe(new Subscriber<Void>() {
@Override
public void onSubscribe(Subscription subscription) {
subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(Void aVoid) {
}
@Override
public void onError(Throwable t) {
if (!exchange.isResponseStarted() &&
exchange.getStatusCode() < INTERNAL_SERVER_ERROR.value()) {
exchange.setStatusCode(INTERNAL_SERVER_ERROR.value());
}
exchange.endExchange();
}
@Override
public void onComplete() {
exchange.endExchange();
}
});
}
}

200
spring-web-reactive/src/main/java/org/springframework/reactive/web/http/undertow/ResponseBodySubscriber.java

@ -0,0 +1,200 @@ @@ -0,0 +1,200 @@
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.reactive.web.http.undertow;
import static org.springframework.http.HttpStatus.INTERNAL_SERVER_ERROR;
import static org.xnio.ChannelListeners.closingChannelExceptionHandler;
import static org.xnio.ChannelListeners.flushingChannelListener;
import static org.xnio.IoUtils.safeClose;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import io.undertow.connector.PooledByteBuffer;
import io.undertow.server.HttpServerExchange;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Subscription;
import org.xnio.ChannelListener;
import org.xnio.channels.StreamSinkChannel;
import reactor.core.subscriber.BaseSubscriber;
/**
* @author Marek Hawrylczak
*/
class ResponseBodySubscriber extends BaseSubscriber<ByteBuffer>
implements ChannelListener<StreamSinkChannel> {
private static final Log logger = LogFactory.getLog(ResponseBodySubscriber.class);
private final HttpServerExchange exchange;
private final Queue<PooledByteBuffer> buffers;
private final AtomicInteger writing = new AtomicInteger();
private final AtomicBoolean closing = new AtomicBoolean();
private StreamSinkChannel responseChannel;
private Subscription subscription;
public ResponseBodySubscriber(HttpServerExchange exchange) {
this.exchange = exchange;
this.buffers = new ConcurrentLinkedQueue<>();
}
@Override
public void onSubscribe(Subscription s) {
super.onSubscribe(s);
subscription = s;
subscription.request(1);
}
@Override
public void onNext(ByteBuffer buffer) {
super.onNext(buffer);
if (responseChannel == null) {
responseChannel = exchange.getResponseChannel();
}
writing.incrementAndGet();
try {
int c;
do {
c = responseChannel.write(buffer);
} while (buffer.hasRemaining() && c > 0);
if (buffer.hasRemaining()) {
writing.incrementAndGet();
enqueue(buffer);
responseChannel.getWriteSetter().set(this);
responseChannel.resumeWrites();
}
else {
this.subscription.request(1);
}
}
catch (IOException ex) {
onError(ex);
}
finally {
writing.decrementAndGet();
if (closing.get()) {
closeIfDone();
}
}
}
private void enqueue(ByteBuffer src) {
do {
PooledByteBuffer pooledBuffer =
exchange.getConnection().getByteBufferPool().allocate();
ByteBuffer dst = pooledBuffer.getBuffer();
copy(dst, src);
dst.flip();
buffers.add(pooledBuffer);
} while (src.remaining() > 0);
}
private void copy(ByteBuffer dst, ByteBuffer src) {
int n = Math.min(dst.capacity(), src.remaining());
for (int i = 0; i < n; i++) {
dst.put(src.get());
}
}
@Override
public void handleEvent(StreamSinkChannel channel) {
try {
int c;
do {
ByteBuffer buffer = buffers.peek().getBuffer();
do {
c = channel.write(buffer);
} while (buffer.hasRemaining() && c > 0);
if (!buffer.hasRemaining()) {
safeClose(buffers.remove());
}
} while (!buffers.isEmpty() && c > 0);
if (!buffers.isEmpty()) {
channel.resumeWrites();
}
else {
writing.decrementAndGet();
if (closing.get()) {
closeIfDone();
}
else {
subscription.request(1);
}
}
}
catch (IOException ex) {
onError(ex);
}
}
@Override
public void onError(Throwable t) {
super.onError(t);
if (!exchange.isResponseStarted() &&
exchange.getStatusCode() < INTERNAL_SERVER_ERROR.value()) {
exchange.setStatusCode(INTERNAL_SERVER_ERROR.value());
}
logger.error("ResponseBodySubscriber error", t);
}
@Override
public void onComplete() {
super.onComplete();
if (responseChannel != null) {
closing.set(true);
closeIfDone();
}
}
private void closeIfDone() {
if (writing.get() == 0) {
if (closing.compareAndSet(true, false)) {
closeChannel();
}
}
}
private void closeChannel() {
try {
responseChannel.shutdownWrites();
if (!responseChannel.flush()) {
responseChannel.getWriteSetter().set(
flushingChannelListener(
o -> safeClose(responseChannel),
closingChannelExceptionHandler()));
responseChannel.resumeWrites();
}
responseChannel = null;
}
catch (IOException ex) {
onError(ex);
}
}
}

70
spring-web-reactive/src/main/java/org/springframework/reactive/web/http/undertow/UndertowHttpServer.java

@ -0,0 +1,70 @@ @@ -0,0 +1,70 @@
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.reactive.web.http.undertow;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.reactive.web.http.HttpServer;
import org.springframework.reactive.web.http.HttpServerSupport;
import org.springframework.util.Assert;
import io.undertow.Undertow;
import io.undertow.server.HttpHandler;
/**
* @author Marek Hawrylczak
*/
public class UndertowHttpServer extends HttpServerSupport
implements InitializingBean, HttpServer {
private Undertow undertowServer;
private boolean running;
@Override
public void afterPropertiesSet() throws Exception {
Assert.notNull(getHttpHandler());
HttpHandler handler = new RequestHandlerAdapter(getHttpHandler());
undertowServer = Undertow.builder()
.addHttpListener(getPort() != -1 ? getPort() : 8080, "localhost")
.setHandler(handler)
.build();
}
@Override
public void start() {
if (!running) {
undertowServer.start();
running = true;
}
}
@Override
public void stop() {
if (running) {
undertowServer.stop();
running = false;
}
}
@Override
public boolean isRunning() {
return running;
}
}

86
spring-web-reactive/src/main/java/org/springframework/reactive/web/http/undertow/UndertowServerHttpRequest.java

@ -0,0 +1,86 @@ @@ -0,0 +1,86 @@
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.reactive.web.http.undertow;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.server.ReactiveServerHttpRequest;
import org.springframework.util.StringUtils;
import io.undertow.server.HttpServerExchange;
import io.undertow.util.HeaderValues;
import org.reactivestreams.Publisher;
/**
* @author Marek Hawrylczak
*/
class UndertowServerHttpRequest implements ReactiveServerHttpRequest {
private final HttpServerExchange exchange;
private final Publisher<ByteBuffer> requestBodyPublisher;
private HttpHeaders headers;
public UndertowServerHttpRequest(HttpServerExchange exchange,
Publisher<ByteBuffer> requestBodyPublisher) {
this.exchange = exchange;
this.requestBodyPublisher = requestBodyPublisher;
}
@Override
public Publisher<ByteBuffer> getBody() {
return this.requestBodyPublisher;
}
@Override
public HttpMethod getMethod() {
return HttpMethod.valueOf(exchange.getRequestMethod().toString());
}
@Override
public URI getURI() {
try {
StringBuilder uri = new StringBuilder(exchange.getRequestPath());
if (StringUtils.hasLength(exchange.getQueryString())) {
uri.append('?').append(exchange.getQueryString());
}
return new URI(uri.toString());
}
catch (URISyntaxException ex) {
throw new IllegalStateException("Could not get URI: " + ex.getMessage(), ex);
}
}
@Override
public HttpHeaders getHeaders() {
if (this.headers == null) {
this.headers = new HttpHeaders();
for (HeaderValues headerValues : exchange.getRequestHeaders()) {
for (String value : headerValues) {
this.headers.add(headerValues.getHeaderName().toString(), value);
}
}
}
return this.headers;
}
}

113
spring-web-reactive/src/main/java/org/springframework/reactive/web/http/undertow/UndertowServerHttpResponse.java

@ -0,0 +1,113 @@ @@ -0,0 +1,113 @@
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.reactive.web.http.undertow;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.server.ReactiveServerHttpResponse;
import io.undertow.server.HttpServerExchange;
import io.undertow.util.HttpString;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.rx.Streams;
/**
* @author Marek Hawrylczak
*/
class UndertowServerHttpResponse implements ReactiveServerHttpResponse {
private final HttpServerExchange exchange;
private final HttpHeaders headers;
private final ResponseBodySubscriber responseBodySubscriber;
private boolean headersWritten = false;
public UndertowServerHttpResponse(HttpServerExchange exchange,
ResponseBodySubscriber responseBodySubscriber) {
this.exchange = exchange;
this.responseBodySubscriber = responseBodySubscriber;
this.headers = new HttpHeaders();
}
@Override
public void setStatusCode(HttpStatus status) {
exchange.setStatusCode(status.value());
}
@Override
public Publisher<Void> setBody(Publisher<ByteBuffer> contentPublisher) {
applyHeaders();
return s -> s.onSubscribe(new Subscription() {
@Override
public void request(long n) {
Streams.wrap(contentPublisher)
.finallyDo(byteBufferSignal -> {
if (byteBufferSignal.isOnComplete()) {
s.onComplete();
}
else {
s.onError(byteBufferSignal.getThrowable());
}
}
).subscribe(responseBodySubscriber);
}
@Override
public void cancel() {
}
});
}
@Override
public HttpHeaders getHeaders() {
return (this.headersWritten ?
HttpHeaders.readOnlyHttpHeaders(this.headers) : this.headers);
}
@Override
public Publisher<Void> writeHeaders() {
applyHeaders();
return s -> s.onSubscribe(new Subscription() {
@Override
public void request(long n) {
s.onComplete();
}
@Override
public void cancel() {
}
});
}
private void applyHeaders() {
if (!this.headersWritten) {
for (Map.Entry<String, List<String>> entry : this.headers.entrySet()) {
String headerName = entry.getKey();
exchange.getResponseHeaders()
.addAll(HttpString.tryFromString(headerName), entry.getValue());
}
this.headersWritten = true;
}
}
}

4
spring-web-reactive/src/test/java/org/springframework/reactive/web/http/AbstractHttpHandlerIntegrationTests.java

@ -25,6 +25,7 @@ import org.springframework.reactive.web.http.reactor.ReactorHttpServer; @@ -25,6 +25,7 @@ import org.springframework.reactive.web.http.reactor.ReactorHttpServer;
import org.springframework.reactive.web.http.rxnetty.RxNettyHttpServer;
import org.springframework.reactive.web.http.servlet.JettyHttpServer;
import org.springframework.reactive.web.http.servlet.TomcatHttpServer;
import org.springframework.reactive.web.http.undertow.UndertowHttpServer;
import org.springframework.util.SocketUtils;
@ -43,7 +44,8 @@ public abstract class AbstractHttpHandlerIntegrationTests { @@ -43,7 +44,8 @@ public abstract class AbstractHttpHandlerIntegrationTests {
{new JettyHttpServer()},
{new RxNettyHttpServer()},
{new ReactorHttpServer()},
{new TomcatHttpServer()}
{new TomcatHttpServer()},
{new UndertowHttpServer()}
};
}

Loading…
Cancel
Save