Browse Source

Improved RxNetty support.

pull/1111/head
Arjen Poutsma 9 years ago
parent
commit
f036f745a6
  1. 10
      spring-web-reactive/src/main/java/org/springframework/http/server/reactive/RxNettyHttpHandlerAdapter.java
  2. 24
      spring-web-reactive/src/main/java/org/springframework/http/server/reactive/RxNettyServerHttpResponse.java
  3. 15
      spring-web-reactive/src/main/java/org/springframework/http/server/reactive/boot/RxNettyHttpServer.java

10
spring-web-reactive/src/main/java/org/springframework/http/server/reactive/RxNettyHttpHandlerAdapter.java

@ -34,18 +34,16 @@ public class RxNettyHttpHandlerAdapter implements RequestHandler<ByteBuf, ByteBu
private final HttpHandler httpHandler; private final HttpHandler httpHandler;
private final NettyDataBufferAllocator allocator; public RxNettyHttpHandlerAdapter(HttpHandler httpHandler) {
public RxNettyHttpHandlerAdapter(HttpHandler httpHandler,
NettyDataBufferAllocator allocator) {
Assert.notNull(httpHandler, "'httpHandler' is required"); Assert.notNull(httpHandler, "'httpHandler' is required");
Assert.notNull(allocator, "'allocator' must not be null");
this.httpHandler = httpHandler; this.httpHandler = httpHandler;
this.allocator = allocator;
} }
@Override @Override
public Observable<Void> handle(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) { public Observable<Void> handle(HttpServerRequest<ByteBuf> request, HttpServerResponse<ByteBuf> response) {
NettyDataBufferAllocator allocator =
new NettyDataBufferAllocator(response.unsafeNettyChannel().alloc());
RxNettyServerHttpRequest adaptedRequest = RxNettyServerHttpRequest adaptedRequest =
new RxNettyServerHttpRequest(request, allocator); new RxNettyServerHttpRequest(request, allocator);
RxNettyServerHttpResponse adaptedResponse = new RxNettyServerHttpResponse(response); RxNettyServerHttpResponse adaptedResponse = new RxNettyServerHttpResponse(response);

24
spring-web-reactive/src/main/java/org/springframework/http/server/reactive/RxNettyServerHttpResponse.java

@ -16,6 +16,8 @@
package org.springframework.http.server.reactive; package org.springframework.http.server.reactive;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.cookie.Cookie; import io.netty.handler.codec.http.cookie.Cookie;
import io.netty.handler.codec.http.cookie.DefaultCookie; import io.netty.handler.codec.http.cookie.DefaultCookie;
@ -26,6 +28,7 @@ import reactor.core.publisher.Mono;
import rx.Observable; import rx.Observable;
import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.NettyDataBuffer;
import org.springframework.http.HttpCookie; import org.springframework.http.HttpCookie;
import org.springframework.http.HttpStatus; import org.springframework.http.HttpStatus;
import org.springframework.util.Assert; import org.springframework.util.Assert;
@ -38,10 +41,9 @@ import org.springframework.util.Assert;
*/ */
public class RxNettyServerHttpResponse extends AbstractServerHttpResponse { public class RxNettyServerHttpResponse extends AbstractServerHttpResponse {
private final HttpServerResponse<?> response; private final HttpServerResponse<ByteBuf> response;
public RxNettyServerHttpResponse(HttpServerResponse<ByteBuf> response) {
public RxNettyServerHttpResponse(HttpServerResponse<?> response) {
Assert.notNull("'response', response must not be null."); Assert.notNull("'response', response must not be null.");
this.response = response; this.response = response;
} }
@ -58,15 +60,19 @@ public class RxNettyServerHttpResponse extends AbstractServerHttpResponse {
@Override @Override
protected Mono<Void> setBodyInternal(Publisher<DataBuffer> publisher) { protected Mono<Void> setBodyInternal(Publisher<DataBuffer> publisher) {
Observable<byte[]> content = RxJava1ObservableConverter.from(publisher).map(this::toBytes); Observable<ByteBuf> content =
Observable<Void> completion = this.response.writeBytes(content); RxJava1ObservableConverter.from(publisher).map(this::toByteBuf);
Observable<Void> completion = this.response.write(content);
return RxJava1ObservableConverter.from(completion).after(); return RxJava1ObservableConverter.from(completion).after();
} }
private byte[] toBytes(DataBuffer buffer) { private ByteBuf toByteBuf(DataBuffer buffer) {
byte[] bytes = new byte[buffer.readableByteCount()]; if (buffer instanceof NettyDataBuffer) {
buffer.read(bytes); return ((NettyDataBuffer) buffer).getNativeBuffer();
return bytes; }
else {
return Unpooled.wrappedBuffer(buffer.asByteBuffer());
}
} }
@Override @Override

15
spring-web-reactive/src/main/java/org/springframework/http/server/reactive/boot/RxNettyHttpServer.java

@ -17,10 +17,7 @@
package org.springframework.http.server.reactive.boot; package org.springframework.http.server.reactive.boot;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
import org.springframework.core.io.buffer.NettyDataBufferAllocator;
import org.springframework.http.server.reactive.RxNettyHttpHandlerAdapter; import org.springframework.http.server.reactive.RxNettyHttpHandlerAdapter;
import org.springframework.util.Assert; import org.springframework.util.Assert;
@ -34,22 +31,12 @@ public class RxNettyHttpServer extends HttpServerSupport implements HttpServer {
private io.reactivex.netty.protocol.http.server.HttpServer<ByteBuf, ByteBuf> rxNettyServer; private io.reactivex.netty.protocol.http.server.HttpServer<ByteBuf, ByteBuf> rxNettyServer;
private NettyDataBufferAllocator allocator;
private boolean running; private boolean running;
public void setAllocator(ByteBufAllocator allocator) {
Assert.notNull(allocator, "'allocator' must not be null");
this.allocator = new NettyDataBufferAllocator(allocator);
}
@Override @Override
public void afterPropertiesSet() throws Exception { public void afterPropertiesSet() throws Exception {
Assert.notNull(getHttpHandler()); Assert.notNull(getHttpHandler());
if (allocator == null) { this.rxNettyHandler = new RxNettyHttpHandlerAdapter(getHttpHandler());
allocator = new NettyDataBufferAllocator(UnpooledByteBufAllocator.DEFAULT);
}
this.rxNettyHandler = new RxNettyHttpHandlerAdapter(getHttpHandler(), allocator);
this.rxNettyServer = (getPort() != -1 ? this.rxNettyServer = (getPort() != -1 ?
io.reactivex.netty.protocol.http.server.HttpServer.newServer(getPort()) : io.reactivex.netty.protocol.http.server.HttpServer.newServer(getPort()) :

Loading…
Cancel
Save