From f036f745a64a9c0143a2f8cd1ff4a9f40edff281 Mon Sep 17 00:00:00 2001 From: Arjen Poutsma Date: Thu, 18 Feb 2016 15:49:45 +0100 Subject: [PATCH] Improved RxNetty support. --- .../reactive/RxNettyHttpHandlerAdapter.java | 10 ++++---- .../reactive/RxNettyServerHttpResponse.java | 24 ++++++++++++------- .../reactive/boot/RxNettyHttpServer.java | 15 +----------- 3 files changed, 20 insertions(+), 29 deletions(-) diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/RxNettyHttpHandlerAdapter.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/RxNettyHttpHandlerAdapter.java index 6ed6f3e641..6023d34ff7 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/RxNettyHttpHandlerAdapter.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/RxNettyHttpHandlerAdapter.java @@ -34,18 +34,16 @@ public class RxNettyHttpHandlerAdapter implements RequestHandler handle(HttpServerRequest request, HttpServerResponse response) { + NettyDataBufferAllocator allocator = + new NettyDataBufferAllocator(response.unsafeNettyChannel().alloc()); + RxNettyServerHttpRequest adaptedRequest = new RxNettyServerHttpRequest(request, allocator); RxNettyServerHttpResponse adaptedResponse = new RxNettyServerHttpResponse(response); diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/RxNettyServerHttpResponse.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/RxNettyServerHttpResponse.java index e1ad9ad646..ca064ad2b5 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/RxNettyServerHttpResponse.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/RxNettyServerHttpResponse.java @@ -16,6 +16,8 @@ package org.springframework.http.server.reactive; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.cookie.Cookie; import io.netty.handler.codec.http.cookie.DefaultCookie; @@ -26,6 +28,7 @@ import reactor.core.publisher.Mono; import rx.Observable; import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.NettyDataBuffer; import org.springframework.http.HttpCookie; import org.springframework.http.HttpStatus; import org.springframework.util.Assert; @@ -38,10 +41,9 @@ import org.springframework.util.Assert; */ public class RxNettyServerHttpResponse extends AbstractServerHttpResponse { - private final HttpServerResponse response; + private final HttpServerResponse response; - - public RxNettyServerHttpResponse(HttpServerResponse response) { + public RxNettyServerHttpResponse(HttpServerResponse response) { Assert.notNull("'response', response must not be null."); this.response = response; } @@ -58,15 +60,19 @@ public class RxNettyServerHttpResponse extends AbstractServerHttpResponse { @Override protected Mono setBodyInternal(Publisher publisher) { - Observable content = RxJava1ObservableConverter.from(publisher).map(this::toBytes); - Observable completion = this.response.writeBytes(content); + Observable content = + RxJava1ObservableConverter.from(publisher).map(this::toByteBuf); + Observable completion = this.response.write(content); return RxJava1ObservableConverter.from(completion).after(); } - private byte[] toBytes(DataBuffer buffer) { - byte[] bytes = new byte[buffer.readableByteCount()]; - buffer.read(bytes); - return bytes; + private ByteBuf toByteBuf(DataBuffer buffer) { + if (buffer instanceof NettyDataBuffer) { + return ((NettyDataBuffer) buffer).getNativeBuffer(); + } + else { + return Unpooled.wrappedBuffer(buffer.asByteBuffer()); + } } @Override diff --git a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/boot/RxNettyHttpServer.java b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/boot/RxNettyHttpServer.java index bb23123281..d73fe0385f 100644 --- a/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/boot/RxNettyHttpServer.java +++ b/spring-web-reactive/src/main/java/org/springframework/http/server/reactive/boot/RxNettyHttpServer.java @@ -17,10 +17,7 @@ package org.springframework.http.server.reactive.boot; import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; -import io.netty.buffer.UnpooledByteBufAllocator; -import org.springframework.core.io.buffer.NettyDataBufferAllocator; import org.springframework.http.server.reactive.RxNettyHttpHandlerAdapter; import org.springframework.util.Assert; @@ -34,22 +31,12 @@ public class RxNettyHttpServer extends HttpServerSupport implements HttpServer { private io.reactivex.netty.protocol.http.server.HttpServer rxNettyServer; - private NettyDataBufferAllocator allocator; - private boolean running; - public void setAllocator(ByteBufAllocator allocator) { - Assert.notNull(allocator, "'allocator' must not be null"); - this.allocator = new NettyDataBufferAllocator(allocator); - } - @Override public void afterPropertiesSet() throws Exception { Assert.notNull(getHttpHandler()); - if (allocator == null) { - allocator = new NettyDataBufferAllocator(UnpooledByteBufAllocator.DEFAULT); - } - this.rxNettyHandler = new RxNettyHttpHandlerAdapter(getHttpHandler(), allocator); + this.rxNettyHandler = new RxNettyHttpHandlerAdapter(getHttpHandler()); this.rxNettyServer = (getPort() != -1 ? io.reactivex.netty.protocol.http.server.HttpServer.newServer(getPort()) :