Browse Source

Update to latest reactor-net

Latest reactor-net doesn't depend on reactor-stream anymore (neither reactor-codec and reactor-bus, it only depends on reactor-core).
pull/1111/head
Stephane Maldini 9 years ago committed by Sebastien Deleuze
parent
commit
fd52ae999b
  1. 1
      spring-web-reactive/build.gradle
  2. 81
      spring-web-reactive/src/main/java/org/springframework/reactive/web/http/reactor/PublisherReactorServerHttpRequest.java
  3. 85
      spring-web-reactive/src/main/java/org/springframework/reactive/web/http/reactor/PublisherReactorServerHttpResponse.java
  4. 10
      spring-web-reactive/src/main/java/org/springframework/reactive/web/http/reactor/ReactorHttpServer.java
  5. 52
      spring-web-reactive/src/main/java/org/springframework/reactive/web/http/reactor/ReactorServerHttpRequest.java
  6. 47
      spring-web-reactive/src/main/java/org/springframework/reactive/web/http/reactor/ReactorServerHttpResponse.java
  7. 24
      spring-web-reactive/src/main/java/org/springframework/reactive/web/http/reactor/RequestHandlerAdapter.java
  8. 6
      spring-web-reactive/src/test/java/org/springframework/reactive/web/http/AbstractHttpHandlerIntegrationTests.java

1
spring-web-reactive/build.gradle

