Browse Source

Merge pull request #1239 from smaldini/reactor-netty-0-6

pull/1243/head
Rossen Stoyanchev 8 years ago
parent
commit
82ad86c9d3
  1. 3
      build.gradle
  2. 2
      gradle/wrapper/gradle-wrapper.properties
  3. 9
      spring-web-reactive/src/test/resources/log4j.properties
  4. 16
      spring-web-reactive/src/test/resources/log4j2-test.xml
  5. 16
      spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java
  6. 10
      spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpRequest.java
  7. 14
      spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java
  8. 19
      spring-web/src/main/java/org/springframework/http/server/reactive/ReactorHttpHandlerAdapter.java
  9. 32
      spring-web/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpRequest.java
  10. 28
      spring-web/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpResponse.java
  11. 30
      spring-web/src/test/java/org/springframework/http/server/reactive/bootstrap/ReactorHttpServer.java

3
build.gradle

@ -78,7 +78,7 @@ configure(allprojects) { project -> @@ -78,7 +78,7 @@ configure(allprojects) { project ->
ext.reactivestreamsVersion = "1.0.0"
ext.reactorVersion = "2.0.8.RELEASE"
ext.reactorCoreVersion = '3.0.3.RELEASE'
ext.reactorNettyVersion = '0.5.2.RELEASE'
ext.reactorNettyVersion = '0.6.0.BUILD-SNAPSHOT'
ext.romeVersion = "1.7.0"
ext.rxjavaVersion = '1.2.2'
ext.rxjavaAdapterVersion = '1.2.1'
@ -171,6 +171,7 @@ configure(allprojects) { project -> @@ -171,6 +171,7 @@ configure(allprojects) { project ->
repositories {
maven { url "https://repo.spring.io/libs-release" }
maven { url "https://repo.spring.io/milestone" }
maven { url "https://repo.spring.io/snapshot" } // reactor-netty 0.6 snapshots
}
dependencies {

2
gradle/wrapper/gradle-wrapper.properties vendored

@ -1,4 +1,4 @@ @@ -1,4 +1,4 @@
#Mon Aug 15 21:03:22 CEST 2016
#Fri Nov 04 16:30:57 GMT 2016
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME

9
spring-web-reactive/src/test/resources/log4j.properties

@ -1,9 +0,0 @@ @@ -1,9 +0,0 @@
log4j.rootCategory=WARN, stdout
log4j.logger.org.springframework.http=DEBUG
log4j.logger.org.springframework.web=DEBUG
log4j.logger.reactor=INFO
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] <%t> - %m%n

16
spring-web-reactive/src/test/resources/log4j2-test.xml

@ -0,0 +1,16 @@ @@ -0,0 +1,16 @@
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n" />
</Console>
</Appenders>
<Loggers>
<Logger name="org.springframework.http" level="debug" />
<Logger name="org.springframework.web" level="debug" />
<Logger name="reactor" level="debug" />
<Root level="error">
<AppenderRef ref="Console" />
</Root>
</Loggers>
</Configuration>

16
spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java

@ -17,13 +17,14 @@ @@ -17,13 +17,14 @@
package org.springframework.http.client.reactive;
import java.net.URI;
import java.util.function.Consumer;
import java.util.function.Function;
import reactor.core.publisher.Mono;
import reactor.ipc.netty.config.ClientOptions;
import reactor.ipc.netty.http.HttpClient;
import reactor.ipc.netty.http.HttpException;
import reactor.ipc.netty.http.HttpInbound;
import reactor.ipc.netty.http.client.HttpClientOptions;
import reactor.ipc.netty.options.ClientOptions;
import reactor.ipc.netty.http.client.HttpClient;
import reactor.ipc.netty.http.client.HttpClientException;
import org.springframework.http.HttpMethod;
@ -44,13 +45,13 @@ public class ReactorClientHttpConnector implements ClientHttpConnector { @@ -44,13 +45,13 @@ public class ReactorClientHttpConnector implements ClientHttpConnector {
* and SSL support enabled.
*/
public ReactorClientHttpConnector() {
this(ClientOptions.create().sslSupport());
this.httpClient = HttpClient.create();
}
/**
* Create a Reactor Netty {@link ClientHttpConnector} with the given {@link ClientOptions}
*/
public ReactorClientHttpConnector(ClientOptions clientOptions) {
public ReactorClientHttpConnector(Consumer<? super HttpClientOptions> clientOptions) {
this.httpClient = HttpClient.create(clientOptions);
}
@ -64,8 +65,7 @@ public class ReactorClientHttpConnector implements ClientHttpConnector { @@ -64,8 +65,7 @@ public class ReactorClientHttpConnector implements ClientHttpConnector {
uri.toString(),
httpClientRequest -> requestCallback
.apply(new ReactorClientHttpRequest(method, uri, httpClientRequest)))
.cast(HttpInbound.class)
.otherwise(HttpException.class, exc -> Mono.just(exc.getChannel()))
.otherwise(HttpClientException.class, exc -> Mono.just(exc.getResponse()))
.map(ReactorClientHttpResponse::new);
}

10
spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpRequest.java

@ -24,7 +24,7 @@ import io.netty.handler.codec.http.cookie.DefaultCookie; @@ -24,7 +24,7 @@ import io.netty.handler.codec.http.cookie.DefaultCookie;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.ipc.netty.http.HttpClientRequest;
import reactor.ipc.netty.http.client.HttpClientRequest;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
@ -36,7 +36,7 @@ import org.springframework.http.HttpMethod; @@ -36,7 +36,7 @@ import org.springframework.http.HttpMethod;
*
* @author Brian Clozel
* @since 5.0
* @see reactor.ipc.netty.http.HttpClient
* @see reactor.ipc.netty.http.client.HttpClient
*/
public class ReactorClientHttpRequest extends AbstractClientHttpRequest {
@ -54,7 +54,7 @@ public class ReactorClientHttpRequest extends AbstractClientHttpRequest { @@ -54,7 +54,7 @@ public class ReactorClientHttpRequest extends AbstractClientHttpRequest {
this.httpMethod = httpMethod;
this.uri = uri;
this.httpRequest = httpRequest;
this.bufferFactory = new NettyDataBufferFactory(httpRequest.delegate().alloc());
this.bufferFactory = new NettyDataBufferFactory(httpRequest.channel().alloc());
}
@ -84,7 +84,7 @@ public class ReactorClientHttpRequest extends AbstractClientHttpRequest { @@ -84,7 +84,7 @@ public class ReactorClientHttpRequest extends AbstractClientHttpRequest {
Publisher<Publisher<ByteBuf>> byteBufs = Flux.from(body).
map(ReactorClientHttpRequest::toByteBufs);
return applyBeforeCommit().then(this.httpRequest
.sendAndFlush(byteBufs));
.sendGroups(byteBufs));
}
private static Publisher<ByteBuf> toByteBufs(Publisher<DataBuffer> dataBuffers) {
@ -100,7 +100,7 @@ public class ReactorClientHttpRequest extends AbstractClientHttpRequest { @@ -100,7 +100,7 @@ public class ReactorClientHttpRequest extends AbstractClientHttpRequest {
@Override
protected void writeHeaders() {
getHeaders().entrySet()
.forEach(e -> this.httpRequest.headers().set(e.getKey(), e.getValue()));
.forEach(e -> this.httpRequest.requestHeaders().set(e.getKey(), e.getValue()));
}
@Override

14
spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java

@ -19,7 +19,7 @@ package org.springframework.http.client.reactive; @@ -19,7 +19,7 @@ package org.springframework.http.client.reactive;
import java.util.Collection;
import reactor.core.publisher.Flux;
import reactor.ipc.netty.http.HttpInbound;
import reactor.ipc.netty.http.client.HttpClientResponse;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
@ -34,19 +34,19 @@ import org.springframework.util.MultiValueMap; @@ -34,19 +34,19 @@ import org.springframework.util.MultiValueMap;
* {@link ClientHttpResponse} implementation for the Reactor-Netty HTTP client.
*
* @author Brian Clozel
* @see reactor.ipc.netty.http.HttpClient
* @see reactor.ipc.netty.http.client.HttpClient
* @since 5.0
*/
public class ReactorClientHttpResponse implements ClientHttpResponse {
private final NettyDataBufferFactory dataBufferFactory;
private final HttpInbound response;
private final HttpClientResponse response;
public ReactorClientHttpResponse(HttpInbound response) {
public ReactorClientHttpResponse(HttpClientResponse response) {
this.response = response;
this.dataBufferFactory = new NettyDataBufferFactory(response.delegate().alloc());
this.dataBufferFactory = new NettyDataBufferFactory(response.channel().alloc());
}
@ -62,7 +62,9 @@ public class ReactorClientHttpResponse implements ClientHttpResponse { @@ -62,7 +62,9 @@ public class ReactorClientHttpResponse implements ClientHttpResponse {
@Override
public HttpHeaders getHeaders() {
HttpHeaders headers = new HttpHeaders();
this.response.responseHeaders().entries().stream().forEach(e -> headers.add(e.getKey(), e.getValue()));
this.response.responseHeaders()
.entries()
.forEach(e -> headers.add(e.getKey(), e.getValue()));
return headers;
}

19
spring-web/src/main/java/org/springframework/http/server/reactive/ReactorHttpHandlerAdapter.java

@ -17,11 +17,12 @@ @@ -17,11 +17,12 @@
package org.springframework.http.server.reactive;
import java.util.Map;
import java.util.function.Function;
import java.util.function.BiFunction;
import io.netty.handler.codec.http.HttpResponseStatus;
import reactor.core.publisher.Mono;
import reactor.ipc.netty.http.HttpChannel;
import reactor.ipc.netty.http.server.HttpServerRequest;
import reactor.ipc.netty.http.server.HttpServerResponse;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
@ -32,7 +33,7 @@ import org.springframework.core.io.buffer.NettyDataBufferFactory; @@ -32,7 +33,7 @@ import org.springframework.core.io.buffer.NettyDataBufferFactory;
* @since 5.0
*/
public class ReactorHttpHandlerAdapter extends HttpHandlerAdapterSupport
implements Function<HttpChannel, Mono<Void>> {
implements BiFunction<HttpServerRequest, HttpServerResponse, Mono<Void>> {
public ReactorHttpHandlerAdapter(HttpHandler httpHandler) {
@ -45,16 +46,16 @@ public class ReactorHttpHandlerAdapter extends HttpHandlerAdapterSupport @@ -45,16 +46,16 @@ public class ReactorHttpHandlerAdapter extends HttpHandlerAdapterSupport
@Override
public Mono<Void> apply(HttpChannel channel) {
public Mono<Void> apply(HttpServerRequest request, HttpServerResponse response) {
NettyDataBufferFactory bufferFactory = new NettyDataBufferFactory(channel.delegate().alloc());
ReactorServerHttpRequest request = new ReactorServerHttpRequest(channel, bufferFactory);
ReactorServerHttpResponse response = new ReactorServerHttpResponse(channel, bufferFactory);
NettyDataBufferFactory bufferFactory = new NettyDataBufferFactory(request.channel().alloc());
ReactorServerHttpRequest req = new ReactorServerHttpRequest(request, bufferFactory);
ReactorServerHttpResponse resp = new ReactorServerHttpResponse(response, bufferFactory);
return getHttpHandler().handle(request, response)
return getHttpHandler().handle(req, resp)
.otherwise(ex -> {
logger.error("Could not complete request", ex);
channel.status(HttpResponseStatus.INTERNAL_SERVER_ERROR);
response.status(HttpResponseStatus.INTERNAL_SERVER_ERROR);
return Mono.empty();
})
.doOnSuccess(aVoid -> logger.debug("Successfully completed request"));

32
spring-web/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpRequest.java

@ -22,7 +22,7 @@ import java.net.URISyntaxException; @@ -22,7 +22,7 @@ import java.net.URISyntaxException;
import io.netty.handler.codec.http.cookie.Cookie;
import reactor.core.publisher.Flux;
import reactor.ipc.netty.http.HttpChannel;
import reactor.ipc.netty.http.server.HttpServerRequest;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
@ -34,7 +34,7 @@ import org.springframework.util.LinkedMultiValueMap; @@ -34,7 +34,7 @@ import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
/**
* Adapt {@link ServerHttpRequest} to the Reactor Net {@link HttpChannel}.
* Adapt {@link ServerHttpRequest} to the Reactor {@link HttpServerRequest}.
*
* @author Stephane Maldini
* @author Rossen Stoyanchev
@ -42,19 +42,19 @@ import org.springframework.util.MultiValueMap; @@ -42,19 +42,19 @@ import org.springframework.util.MultiValueMap;
*/
public class ReactorServerHttpRequest extends AbstractServerHttpRequest {
private final HttpChannel channel;
private final HttpServerRequest request;
private final NettyDataBufferFactory bufferFactory;
public ReactorServerHttpRequest(HttpChannel channel, NettyDataBufferFactory bufferFactory) {
super(initUri(channel), initHeaders(channel));
public ReactorServerHttpRequest(HttpServerRequest request, NettyDataBufferFactory bufferFactory) {
super(initUri(request), initHeaders(request));
Assert.notNull(bufferFactory, "'bufferFactory' must not be null");
this.channel = channel;
this.request = request;
this.bufferFactory = bufferFactory;
}
private static URI initUri(HttpChannel channel) {
private static URI initUri(HttpServerRequest channel) {
Assert.notNull("'channel' must not be null");
try {
URI uri = new URI(channel.uri());
@ -73,29 +73,29 @@ public class ReactorServerHttpRequest extends AbstractServerHttpRequest { @@ -73,29 +73,29 @@ public class ReactorServerHttpRequest extends AbstractServerHttpRequest {
}
}
private static HttpHeaders initHeaders(HttpChannel channel) {
private static HttpHeaders initHeaders(HttpServerRequest channel) {
HttpHeaders headers = new HttpHeaders();
for (String name : channel.headers().names()) {
headers.put(name, channel.headers().getAll(name));
for (String name : channel.requestHeaders().names()) {
headers.put(name, channel.requestHeaders().getAll(name));
}
return headers;
}
public HttpChannel getReactorChannel() {
return this.channel;
public HttpServerRequest getReactorRequest() {
return this.request;
}
@Override
public HttpMethod getMethod() {
return HttpMethod.valueOf(this.channel.method().name());
return HttpMethod.valueOf(this.request.method().name());
}
@Override
protected MultiValueMap<String, HttpCookie> initCookies() {
MultiValueMap<String, HttpCookie> cookies = new LinkedMultiValueMap<>();
for (CharSequence name : this.channel.cookies().keySet()) {
for (Cookie cookie : this.channel.cookies().get(name)) {
for (CharSequence name : this.request.cookies().keySet()) {
for (Cookie cookie : this.request.cookies().get(name)) {
HttpCookie httpCookie = new HttpCookie(name.toString(), cookie.value());
cookies.add(name.toString(), httpCookie);
}
@ -105,7 +105,7 @@ public class ReactorServerHttpRequest extends AbstractServerHttpRequest { @@ -105,7 +105,7 @@ public class ReactorServerHttpRequest extends AbstractServerHttpRequest {
@Override
public Flux<DataBuffer> getBody() {
return this.channel.receive().retain().map(this.bufferFactory::wrap);
return this.request.receive().retain().map(this.bufferFactory::wrap);
}
}

28
spring-web/src/main/java/org/springframework/http/server/reactive/ReactorServerHttpResponse.java

@ -25,7 +25,7 @@ import io.netty.handler.codec.http.cookie.DefaultCookie; @@ -25,7 +25,7 @@ import io.netty.handler.codec.http.cookie.DefaultCookie;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.ipc.netty.http.HttpChannel;
import reactor.ipc.netty.http.server.HttpServerResponse;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
@ -37,7 +37,7 @@ import org.springframework.http.ZeroCopyHttpOutputMessage; @@ -37,7 +37,7 @@ import org.springframework.http.ZeroCopyHttpOutputMessage;
import org.springframework.util.Assert;
/**
* Adapt {@link ServerHttpResponse} to the Reactor Net {@link HttpChannel}.
* Adapt {@link ServerHttpResponse} to the {@link HttpServerResponse}.
*
* @author Stephane Maldini
* @author Rossen Stoyanchev
@ -46,18 +46,18 @@ import org.springframework.util.Assert; @@ -46,18 +46,18 @@ import org.springframework.util.Assert;
public class ReactorServerHttpResponse extends AbstractServerHttpResponse
implements ZeroCopyHttpOutputMessage {
private final HttpChannel channel;
private final HttpServerResponse response;
public ReactorServerHttpResponse(HttpChannel response, DataBufferFactory bufferFactory) {
public ReactorServerHttpResponse(HttpServerResponse response, DataBufferFactory bufferFactory) {
super(bufferFactory);
Assert.notNull("'response' must not be null.");
this.channel = response;
this.response = response;
}
public HttpChannel getReactorChannel() {
return this.channel;
public HttpServerResponse getReactorResponse() {
return this.response;
}
@ -65,32 +65,32 @@ public class ReactorServerHttpResponse extends AbstractServerHttpResponse @@ -65,32 +65,32 @@ public class ReactorServerHttpResponse extends AbstractServerHttpResponse
protected void applyStatusCode() {
HttpStatus statusCode = this.getStatusCode();
if (statusCode != null) {
getReactorChannel().status(HttpResponseStatus.valueOf(statusCode.value()));
getReactorResponse().status(HttpResponseStatus.valueOf(statusCode.value()));
}
}
@Override
protected Mono<Void> writeWithInternal(Publisher<DataBuffer> publisher) {
Publisher<ByteBuf> body = toByteBufs(publisher);
return this.channel.send(body);
return this.response.send(body);
}
@Override
protected Mono<Void> writeAndFlushWithInternal(Publisher<Publisher<DataBuffer>> publisher) {
Publisher<Publisher<ByteBuf>> body = Flux.from(publisher)
.map(ReactorServerHttpResponse::toByteBufs);
return this.channel.sendAndFlush(body);
return this.response.sendGroups(body);
}
@Override
protected void applyHeaders() {
// TODO: temporarily, see https://github.com/reactor/reactor-netty/issues/2
if(getHeaders().containsKey(HttpHeaders.CONTENT_LENGTH)){
this.channel.responseTransfer(false);
this.response.disableChunkedTransfer();
}
for (String name : getHeaders().keySet()) {
for (String value : getHeaders().get(name)) {
this.channel.responseHeaders().add(name, value);
this.response.responseHeaders().add(name, value);
}
}
}
@ -107,14 +107,14 @@ public class ReactorServerHttpResponse extends AbstractServerHttpResponse @@ -107,14 +107,14 @@ public class ReactorServerHttpResponse extends AbstractServerHttpResponse
httpCookie.getPath().ifPresent(cookie::setPath);
cookie.setSecure(httpCookie.isSecure());
cookie.setHttpOnly(httpCookie.isHttpOnly());
this.channel.addResponseCookie(cookie);
this.response.addCookie(cookie);
}
}
}
@Override
public Mono<Void> writeWith(File file, long position, long count) {
return doCommit(() -> this.channel.sendFile(file, position, count));
return doCommit(() -> this.response.sendFile(file, position, count));
}
private static Publisher<ByteBuf> toByteBufs(Publisher<DataBuffer> dataBuffers) {

30
spring-web/src/test/java/org/springframework/http/server/reactive/bootstrap/ReactorHttpServer.java

@ -17,6 +17,7 @@ @@ -17,6 +17,7 @@
package org.springframework.http.server.reactive.bootstrap;
import reactor.core.Loopback;
import reactor.ipc.netty.NettyContext;
import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter;
import org.springframework.util.Assert;
@ -28,9 +29,9 @@ public class ReactorHttpServer extends HttpServerSupport implements HttpServer, @@ -28,9 +29,9 @@ public class ReactorHttpServer extends HttpServerSupport implements HttpServer,
private ReactorHttpHandlerAdapter reactorHandler;
private reactor.ipc.netty.http.HttpServer reactorServer;
private reactor.ipc.netty.http.server.HttpServer reactorServer;
private boolean running;
private NettyContext running;
@Override
@ -42,13 +43,16 @@ public class ReactorHttpServer extends HttpServerSupport implements HttpServer, @@ -42,13 +43,16 @@ public class ReactorHttpServer extends HttpServerSupport implements HttpServer,
Assert.notNull(getHttpHandler());
this.reactorHandler = new ReactorHttpHandlerAdapter(getHttpHandler());
}
this.reactorServer = reactor.ipc.netty.http.HttpServer.create(getHost(), getPort());
this.reactorServer = reactor.ipc.netty.http.server.HttpServer.create(getHost(),
getPort());
}
@Override
public boolean isRunning() {
return this.running;
NettyContext running = this.running;
return running != null && running.channel()
.isActive();
}
@Override
@ -63,22 +67,18 @@ public class ReactorHttpServer extends HttpServerSupport implements HttpServer, @@ -63,22 +67,18 @@ public class ReactorHttpServer extends HttpServerSupport implements HttpServer,
@Override
public void start() {
if (!this.running) {
try {
this.reactorServer.startAndAwait(reactorHandler);
this.running = true;
}
catch (InterruptedException ex) {
throw new IllegalStateException(ex);
}
// TODO: should be made thread-safe (compareAndSet..)
if (this.running == null) {
this.running = this.reactorServer.newHandler(reactorHandler).block();
}
}
@Override
public void stop() {
if (this.running) {
this.reactorServer.shutdown();
this.running = false;
NettyContext running = this.running;
if (running != null) {
this.running = null;
running.dispose();
}
}
}

Loading…
Cancel
Save