diff --git a/build.gradle b/build.gradle index d35a0dd1a0..84c51575e0 100644 --- a/build.gradle +++ b/build.gradle @@ -78,7 +78,7 @@ configure(allprojects) { project -> ext.reactivestreamsVersion = "1.0.0" ext.reactorVersion = "2.0.8.RELEASE" ext.reactorCoreVersion = '3.0.3.RELEASE' - ext.reactorNettyVersion = '0.5.2.RELEASE' + ext.reactorNettyVersion = '0.6.0.BUILD-SNAPSHOT' ext.romeVersion = "1.7.0" ext.rxjavaVersion = '1.2.2' ext.rxjavaAdapterVersion = '1.2.1' @@ -171,6 +171,7 @@ configure(allprojects) { project -> repositories { maven { url "https://repo.spring.io/libs-release" } maven { url "https://repo.spring.io/milestone" } + maven { url "https://repo.spring.io/snapshot" } // reactor-netty 0.6 snapshots } dependencies { diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 6ef252a5ee..8a38ab6e4b 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,4 +1,4 @@ -#Mon Aug 15 21:03:22 CEST 2016 +#Fri Nov 04 16:30:57 GMT 2016 distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists zipStoreBase=GRADLE_USER_HOME diff --git a/spring-web-reactive/src/test/resources/log4j.properties b/spring-web-reactive/src/test/resources/log4j.properties deleted file mode 100644 index 34659ab78b..0000000000 --- a/spring-web-reactive/src/test/resources/log4j.properties +++ /dev/null @@ -1,9 +0,0 @@ -log4j.rootCategory=WARN, stdout - -log4j.logger.org.springframework.http=DEBUG -log4j.logger.org.springframework.web=DEBUG -log4j.logger.reactor=INFO - -log4j.appender.stdout=org.apache.log4j.ConsoleAppender -log4j.appender.stdout.layout=org.apache.log4j.PatternLayout -log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] <%t> - %m%n \ No newline at end of file diff --git a/spring-web-reactive/src/test/resources/log4j2-test.xml b/spring-web-reactive/src/test/resources/log4j2-test.xml new file mode 100644 index 0000000000..31c3aa1bda --- /dev/null +++ b/spring-web-reactive/src/test/resources/log4j2-test.xml @@ -0,0 +1,16 @@ + + + + + + + + + + + + + + + + diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java index 0d6e1e4d76..ffae45d92d 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java @@ -17,13 +17,14 @@ package org.springframework.http.client.reactive; import java.net.URI; +import java.util.function.Consumer; import java.util.function.Function; import reactor.core.publisher.Mono; -import reactor.ipc.netty.config.ClientOptions; -import reactor.ipc.netty.http.HttpClient; -import reactor.ipc.netty.http.HttpException; -import reactor.ipc.netty.http.HttpInbound; +import reactor.ipc.netty.http.client.HttpClientOptions; +import reactor.ipc.netty.options.ClientOptions; +import reactor.ipc.netty.http.client.HttpClient; +import reactor.ipc.netty.http.client.HttpClientException; import org.springframework.http.HttpMethod; @@ -44,13 +45,13 @@ public class ReactorClientHttpConnector implements ClientHttpConnector { * and SSL support enabled. */ public ReactorClientHttpConnector() { - this(ClientOptions.create().sslSupport()); + this.httpClient = HttpClient.create(); } /** * Create a Reactor Netty {@link ClientHttpConnector} with the given {@link ClientOptions} */ - public ReactorClientHttpConnector(ClientOptions clientOptions) { + public ReactorClientHttpConnector(Consumer clientOptions) { this.httpClient = HttpClient.create(clientOptions); } @@ -64,8 +65,7 @@ public class ReactorClientHttpConnector implements ClientHttpConnector { uri.toString(), httpClientRequest -> requestCallback .apply(new ReactorClientHttpRequest(method, uri, httpClientRequest))) - .cast(HttpInbound.class) - .otherwise(HttpException.class, exc -> Mono.just(exc.getChannel())) + .otherwise(HttpClientException.class, exc -> Mono.just(exc.getResponse())) .map(ReactorClientHttpResponse::new); } diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpRequest.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpRequest.java index a2c673bc1b..f7f7c18d41 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpRequest.java @@ -24,7 +24,7 @@ import io.netty.handler.codec.http.cookie.DefaultCookie; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.ipc.netty.http.HttpClientRequest; +import reactor.ipc.netty.http.client.HttpClientRequest; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; @@ -36,7 +36,7 @@ import org.springframework.http.HttpMethod; * * @author Brian Clozel * @since 5.0 - * @see reactor.ipc.netty.http.HttpClient + * @see reactor.ipc.netty.http.client.HttpClient */ public class ReactorClientHttpRequest extends AbstractClientHttpRequest { @@ -54,7 +54,7 @@ public class ReactorClientHttpRequest extends AbstractClientHttpRequest { this.httpMethod = httpMethod; this.uri = uri; this.httpRequest = httpRequest; - this.bufferFactory = new NettyDataBufferFactory(httpRequest.delegate().alloc()); + this.bufferFactory = new NettyDataBufferFactory(httpRequest.channel().alloc()); } @@ -84,7 +84,7 @@ public class ReactorClientHttpRequest extends AbstractClientHttpRequest { Publisher> byteBufs = Flux.from(body). map(ReactorClientHttpRequest::toByteBufs); return applyBeforeCommit().then(this.httpRequest - .sendAndFlush(byteBufs)); + .sendGroups(byteBufs)); } private static Publisher toByteBufs(Publisher dataBuffers) { @@ -100,7 +100,7 @@ public class ReactorClientHttpRequest extends AbstractClientHttpRequest { @Override protected void writeHeaders() { getHeaders().entrySet() - .forEach(e -> this.httpRequest.headers().set(e.getKey(), e.getValue())); + .forEach(e -> this.httpRequest.requestHeaders().set(e.getKey(), e.getValue())); } @Override diff --git a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java index d0276a0df7..9c748e51a3 100644 --- a/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java @@ -19,7 +19,7 @@ package org.springframework.http.client.reactive; import java.util.Collection; import reactor.core.publisher.Flux; -import reactor.ipc.netty.http.HttpInbound; +import reactor.ipc.netty.http.client.HttpClientResponse; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.NettyDataBufferFactory; @@ -34,19 +34,19 @@ import org.springframework.util.MultiValueMap; * {@link ClientHttpResponse} implementation for the Reactor-Netty HTTP client. * * @author Brian Clozel - * @see reactor.ipc.netty.http.HttpClient + * @see reactor.ipc.netty.http.client.HttpClient * @since 5.0 */ public class ReactorClientHttpResponse implements ClientHttpResponse { private final NettyDataBufferFactory dataBufferFactory; - private final HttpInbound response; + private final HttpClientResponse response; - public ReactorClientHttpResponse(HttpInbound response) { + public ReactorClientHttpResponse(HttpClientResponse response) { this.response = response; - this.dataBufferFactory = new NettyDataBufferFactory(response.delegate().alloc()); + this.dataBufferFactory = new NettyDataBufferFactory(response.channel().alloc()); } @@ -62,7 +62,9 @@ public class ReactorClientHttpResponse implements ClientHttpResponse { @Override public HttpHeaders getHeaders() { HttpHeaders headers = new HttpHeaders(); - this.response.responseHeaders().entries().stream().forEach(e -> headers.add(e.getKey(), e.getValue())); + this.response.responseHeaders() + .entries() + .forEach(e -> headers.add(e.getKey(), e.getValue())); return headers; } diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorHttpHandlerAdapter.java b/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorHttpHandlerAdapter.java index 4fb552adad..aa9f55328e 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorHttpHandlerAdapter.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorHttpHandlerAdapter.java @@ -17,11 +17,12 @@ package org.springframework.http.server.reactive; import java.util.Map; -import java.util.function.Function; +import java.util.function.BiFunction; import io.netty.handler.codec.http.HttpResponseStatus; import reactor.core.publisher.Mono; -import reactor.ipc.netty.http.HttpChannel; +import reactor.ipc.netty.http.server.HttpServerRequest; +import reactor.ipc.netty.http.server.HttpServerResponse; import org.springframework.core.io.buffer.NettyDataBufferFactory; @@ -32,7 +33,7 @@ import org.springframework.core.io.buffer.NettyDataBufferFactory; * @since 5.0 */ public class ReactorHttpHandlerAdapter extends HttpHandlerAdapterSupport - implements Function> { + implements BiFunction> { public ReactorHttpHandlerAdapter(HttpHandler httpHandler) { @@ -45,16 +46,16 @@ public class ReactorHttpHandlerAdapter extends HttpHandlerAdapterSupport @Override - public Mono apply(HttpChannel channel) { + public Mono apply(HttpServerRequest request, HttpServerResponse response) { - NettyDataBufferFactory bufferFactory = new NettyDataBufferFactory(channel.delegate().alloc()); - ReactorServerHttpRequest request = new ReactorServerHttpRequest(channel, bufferFactory); - ReactorServerHttpResponse response = new ReactorServerHttpResponse(channel, bufferFactory); + NettyDataBufferFactory bufferFactory = new NettyDataBufferFactory(request.channel().alloc()); + ReactorServerHttpRequest req = new ReactorServerHttpRequest(request, bufferFactory); + ReactorServerHttpResponse resp = new ReactorServerHttpResponse(response, bufferFactory); - return getHttpHandler().handle(request, response) + return getHttpHandler().handle(req, resp) .otherwise(ex -> { logger.error("Could not complete request", ex); - channel.status(HttpResponseStatus.INTERNAL_SERVER_ERROR); + response.status(HttpResponseStatus.INTERNAL_SERVER_ERROR); return Mono.empty(); }) .doOnSuccess(aVoid -> logger.debug("Successfully completed request")); diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpRequest.java b/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpRequest.java index eb1299f4fa..eba9e34948 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpRequest.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpRequest.java @@ -22,7 +22,7 @@ import java.net.URISyntaxException; import io.netty.handler.codec.http.cookie.Cookie; import reactor.core.publisher.Flux; -import reactor.ipc.netty.http.HttpChannel; +import reactor.ipc.netty.http.server.HttpServerRequest; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.NettyDataBufferFactory; @@ -34,7 +34,7 @@ import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; /** - * Adapt {@link ServerHttpRequest} to the Reactor Net {@link HttpChannel}. + * Adapt {@link ServerHttpRequest} to the Reactor {@link HttpServerRequest}. * * @author Stephane Maldini * @author Rossen Stoyanchev @@ -42,19 +42,19 @@ import org.springframework.util.MultiValueMap; */ public class ReactorServerHttpRequest extends AbstractServerHttpRequest { - private final HttpChannel channel; + private final HttpServerRequest request; private final NettyDataBufferFactory bufferFactory; - public ReactorServerHttpRequest(HttpChannel channel, NettyDataBufferFactory bufferFactory) { - super(initUri(channel), initHeaders(channel)); + public ReactorServerHttpRequest(HttpServerRequest request, NettyDataBufferFactory bufferFactory) { + super(initUri(request), initHeaders(request)); Assert.notNull(bufferFactory, "'bufferFactory' must not be null"); - this.channel = channel; + this.request = request; this.bufferFactory = bufferFactory; } - private static URI initUri(HttpChannel channel) { + private static URI initUri(HttpServerRequest channel) { Assert.notNull("'channel' must not be null"); try { URI uri = new URI(channel.uri()); @@ -73,29 +73,29 @@ public class ReactorServerHttpRequest extends AbstractServerHttpRequest { } } - private static HttpHeaders initHeaders(HttpChannel channel) { + private static HttpHeaders initHeaders(HttpServerRequest channel) { HttpHeaders headers = new HttpHeaders(); - for (String name : channel.headers().names()) { - headers.put(name, channel.headers().getAll(name)); + for (String name : channel.requestHeaders().names()) { + headers.put(name, channel.requestHeaders().getAll(name)); } return headers; } - public HttpChannel getReactorChannel() { - return this.channel; + public HttpServerRequest getReactorRequest() { + return this.request; } @Override public HttpMethod getMethod() { - return HttpMethod.valueOf(this.channel.method().name()); + return HttpMethod.valueOf(this.request.method().name()); } @Override protected MultiValueMap initCookies() { MultiValueMap cookies = new LinkedMultiValueMap<>(); - for (CharSequence name : this.channel.cookies().keySet()) { - for (Cookie cookie : this.channel.cookies().get(name)) { + for (CharSequence name : this.request.cookies().keySet()) { + for (Cookie cookie : this.request.cookies().get(name)) { HttpCookie httpCookie = new HttpCookie(name.toString(), cookie.value()); cookies.add(name.toString(), httpCookie); } @@ -105,7 +105,7 @@ public class ReactorServerHttpRequest extends AbstractServerHttpRequest { @Override public Flux getBody() { - return this.channel.receive().retain().map(this.bufferFactory::wrap); + return this.request.receive().retain().map(this.bufferFactory::wrap); } } diff --git a/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpResponse.java b/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpResponse.java index 45dfc7f1e6..99e514f28b 100644 --- a/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpResponse.java +++ b/spring-web/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpResponse.java @@ -25,7 +25,7 @@ import io.netty.handler.codec.http.cookie.DefaultCookie; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.ipc.netty.http.HttpChannel; +import reactor.ipc.netty.http.server.HttpServerResponse; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; @@ -37,7 +37,7 @@ import org.springframework.http.ZeroCopyHttpOutputMessage; import org.springframework.util.Assert; /** - * Adapt {@link ServerHttpResponse} to the Reactor Net {@link HttpChannel}. + * Adapt {@link ServerHttpResponse} to the {@link HttpServerResponse}. * * @author Stephane Maldini * @author Rossen Stoyanchev @@ -46,18 +46,18 @@ import org.springframework.util.Assert; public class ReactorServerHttpResponse extends AbstractServerHttpResponse implements ZeroCopyHttpOutputMessage { - private final HttpChannel channel; + private final HttpServerResponse response; - public ReactorServerHttpResponse(HttpChannel response, DataBufferFactory bufferFactory) { + public ReactorServerHttpResponse(HttpServerResponse response, DataBufferFactory bufferFactory) { super(bufferFactory); Assert.notNull("'response' must not be null."); - this.channel = response; + this.response = response; } - public HttpChannel getReactorChannel() { - return this.channel; + public HttpServerResponse getReactorResponse() { + return this.response; } @@ -65,32 +65,32 @@ public class ReactorServerHttpResponse extends AbstractServerHttpResponse protected void applyStatusCode() { HttpStatus statusCode = this.getStatusCode(); if (statusCode != null) { - getReactorChannel().status(HttpResponseStatus.valueOf(statusCode.value())); + getReactorResponse().status(HttpResponseStatus.valueOf(statusCode.value())); } } @Override protected Mono writeWithInternal(Publisher publisher) { Publisher body = toByteBufs(publisher); - return this.channel.send(body); + return this.response.send(body); } @Override protected Mono writeAndFlushWithInternal(Publisher> publisher) { Publisher> body = Flux.from(publisher) .map(ReactorServerHttpResponse::toByteBufs); - return this.channel.sendAndFlush(body); + return this.response.sendGroups(body); } @Override protected void applyHeaders() { // TODO: temporarily, see https://github.com/reactor/reactor-netty/issues/2 if(getHeaders().containsKey(HttpHeaders.CONTENT_LENGTH)){ - this.channel.responseTransfer(false); + this.response.disableChunkedTransfer(); } for (String name : getHeaders().keySet()) { for (String value : getHeaders().get(name)) { - this.channel.responseHeaders().add(name, value); + this.response.responseHeaders().add(name, value); } } } @@ -107,14 +107,14 @@ public class ReactorServerHttpResponse extends AbstractServerHttpResponse httpCookie.getPath().ifPresent(cookie::setPath); cookie.setSecure(httpCookie.isSecure()); cookie.setHttpOnly(httpCookie.isHttpOnly()); - this.channel.addResponseCookie(cookie); + this.response.addCookie(cookie); } } } @Override public Mono writeWith(File file, long position, long count) { - return doCommit(() -> this.channel.sendFile(file, position, count)); + return doCommit(() -> this.response.sendFile(file, position, count)); } private static Publisher toByteBufs(Publisher dataBuffers) { diff --git a/spring-web/src/test/java/org/springframework/http/server/reactive/bootstrap/ReactorHttpServer.java b/spring-web/src/test/java/org/springframework/http/server/reactive/bootstrap/ReactorHttpServer.java index 3c30f688d6..5903996c81 100644 --- a/spring-web/src/test/java/org/springframework/http/server/reactive/bootstrap/ReactorHttpServer.java +++ b/spring-web/src/test/java/org/springframework/http/server/reactive/bootstrap/ReactorHttpServer.java @@ -17,6 +17,7 @@ package org.springframework.http.server.reactive.bootstrap; import reactor.core.Loopback; +import reactor.ipc.netty.NettyContext; import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter; import org.springframework.util.Assert; @@ -28,9 +29,9 @@ public class ReactorHttpServer extends HttpServerSupport implements HttpServer, private ReactorHttpHandlerAdapter reactorHandler; - private reactor.ipc.netty.http.HttpServer reactorServer; + private reactor.ipc.netty.http.server.HttpServer reactorServer; - private boolean running; + private NettyContext running; @Override @@ -42,13 +43,16 @@ public class ReactorHttpServer extends HttpServerSupport implements HttpServer, Assert.notNull(getHttpHandler()); this.reactorHandler = new ReactorHttpHandlerAdapter(getHttpHandler()); } - this.reactorServer = reactor.ipc.netty.http.HttpServer.create(getHost(), getPort()); + this.reactorServer = reactor.ipc.netty.http.server.HttpServer.create(getHost(), + getPort()); } @Override public boolean isRunning() { - return this.running; + NettyContext running = this.running; + return running != null && running.channel() + .isActive(); } @Override @@ -63,22 +67,18 @@ public class ReactorHttpServer extends HttpServerSupport implements HttpServer, @Override public void start() { - if (!this.running) { - try { - this.reactorServer.startAndAwait(reactorHandler); - this.running = true; - } - catch (InterruptedException ex) { - throw new IllegalStateException(ex); - } + // TODO: should be made thread-safe (compareAndSet..) + if (this.running == null) { + this.running = this.reactorServer.newHandler(reactorHandler).block(); } } @Override public void stop() { - if (this.running) { - this.reactorServer.shutdown(); - this.running = false; + NettyContext running = this.running; + if (running != null) { + this.running = null; + running.dispose(); } } }