@ -41,6 +41,7 @@ dependencies { @@ -41,6 +41,7 @@ dependencies {
optional "io.reactivex:rxnetty:0.5.0-SNAPSHOT"
optional "io.projectreactor:reactor-stream:2.1.0.BUILD-SNAPSHOT"
optional "io.projectreactor:reactor-net:2.1.0.BUILD-SNAPSHOT"
optional 'org.apache.tomcat:tomcat-util:8.0.28'

81
spring-web-reactive/src/main/java/org/springframework/reactive/web/http/reactor/PublisherReactorServerHttpRequest.java

@ -0,0 +1,81 @@ @@ -0,0 +1,81 @@
/*
* Copyright (c) 2011-2015 Pivotal Software Inc, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.reactive.web.http.reactor;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import org.reactivestreams.Publisher;
import reactor.Publishers;
import reactor.io.buffer.Buffer;
import reactor.io.net.http.HttpChannel;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.reactive.web.http.ServerHttpRequest;
import org.springframework.util.Assert;
/**
* @author Stephane Maldini
*/
public class PublisherReactorServerHttpRequest implements ServerHttpRequest {
private final HttpChannel<Buffer, ?> channel;
private HttpHeaders headers;
public PublisherReactorServerHttpRequest(HttpChannel<Buffer, ?> request) {
Assert.notNull("'request', request must not be null.");
this.channel = request;
}
@Override
public HttpHeaders getHeaders() {
if (this.headers == null) {
this.headers = new HttpHeaders();
for (String name : this.channel.headers().names()) {
for (String value : this.channel.headers().getAll(name)) {
this.headers.add(name, value);
}
}
}
return this.headers;
}
@Override
public HttpMethod getMethod() {
return HttpMethod.valueOf(this.channel.method().getName());
}
@Override
public URI getURI() {
try {
return new URI(this.channel.uri());
} catch (URISyntaxException ex) {
throw new IllegalStateException("Could not get URI: " + ex.getMessage(), ex);
}
}
@Override
public Publisher<ByteBuffer> getBody() {
return Publishers.map(channel.input(), Buffer::byteBuffer);
}
}

85
spring-web-reactive/src/main/java/org/springframework/reactive/web/http/reactor/PublisherReactorServerHttpResponse.java

@ -0,0 +1,85 @@ @@ -0,0 +1,85 @@
/*
* Copyright (c) 2011-2015 Pivotal Software Inc, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.reactive.web.http.reactor;
import java.nio.ByteBuffer;
import org.reactivestreams.Publisher;
import reactor.Publishers;
import reactor.io.buffer.Buffer;
import reactor.io.net.http.HttpChannel;
import reactor.io.net.http.model.Status;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.reactive.web.http.ServerHttpResponse;
import org.springframework.util.Assert;
/**
* @author Stephane Maldini
*/
public class PublisherReactorServerHttpResponse implements ServerHttpResponse {
private final HttpChannel<?, Buffer> channel;
private final HttpHeaders headers;
private boolean headersWritten = false;
public PublisherReactorServerHttpResponse(HttpChannel<?, Buffer> response) {
Assert.notNull("'response', response must not be null.");
this.channel = response;
this.headers = new HttpHeaders();
}
@Override
public void setStatusCode(HttpStatus status) {
this.channel.responseStatus(Status.valueOf(status.value()));
}
@Override
public HttpHeaders getHeaders() {
return (this.headersWritten ? HttpHeaders.readOnlyHttpHeaders(this.headers) : this.headers);
}
@Override
public Publisher<Void> writeHeaders() {
if (this.headersWritten) {
return Publishers.empty();
}
applyHeaders();
return this.channel.writeHeaders();
}
@Override
public Publisher<Void> writeWith(Publisher<ByteBuffer> contentPublisher) {
applyHeaders();
return this.channel.writeWith(Publishers.map(contentPublisher, Buffer::new));
}
private void applyHeaders() {
if (!this.headersWritten) {
for (String name : this.headers.keySet()) {
for (String value : this.headers.get(name)) {
this.channel.responseHeaders().add(name, value);
}
}
this.headersWritten = true;
}
}
}

10
spring-web-reactive/src/main/java/org/springframework/reactive/web/http/reactor/ReactorHttpServer.java

@ -16,9 +16,8 @@ @@ -16,9 +16,8 @@
package org.springframework.reactive.web.http.reactor;
import reactor.bus.selector.Selectors;
import reactor.io.buffer.Buffer;
import reactor.io.net.NetStreams;
import reactor.io.net.ReactiveNet;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.reactive.web.http.HttpServer;
@ -48,16 +47,15 @@ public class ReactorHttpServer extends HttpServerSupport @@ -48,16 +47,15 @@ public class ReactorHttpServer extends HttpServerSupport
Assert.notNull(getHttpHandler());
this.reactorHandler = new RequestHandlerAdapter(getHttpHandler());
this.reactorServer = (getPort() != -1 ? NetStreams.httpServer(getPort()) :
NetStreams.httpServer());
this.reactorServer = (getPort() != -1 ? ReactiveNet.httpServer(getPort()) :
ReactiveNet.httpServer());
}
@Override
public void start() {
if (!this.running) {
try {
this.reactorServer.route(Selectors.matchAll(), this.reactorHandler)
.start().await();
this.reactorServer.startAndAwait(reactorHandler);
this.running = true;
}
catch (InterruptedException ex) {

52
spring-web-reactive/src/main/java/org/springframework/reactive/web/http/reactor/ReactorServerHttpRequest.java

@ -15,65 +15,25 @@ @@ -15,65 +15,25 @@
*/
package org.springframework.reactive.web.http.reactor;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.reactive.web.http.ServerHttpRequest;
import org.springframework.util.Assert;
import java.nio.ByteBuffer;
import reactor.io.buffer.Buffer;
import reactor.io.net.http.HttpChannel;
import reactor.rx.Stream;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import reactor.rx.Streams;
/**
* @author Stephane Maldini
*/
public class ReactorServerHttpRequest implements ServerHttpRequest {
private final HttpChannel<Buffer, ?> channel;
private HttpHeaders headers;
public class ReactorServerHttpRequest extends PublisherReactorServerHttpRequest {
public ReactorServerHttpRequest(HttpChannel<Buffer, ?> request) {
Assert.notNull("'request', request must not be null.");
this.channel = request;
}
@Override
public HttpHeaders getHeaders() {
if (this.headers == null) {
this.headers = new HttpHeaders();
for (String name : this.channel.headers().names()) {
for (String value : this.channel.headers().getAll(name)) {
this.headers.add(name, value);
}
}
}
return this.headers;
}
@Override
public HttpMethod getMethod() {
return HttpMethod.valueOf(this.channel.method().getName());
}
@Override
public URI getURI() {
try {
return new URI(this.channel.uri());
} catch (URISyntaxException ex) {
throw new IllegalStateException("Could not get URI: " + ex.getMessage(), ex);
}
super(request);
}
@Override
public Stream<ByteBuffer> getBody() {
return this.channel.map(Buffer::byteBuffer);
return Streams.wrap(super.getBody());
}
}

47
spring-web-reactive/src/main/java/org/springframework/reactive/web/http/reactor/ReactorServerHttpResponse.java

@ -25,61 +25,26 @@ import reactor.io.buffer.Buffer; @@ -25,61 +25,26 @@ import reactor.io.buffer.Buffer;
import reactor.io.net.http.HttpChannel;
import reactor.io.net.http.model.Status;
import reactor.rx.Stream;
import reactor.rx.Streams;
import java.nio.ByteBuffer;
/**
* @author Stephane Maldini
*/
public class ReactorServerHttpResponse implements ServerHttpResponse {
private final HttpChannel<?, Buffer> channel;
private final HttpHeaders headers;
private boolean headersWritten = false;
public class ReactorServerHttpResponse extends PublisherReactorServerHttpResponse {
public ReactorServerHttpResponse(HttpChannel<?, Buffer> response) {
Assert.notNull("'response', response must not be null.");
this.channel = response;
this.headers = new HttpHeaders();
super(response);
}
@Override
public void setStatusCode(HttpStatus status) {
this.channel.responseStatus(Status.valueOf(status.value()));
}
@Override
public HttpHeaders getHeaders() {
return (this.headersWritten ? HttpHeaders.readOnlyHttpHeaders(this.headers) : this.headers);
}
@Override
public Publisher<Void> writeHeaders() {
if (this.headersWritten) {
return Publishers.empty();
}
applyHeaders();
return this.channel.writeHeaders();
public Stream<Void> writeHeaders() {
return Streams.wrap(super.writeHeaders());
}
@Override
public Stream<Void> writeWith(Publisher<ByteBuffer> contentPublisher) {
applyHeaders();
return this.channel.writeWith(Publishers.map(contentPublisher, Buffer::new));
}
private void applyHeaders() {
if (!this.headersWritten) {
for (String name : this.headers.keySet()) {
for (String value : this.headers.get(name)) {
this.channel.responseHeaders().add(name, value);
}
}
this.headersWritten = true;
}
return Streams.wrap(super.writeWith(contentPublisher));
}
}

24
spring-web-reactive/src/main/java/org/springframework/reactive/web/http/reactor/RequestHandlerAdapter.java

@ -16,16 +16,19 @@ @@ -16,16 +16,19 @@
package org.springframework.reactive.web.http.reactor;
import org.reactivestreams.Publisher;
import org.springframework.reactive.web.http.HttpHandler;
import org.springframework.util.Assert;
import reactor.core.publisher.convert.DependencyUtils;
import reactor.io.buffer.Buffer;
import reactor.io.net.ReactorChannelHandler;
import reactor.io.net.ReactiveChannelHandler;
import reactor.io.net.http.HttpChannel;
import org.springframework.reactive.web.http.HttpHandler;
import org.springframework.util.Assert;
/**
* @author Stephane Maldini
*/
public class RequestHandlerAdapter implements ReactorChannelHandler<Buffer, Buffer, HttpChannel<Buffer, Buffer>> {
public class RequestHandlerAdapter
implements ReactiveChannelHandler<Buffer, Buffer, HttpChannel<Buffer, Buffer>> {
private final HttpHandler httpHandler;
@ -37,8 +40,17 @@ public class RequestHandlerAdapter implements ReactorChannelHandler<Buffer, Buff @@ -37,8 +40,17 @@ public class RequestHandlerAdapter implements ReactorChannelHandler<Buffer, Buff
@Override
public Publisher<Void> apply(HttpChannel<Buffer, Buffer> channel) {
ReactorServerHttpRequest adaptedRequest = new ReactorServerHttpRequest(channel);
ReactorServerHttpResponse adaptedResponse = new ReactorServerHttpResponse(channel);
final PublisherReactorServerHttpRequest adaptedRequest;
final PublisherReactorServerHttpResponse adaptedResponse;
if(DependencyUtils.hasReactorStream()){
adaptedRequest = new ReactorServerHttpRequest(channel);
adaptedResponse = new ReactorServerHttpResponse(channel);
}
else{
adaptedRequest = new PublisherReactorServerHttpRequest(channel);
adaptedResponse = new PublisherReactorServerHttpResponse(channel);
}
return this.httpHandler.handle(adaptedRequest, adaptedResponse);
}
}

6
spring-web-reactive/src/test/java/org/springframework/reactive/web/http/AbstractHttpHandlerIntegrationTests.java

@ -40,10 +40,10 @@ public abstract class AbstractHttpHandlerIntegrationTests { @@ -40,10 +40,10 @@ public abstract class AbstractHttpHandlerIntegrationTests {
@Parameterized.Parameters(name = "server [{0}]")
public static Object[][] arguments() {
return new Object[][] {
{new JettyHttpServer()},
{new TomcatHttpServer()},
/*{new JettyHttpServer()},
{new RxNettyHttpServer()},
{new ReactorHttpServer()}
{new ReactorHttpServer()},*/
{new TomcatHttpServer()}
};
}

Loading…
Cancel
Save