Browse Source

Use Streams from reactor-streams

pull/1111/head
Rossen Stoyanchev 9 years ago
parent
commit
6634d1f536
  1. 1
      spring-web-reactive/build.gradle
  2. 71
      spring-web-reactive/src/main/java/org/springframework/reactive/web/DispatcherHttpHandler.java
  3. 52
      spring-web-reactive/src/test/java/org/springframework/reactive/web/DispatcherApp.java

1
spring-web-reactive/build.gradle

@ -23,6 +23,7 @@ dependencies { @@ -23,6 +23,7 @@ dependencies {
compile "org.springframework:spring-web:4.2.0.RELEASE"
compile "org.reactivestreams:reactive-streams:1.0.0"
compile "io.projectreactor:reactor-core:2.0.5.RELEASE"
compile "io.projectreactor:reactor-net:2.0.5.RELEASE"
compile "org.slf4j:slf4j-api:1.7.6"
compile "ch.qos.logback:logback-classic:1.1.2"

71
spring-web-reactive/src/main/java/org/springframework/reactive/web/DispatcherHttpHandler.java

@ -19,10 +19,10 @@ import java.util.ArrayList; @@ -19,10 +19,10 @@ import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.reactivestreams.PublisherFactory;
import reactor.core.reactivestreams.SubscriberWithContext;
import reactor.rx.Promises;
import reactor.rx.Streams;
import org.springframework.beans.factory.BeanFactoryUtils;
import org.springframework.context.ApplicationContext;
@ -64,64 +64,17 @@ public class DispatcherHttpHandler implements HttpHandler { @@ -64,64 +64,17 @@ public class DispatcherHttpHandler implements HttpHandler {
}
HandlerAdapter handlerAdapter = getHandlerAdapter(handler);
final Publisher<HandlerResult> resultPublisher = handlerAdapter.handle(request, response, handler);
return new Publisher<Void>() {
@Override
public void subscribe(final Subscriber<? super Void> subscriber) {
resultPublisher.subscribe(new Subscriber<HandlerResult>() {
@Override
public void onSubscribe(Subscription subscription) {
subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(HandlerResult result) {
for (HandlerResultHandler resultHandler : resultHandlers) {
if (resultHandler.supports(result)) {
Publisher<Void> publisher = resultHandler.handleResult(request, response, result);
publisher.subscribe(new Subscriber<Void>() {
@Override
public void onSubscribe(Subscription subscription) {
subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(Void aVoid) {
// no op
}
@Override
public void onError(Throwable error) {
// Result handling error (no exception handling mechanism yet)
subscriber.onError(error);
}
@Override
public void onComplete() {
subscriber.onComplete();
}
});
}
}
}
@Override
public void onError(Throwable error) {
// Application handler error (no exception handling mechanism yet)
subscriber.onError(error);
}
@Override
public void onComplete() {
// do nothing
}
});
Publisher<HandlerResult> resultPublisher = handlerAdapter.handle(request, response, handler);
return Streams.wrap(resultPublisher).concatMap((HandlerResult result) -> {
for (HandlerResultHandler resultHandler : resultHandlers) {
if (resultHandler.supports(result)) {
return resultHandler.handleResult(request, response, result);
}
}
};
String error = "No HandlerResultHandler for " + result.getReturnValue();
return Promises.error(new IllegalStateException(error));
});
}
protected Object getHandler(ServerHttpRequest request) {

52
spring-web-reactive/src/test/java/org/springframework/reactive/web/DispatcherApp.java

@ -22,9 +22,8 @@ import java.util.Map; @@ -22,9 +22,8 @@ import java.util.Map;
import io.netty.buffer.ByteBuf;
import io.reactivex.netty.protocol.http.server.HttpServer;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.reactivestreams.PublisherFactory;
import reactor.rx.Stream;
import reactor.rx.Streams;
import org.springframework.http.MediaType;
import org.springframework.reactive.web.rxnetty.RequestHandlerAdapter;
@ -81,12 +80,8 @@ public class DispatcherApp { @@ -81,12 +80,8 @@ public class DispatcherApp {
@Override
public Publisher<String> handle(ServerHttpRequest request, ServerHttpResponse response) {
return PublisherFactory.forEach((subscriber) -> {
subscriber.onNext("Hello world.");
subscriber.onComplete();
});
return Streams.just("Hello world.");
}
}
private static class PlainTextHandlerAdapter implements HandlerAdapter {
@ -97,36 +92,10 @@ public class DispatcherApp { @@ -97,36 +92,10 @@ public class DispatcherApp {
}
@Override
public Publisher<HandlerResult> handle(ServerHttpRequest request, ServerHttpResponse response,
Object handler) {
PlainTextHandler textHandler = (PlainTextHandler) handler;
final Publisher<String> resultPublisher = textHandler.handle(request, response);
return PublisherFactory.forEach((subscriber) -> {
resultPublisher.subscribe(new Subscriber<Object>() {
@Override
public void onSubscribe(Subscription subscription) {
subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(Object result) {
subscriber.onNext(new HandlerResult(result));
}
@Override
public void onError(Throwable error) {
subscriber.onError(error);
}
@Override
public void onComplete() {
subscriber.onComplete();
}
});
});
public Publisher<HandlerResult> handle(ServerHttpRequest request, ServerHttpResponse response, Object handler) {
Publisher<String> resultPublisher = ((PlainTextHandler) handler).handle(request, response);
Stream<String> stream = Streams.wrap(resultPublisher);
return stream.concatMap((returnValue) -> Streams.just(new HandlerResult(returnValue)));
}
}
@ -141,11 +110,8 @@ public class DispatcherApp { @@ -141,11 +110,8 @@ public class DispatcherApp {
@Override
public Publisher<Void> handleResult(ServerHttpRequest request, ServerHttpResponse response, HandlerResult result) {
response.getHeaders().setContentType(MediaType.TEXT_PLAIN);
return response.writeWith(PublisherFactory.forEach((writeSubscriber) -> {
Charset charset = Charset.forName("UTF-8");
writeSubscriber.onNext(((String) result.getReturnValue()).getBytes(charset));
writeSubscriber.onComplete();
}));
byte[] bytes = ((String) result.getReturnValue()).getBytes(Charset.forName("UTF-8"));
return response.writeWith(Streams.just(bytes));
}
}

Loading…
Cancel
Save