Browse Source

Use PublisherFactory from reactor-core

pull/1111/head
Rossen Stoyanchev 9 years ago
parent
commit
69d4eaaa3c
  1. 1
      spring-web-reactive/build.gradle
  2. 27
      spring-web-reactive/src/main/java/org/springframework/reactive/web/DispatcherHttpHandler.java
  3. 151
      spring-web-reactive/src/test/java/org/springframework/reactive/web/DispatcherApp.java

1
spring-web-reactive/build.gradle

@ -22,6 +22,7 @@ dependencies { @@ -22,6 +22,7 @@ dependencies {
compile "org.springframework:spring-core:4.2.0.RELEASE"
compile "org.springframework:spring-web:4.2.0.RELEASE"
compile "org.reactivestreams:reactive-streams:1.0.0"
compile "io.projectreactor:reactor-core:2.0.5.RELEASE"
compile "org.slf4j:slf4j-api:1.7.6"
compile "ch.qos.logback:logback-classic:1.1.2"

27
spring-web-reactive/src/main/java/org/springframework/reactive/web/DispatcherHttpHandler.java

@ -21,6 +21,8 @@ import java.util.List; @@ -21,6 +21,8 @@ import java.util.List;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.reactivestreams.PublisherFactory;
import reactor.core.reactivestreams.SubscriberWithContext;
import org.springframework.beans.factory.BeanFactoryUtils;
import org.springframework.context.ApplicationContext;
@ -58,7 +60,7 @@ public class DispatcherHttpHandler implements HttpHandler { @@ -58,7 +60,7 @@ public class DispatcherHttpHandler implements HttpHandler {
if (handler == null) {
// No exception handling mechanism yet
response.setStatusCode(HttpStatus.NOT_FOUND);
return Publishers.complete();
return PublisherFactory.forEach(SubscriberWithContext::onComplete);
}
HandlerAdapter handlerAdapter = getHandlerAdapter(handler);
@ -143,27 +145,4 @@ public class DispatcherHttpHandler implements HttpHandler { @@ -143,27 +145,4 @@ public class DispatcherHttpHandler implements HttpHandler {
throw new IllegalStateException("No HandlerAdapter for " + handler);
}
private static class Publishers {
public static Publisher<Void> complete() {
return subscriber -> {
subscriber.onSubscribe(new NoopSubscription());
subscriber.onComplete();
};
}
}
private static class NoopSubscription implements Subscription {
@Override
public void request(long n) {
}
@Override
public void cancel() {
}
}
}

151
spring-web-reactive/src/test/java/org/springframework/reactive/web/DispatcherApp.java

@ -24,6 +24,7 @@ import io.reactivex.netty.protocol.http.server.HttpServer; @@ -24,6 +24,7 @@ import io.reactivex.netty.protocol.http.server.HttpServer;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.reactivestreams.PublisherFactory;
import org.springframework.http.MediaType;
import org.springframework.reactive.web.rxnetty.RequestHandlerAdapter;
@ -80,21 +81,10 @@ public class DispatcherApp { @@ -80,21 +81,10 @@ public class DispatcherApp {
@Override
public Publisher<String> handle(ServerHttpRequest request, ServerHttpResponse response) {
return new Publisher<String>() {
@Override
public void subscribe(Subscriber<? super String> subscriber) {
subscriber.onSubscribe(new AbstractSubscription<String>(subscriber) {
@Override
protected void requestInternal(long n) {
invokeOnNext("Hello world.");
invokeOnComplete();
}
});
}
};
return PublisherFactory.forEach((subscriber) -> {
subscriber.onNext("Hello world.");
subscriber.onComplete();
});
}
}
@ -113,40 +103,30 @@ public class DispatcherApp { @@ -113,40 +103,30 @@ public class DispatcherApp {
PlainTextHandler textHandler = (PlainTextHandler) handler;
final Publisher<String> resultPublisher = textHandler.handle(request, response);
return new Publisher<HandlerResult>() {
@Override
public void subscribe(Subscriber<? super HandlerResult> handlerResultSubscriber) {
handlerResultSubscriber.onSubscribe(new AbstractSubscription<HandlerResult>(handlerResultSubscriber) {
@Override
protected void requestInternal(long n) {
resultPublisher.subscribe(new Subscriber<Object>() {
@Override
public void onSubscribe(Subscription subscription) {
subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(Object result) {
invokeOnNext(new HandlerResult(result));
}
@Override
public void onError(Throwable error) {
invokeOnError(error);
}
@Override
public void onComplete() {
invokeOnComplete();
}
});
}
});
}
};
return PublisherFactory.forEach((subscriber) -> {
resultPublisher.subscribe(new Subscriber<Object>() {
@Override
public void onSubscribe(Subscription subscription) {
subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(Object result) {
subscriber.onNext(new HandlerResult(result));
}
@Override
public void onError(Throwable error) {
subscriber.onError(error);
}
@Override
public void onComplete() {
subscriber.onComplete();
}
});
});
}
}
@ -159,74 +139,13 @@ public class DispatcherApp { @@ -159,74 +139,13 @@ public class DispatcherApp {
}
@Override
public Publisher<Void> handleResult(ServerHttpRequest request, ServerHttpResponse response,
HandlerResult result) {
public Publisher<Void> handleResult(ServerHttpRequest request, ServerHttpResponse response, HandlerResult result) {
response.getHeaders().setContentType(MediaType.TEXT_PLAIN);
return response.writeWith(new Publisher<byte[]>() {
@Override
public void subscribe(Subscriber<? super byte[]> writeSubscriber) {
writeSubscriber.onSubscribe(new AbstractSubscription<byte[]>(writeSubscriber) {
@Override
protected void requestInternal(long n) {
Charset charset = Charset.forName("UTF-8");
invokeOnNext(((String) result.getReturnValue()).getBytes(charset));
invokeOnComplete();
}
});
}
});
}
}
private static abstract class AbstractSubscription<T> implements Subscription {
private final Subscriber<? super T> subscriber;
private volatile boolean terminated;
public AbstractSubscription(Subscriber<? super T> subscriber) {
this.subscriber = subscriber;
}
protected boolean isTerminated() {
return this.terminated;
}
@Override
public void request(long n) {
if (isTerminated()) {
return;
}
if (n > 0) {
requestInternal(n);
}
}
protected abstract void requestInternal(long n);
@Override
public void cancel() {
this.terminated = true;
}
protected void invokeOnNext(T data) {
this.subscriber.onNext(data);
}
protected void invokeOnError(Throwable error) {
this.terminated = true;
this.subscriber.onError(error);
}
protected void invokeOnComplete() {
this.terminated = true;
this.subscriber.onComplete();
return response.writeWith(PublisherFactory.forEach((writeSubscriber) -> {
Charset charset = Charset.forName("UTF-8");
writeSubscriber.onNext(((String) result.getReturnValue()).getBytes(charset));
writeSubscriber.onComplete();
}));
}
}

Loading…
Cancel
Save