diff --git a/spring-web-reactive/build.gradle b/spring-web-reactive/build.gradle index 1f416da579..2a951cf6ac 100644 --- a/spring-web-reactive/build.gradle +++ b/spring-web-reactive/build.gradle @@ -47,6 +47,8 @@ dependencies { optional 'org.apache.tomcat:tomcat-util:8.0.28' optional 'org.apache.tomcat.embed:tomcat-embed-core:8.0.28' + optional 'io.undertow:undertow-core:1.3.5.Final' + optional 'org.eclipse.jetty:jetty-server:9.3.5.v20151012' optional 'org.eclipse.jetty:jetty-servlet:9.3.5.v20151012' diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/undertow/RequestBodyPublisher.java b/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/undertow/RequestBodyPublisher.java new file mode 100644 index 0000000000..c5d5bebacd --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/undertow/RequestBodyPublisher.java @@ -0,0 +1,240 @@ +/* + * Copyright 2002-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.reactive.web.http.undertow; + +import static org.xnio.IoUtils.safeClose; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; + +import org.springframework.util.Assert; + +import io.undertow.connector.PooledByteBuffer; +import io.undertow.server.HttpServerExchange; +import io.undertow.util.SameThreadExecutor; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import org.xnio.ChannelListener; +import org.xnio.channels.StreamSourceChannel; +import reactor.core.error.SpecificationExceptions; +import reactor.core.support.BackpressureUtils; + +/** + * @author Marek Hawrylczak + */ +class RequestBodyPublisher implements Publisher { + + private static final AtomicLongFieldUpdater DEMAND = + AtomicLongFieldUpdater.newUpdater(RequestBodySubscription.class, "demand"); + private final HttpServerExchange exchange; + private Subscriber subscriber; + + public RequestBodyPublisher(HttpServerExchange exchange) { + Assert.notNull(exchange, "'exchange' is required."); + this.exchange = exchange; + } + + @Override + public void subscribe(Subscriber s) { + if (s == null) { + throw SpecificationExceptions.spec_2_13_exception(); + } + if (subscriber != null) { + s.onError(new IllegalStateException("Only one subscriber allowed")); + } + + subscriber = s; + subscriber.onSubscribe(new RequestBodySubscription()); + } + + private class RequestBodySubscription + implements Subscription, Runnable, ChannelListener { + + volatile long demand; + private PooledByteBuffer pooledBuffer; + private StreamSourceChannel channel; + private boolean subscriptionClosed; + private boolean draining; + + @Override + public void cancel() { + subscriptionClosed = true; + close(); + } + + @Override + public void request(long n) { + BackpressureUtils.checkRequest(n, subscriber); + + if (subscriptionClosed) { + return; + } + + BackpressureUtils.getAndAdd(DEMAND, this, n); + scheduleNextMessage(); + } + + private void scheduleNextMessage() { + exchange.dispatch(exchange.isInIoThread() ? + SameThreadExecutor.INSTANCE : exchange.getIoThread(), this); + } + + private void doOnNext(ByteBuffer buffer) { + draining = false; + buffer.flip(); + subscriber.onNext(buffer); + } + + private void doOnComplete() { + subscriptionClosed = true; + try { + subscriber.onComplete(); + } + finally { + close(); + } + } + + private void doOnError(Throwable t) { + subscriptionClosed = true; + try { + subscriber.onError(t); + } + finally { + close(); + } + } + + private void close() { + if (pooledBuffer != null) { + safeClose(pooledBuffer); + pooledBuffer = null; + } + if (channel != null) { + safeClose(channel); + channel = null; + } + } + + @Override + public void run() { + if (subscriptionClosed || draining) { + return; + } + + if (0 == BackpressureUtils.getAndSub(DEMAND, this, 1)) { + return; + } + + draining = true; + + if (channel == null) { + channel = exchange.getRequestChannel(); + + if (channel == null) { + if (exchange.isRequestComplete()) { + return; + } + else { + throw new IllegalStateException( + "Another party already acquired the channel!"); + } + } + } + if (pooledBuffer == null) { + pooledBuffer = exchange.getConnection().getByteBufferPool().allocate(); + } + else { + pooledBuffer.getBuffer().clear(); + } + + try { + ByteBuffer buffer = pooledBuffer.getBuffer(); + int count; + do { + count = channel.read(buffer); + if (count == 0) { + channel.getReadSetter().set(this); + channel.resumeReads(); + } + else if (count == -1) { + if (buffer.position() > 0) { + doOnNext(buffer); + } + doOnComplete(); + } + else { + if (buffer.remaining() == 0) { + if (demand == 0) { + channel.suspendReads(); + } + doOnNext(buffer); + if (demand > 0) { + scheduleNextMessage(); + } + break; + } + } + } while (count > 0); + } + catch (IOException e) { + doOnError(e); + } + } + + @Override + public void handleEvent(StreamSourceChannel channel) { + if (subscriptionClosed) { + return; + } + + try { + ByteBuffer buffer = pooledBuffer.getBuffer(); + int count; + do { + count = channel.read(buffer); + if (count == 0) { + return; + } + else if (count == -1) { + if (buffer.position() > 0) { + doOnNext(buffer); + } + doOnComplete(); + } + else { + if (buffer.remaining() == 0) { + if (demand == 0) { + channel.suspendReads(); + } + doOnNext(buffer); + if (demand > 0) { + scheduleNextMessage(); + } + break; + } + } + } while (count > 0); + } + catch (IOException e) { + doOnError(e); + } + } + } +} diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/undertow/RequestHandlerAdapter.java b/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/undertow/RequestHandlerAdapter.java new file mode 100644 index 0000000000..666b8ca7c5 --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/undertow/RequestHandlerAdapter.java @@ -0,0 +1,79 @@ +/* + * Copyright 2002-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.reactive.web.http.undertow; + +import static org.springframework.http.HttpStatus.INTERNAL_SERVER_ERROR; + +import org.springframework.http.server.ReactiveServerHttpRequest; +import org.springframework.http.server.ReactiveServerHttpResponse; +import org.springframework.reactive.web.http.HttpHandler; +import org.springframework.util.Assert; + +import io.undertow.server.HttpServerExchange; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +/** + * @author Marek Hawrylczak + */ +class RequestHandlerAdapter implements io.undertow.server.HttpHandler { + + private final HttpHandler httpHandler; + + public RequestHandlerAdapter(HttpHandler httpHandler) { + Assert.notNull(httpHandler, "'httpHandler' is required."); + this.httpHandler = httpHandler; + } + + @Override + public void handleRequest(HttpServerExchange exchange) throws Exception { + RequestBodyPublisher requestBodyPublisher = new RequestBodyPublisher(exchange); + ReactiveServerHttpRequest request = + new UndertowServerHttpRequest(exchange, requestBodyPublisher); + + ResponseBodySubscriber responseBodySubscriber = new ResponseBodySubscriber(exchange); + ReactiveServerHttpResponse response = + new UndertowServerHttpResponse(exchange, responseBodySubscriber); + + exchange.dispatch(); + httpHandler.handle(request, response).subscribe(new Subscriber() { + @Override + public void onSubscribe(Subscription subscription) { + subscription.request(Long.MAX_VALUE); + } + + @Override + public void onNext(Void aVoid) { + } + + @Override + public void onError(Throwable t) { + if (!exchange.isResponseStarted() && + exchange.getStatusCode() < INTERNAL_SERVER_ERROR.value()) { + + exchange.setStatusCode(INTERNAL_SERVER_ERROR.value()); + } + exchange.endExchange(); + } + + @Override + public void onComplete() { + exchange.endExchange(); + } + }); + } +} diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/undertow/ResponseBodySubscriber.java b/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/undertow/ResponseBodySubscriber.java new file mode 100644 index 0000000000..06f7a76cea --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/undertow/ResponseBodySubscriber.java @@ -0,0 +1,200 @@ +/* + * Copyright 2002-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.reactive.web.http.undertow; + +import static org.springframework.http.HttpStatus.INTERNAL_SERVER_ERROR; +import static org.xnio.ChannelListeners.closingChannelExceptionHandler; +import static org.xnio.ChannelListeners.flushingChannelListener; +import static org.xnio.IoUtils.safeClose; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import io.undertow.connector.PooledByteBuffer; +import io.undertow.server.HttpServerExchange; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.reactivestreams.Subscription; +import org.xnio.ChannelListener; +import org.xnio.channels.StreamSinkChannel; +import reactor.core.subscriber.BaseSubscriber; + +/** + * @author Marek Hawrylczak + */ +class ResponseBodySubscriber extends BaseSubscriber + implements ChannelListener { + + private static final Log logger = LogFactory.getLog(ResponseBodySubscriber.class); + + private final HttpServerExchange exchange; + private final Queue buffers; + private final AtomicInteger writing = new AtomicInteger(); + private final AtomicBoolean closing = new AtomicBoolean(); + private StreamSinkChannel responseChannel; + private Subscription subscription; + + public ResponseBodySubscriber(HttpServerExchange exchange) { + this.exchange = exchange; + this.buffers = new ConcurrentLinkedQueue<>(); + } + + @Override + public void onSubscribe(Subscription s) { + super.onSubscribe(s); + subscription = s; + subscription.request(1); + } + + @Override + public void onNext(ByteBuffer buffer) { + super.onNext(buffer); + + if (responseChannel == null) { + responseChannel = exchange.getResponseChannel(); + } + + writing.incrementAndGet(); + try { + int c; + do { + c = responseChannel.write(buffer); + } while (buffer.hasRemaining() && c > 0); + if (buffer.hasRemaining()) { + writing.incrementAndGet(); + enqueue(buffer); + responseChannel.getWriteSetter().set(this); + responseChannel.resumeWrites(); + } + else { + this.subscription.request(1); + } + + } + catch (IOException ex) { + onError(ex); + } + finally { + writing.decrementAndGet(); + if (closing.get()) { + closeIfDone(); + } + } + } + + private void enqueue(ByteBuffer src) { + do { + PooledByteBuffer pooledBuffer = + exchange.getConnection().getByteBufferPool().allocate(); + + ByteBuffer dst = pooledBuffer.getBuffer(); + copy(dst, src); + dst.flip(); + buffers.add(pooledBuffer); + } while (src.remaining() > 0); + } + + private void copy(ByteBuffer dst, ByteBuffer src) { + int n = Math.min(dst.capacity(), src.remaining()); + for (int i = 0; i < n; i++) { + dst.put(src.get()); + } + } + + @Override + public void handleEvent(StreamSinkChannel channel) { + try { + int c; + do { + ByteBuffer buffer = buffers.peek().getBuffer(); + do { + c = channel.write(buffer); + } while (buffer.hasRemaining() && c > 0); + if (!buffer.hasRemaining()) { + safeClose(buffers.remove()); + } + } while (!buffers.isEmpty() && c > 0); + if (!buffers.isEmpty()) { + channel.resumeWrites(); + } + else { + writing.decrementAndGet(); + + if (closing.get()) { + closeIfDone(); + } + else { + subscription.request(1); + } + } + } + catch (IOException ex) { + onError(ex); + } + } + + @Override + public void onError(Throwable t) { + super.onError(t); + if (!exchange.isResponseStarted() && + exchange.getStatusCode() < INTERNAL_SERVER_ERROR.value()) { + + exchange.setStatusCode(INTERNAL_SERVER_ERROR.value()); + } + logger.error("ResponseBodySubscriber error", t); + } + + @Override + public void onComplete() { + super.onComplete(); + + if (responseChannel != null) { + closing.set(true); + closeIfDone(); + } + } + + private void closeIfDone() { + if (writing.get() == 0) { + if (closing.compareAndSet(true, false)) { + closeChannel(); + } + } + } + + private void closeChannel() { + try { + responseChannel.shutdownWrites(); + + if (!responseChannel.flush()) { + responseChannel.getWriteSetter().set( + flushingChannelListener( + o -> safeClose(responseChannel), + closingChannelExceptionHandler())); + responseChannel.resumeWrites(); + } + responseChannel = null; + } + catch (IOException ex) { + onError(ex); + } + } +} diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/undertow/UndertowHttpServer.java b/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/undertow/UndertowHttpServer.java new file mode 100644 index 0000000000..262ed92869 --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/undertow/UndertowHttpServer.java @@ -0,0 +1,70 @@ +/* + * Copyright 2002-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.reactive.web.http.undertow; + +import org.springframework.beans.factory.InitializingBean; +import org.springframework.reactive.web.http.HttpServer; +import org.springframework.reactive.web.http.HttpServerSupport; +import org.springframework.util.Assert; + +import io.undertow.Undertow; +import io.undertow.server.HttpHandler; + +/** + * @author Marek Hawrylczak + */ +public class UndertowHttpServer extends HttpServerSupport + implements InitializingBean, HttpServer { + + private Undertow undertowServer; + + private boolean running; + + @Override + public void afterPropertiesSet() throws Exception { + Assert.notNull(getHttpHandler()); + + HttpHandler handler = new RequestHandlerAdapter(getHttpHandler()); + + undertowServer = Undertow.builder() + .addHttpListener(getPort() != -1 ? getPort() : 8080, "localhost") + .setHandler(handler) + .build(); + } + + @Override + public void start() { + if (!running) { + undertowServer.start(); + running = true; + } + + } + + @Override + public void stop() { + if (running) { + undertowServer.stop(); + running = false; + } + } + + @Override + public boolean isRunning() { + return running; + } +} diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/undertow/UndertowServerHttpRequest.java b/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/undertow/UndertowServerHttpRequest.java new file mode 100644 index 0000000000..32b5fa2b99 --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/undertow/UndertowServerHttpRequest.java @@ -0,0 +1,86 @@ +/* + * Copyright 2002-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.reactive.web.http.undertow; + +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.ByteBuffer; + +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpMethod; +import org.springframework.http.server.ReactiveServerHttpRequest; +import org.springframework.util.StringUtils; + +import io.undertow.server.HttpServerExchange; +import io.undertow.util.HeaderValues; +import org.reactivestreams.Publisher; + +/** + * @author Marek Hawrylczak + */ +class UndertowServerHttpRequest implements ReactiveServerHttpRequest { + + private final HttpServerExchange exchange; + + private final Publisher requestBodyPublisher; + + private HttpHeaders headers; + + public UndertowServerHttpRequest(HttpServerExchange exchange, + Publisher requestBodyPublisher) { + + this.exchange = exchange; + this.requestBodyPublisher = requestBodyPublisher; + } + + @Override + public Publisher getBody() { + return this.requestBodyPublisher; + } + + @Override + public HttpMethod getMethod() { + return HttpMethod.valueOf(exchange.getRequestMethod().toString()); + } + + @Override + public URI getURI() { + try { + StringBuilder uri = new StringBuilder(exchange.getRequestPath()); + if (StringUtils.hasLength(exchange.getQueryString())) { + uri.append('?').append(exchange.getQueryString()); + } + return new URI(uri.toString()); + } + catch (URISyntaxException ex) { + throw new IllegalStateException("Could not get URI: " + ex.getMessage(), ex); + } + } + + @Override + public HttpHeaders getHeaders() { + if (this.headers == null) { + this.headers = new HttpHeaders(); + for (HeaderValues headerValues : exchange.getRequestHeaders()) { + for (String value : headerValues) { + this.headers.add(headerValues.getHeaderName().toString(), value); + } + } + } + return this.headers; + } +} diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/undertow/UndertowServerHttpResponse.java b/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/undertow/UndertowServerHttpResponse.java new file mode 100644 index 0000000000..3c68dc41e5 --- /dev/null +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/undertow/UndertowServerHttpResponse.java @@ -0,0 +1,113 @@ +/* + * Copyright 2002-2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.reactive.web.http.undertow; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; + +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatus; +import org.springframework.http.server.ReactiveServerHttpResponse; + +import io.undertow.server.HttpServerExchange; +import io.undertow.util.HttpString; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscription; +import reactor.rx.Streams; + +/** + * @author Marek Hawrylczak + */ +class UndertowServerHttpResponse implements ReactiveServerHttpResponse { + private final HttpServerExchange exchange; + private final HttpHeaders headers; + + private final ResponseBodySubscriber responseBodySubscriber; + + private boolean headersWritten = false; + + public UndertowServerHttpResponse(HttpServerExchange exchange, + ResponseBodySubscriber responseBodySubscriber) { + + this.exchange = exchange; + this.responseBodySubscriber = responseBodySubscriber; + this.headers = new HttpHeaders(); + } + + @Override + public void setStatusCode(HttpStatus status) { + exchange.setStatusCode(status.value()); + } + + @Override + public Publisher setBody(Publisher contentPublisher) { + applyHeaders(); + return s -> s.onSubscribe(new Subscription() { + @Override + public void request(long n) { + Streams.wrap(contentPublisher) + .finallyDo(byteBufferSignal -> { + if (byteBufferSignal.isOnComplete()) { + s.onComplete(); + } + else { + s.onError(byteBufferSignal.getThrowable()); + } + } + ).subscribe(responseBodySubscriber); + } + + @Override + public void cancel() { + } + }); + } + + @Override + public HttpHeaders getHeaders() { + return (this.headersWritten ? + HttpHeaders.readOnlyHttpHeaders(this.headers) : this.headers); + } + + @Override + public Publisher writeHeaders() { + applyHeaders(); + return s -> s.onSubscribe(new Subscription() { + @Override + public void request(long n) { + s.onComplete(); + } + + @Override + public void cancel() { + } + }); + } + + private void applyHeaders() { + if (!this.headersWritten) { + for (Map.Entry> entry : this.headers.entrySet()) { + String headerName = entry.getKey(); + exchange.getResponseHeaders() + .addAll(HttpString.tryFromString(headerName), entry.getValue()); + + } + this.headersWritten = true; + } + } +} diff --git a/spring-web-reactive/src/test/java/org/springframework/reactive/web/http/AbstractHttpHandlerIntegrationTests.java b/spring-web-reactive/src/test/java/org/springframework/reactive/web/http/AbstractHttpHandlerIntegrationTests.java index e03b75180d..41136ac8b0 100644 --- a/spring-web-reactive/src/test/java/org/springframework/reactive/web/http/AbstractHttpHandlerIntegrationTests.java +++ b/spring-web-reactive/src/test/java/org/springframework/reactive/web/http/AbstractHttpHandlerIntegrationTests.java @@ -25,6 +25,7 @@ import org.springframework.reactive.web.http.reactor.ReactorHttpServer; import org.springframework.reactive.web.http.rxnetty.RxNettyHttpServer; import org.springframework.reactive.web.http.servlet.JettyHttpServer; import org.springframework.reactive.web.http.servlet.TomcatHttpServer; +import org.springframework.reactive.web.http.undertow.UndertowHttpServer; import org.springframework.util.SocketUtils; @@ -43,7 +44,8 @@ public abstract class AbstractHttpHandlerIntegrationTests { {new JettyHttpServer()}, {new RxNettyHttpServer()}, {new ReactorHttpServer()}, - {new TomcatHttpServer()} + {new TomcatHttpServer()}, + {new UndertowHttpServer()} }; }