Browse Source

Use latest Reactor core capabilities

No need for rxjava-reactive-streams dependency and
 CompletableFutureUtils anymore.
pull/1111/head
Sebastien Deleuze 9 years ago
parent
commit
cf2c1514af
  1. 1
      spring-web-reactive/build.gradle
  2. 156
      spring-web-reactive/src/main/java/org/springframework/reactive/util/CompletableFutureUtils.java
  3. 11
      spring-web-reactive/src/main/java/org/springframework/reactive/web/dispatch/method/annotation/RequestBodyArgumentResolver.java
  4. 11
      spring-web-reactive/src/main/java/org/springframework/reactive/web/dispatch/method/annotation/ResponseBodyResultHandler.java
  5. 4
      spring-web-reactive/src/main/java/org/springframework/reactive/web/http/rxnetty/RequestHandlerAdapter.java
  6. 21
      spring-web-reactive/src/main/java/org/springframework/reactive/web/http/rxnetty/RxNettyServerHttpRequest.java
  7. 21
      spring-web-reactive/src/main/java/org/springframework/reactive/web/http/rxnetty/RxNettyServerHttpResponse.java
  8. 101
      spring-web-reactive/src/test/java/org/springframework/reactive/util/CompletableFutureUtilsTests.java

1
spring-web-reactive/build.gradle

@ -40,7 +40,6 @@ dependencies { @@ -40,7 +40,6 @@ dependencies {
optional "com.fasterxml.jackson.core:jackson-databind:2.6.1"
optional "io.reactivex:rxnetty:0.5.0-SNAPSHOT"
optional "io.reactivex:rxjava-reactive-streams:1.0.1"
optional "io.projectreactor:reactor-net:2.1.0.BUILD-SNAPSHOT"

156
spring-web-reactive/src/main/java/org/springframework/reactive/util/CompletableFutureUtils.java

@ -1,156 +0,0 @@ @@ -1,156 +0,0 @@
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.reactive.util;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.util.Assert;
import reactor.Publishers;
import reactor.core.error.CancelException;
import reactor.core.error.Exceptions;
import reactor.core.support.BackpressureUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
/**
* @author Sebastien Deleuze
* @author Stephane Maldini
*/
public class CompletableFutureUtils {
public static <T> Publisher<T> toPublisher(CompletableFuture<T> future) {
return new CompletableFuturePublisher<T>(future);
}
public static <T> CompletableFuture<List<T>> fromPublisher(Publisher<T> publisher) {
final CompletableFuture<List<T>> future = new CompletableFuture<>();
publisher.subscribe(new Subscriber<T>() {
private final List<T> values = new ArrayList<>();
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(T t) {
values.add(t);
}
@Override
public void onError(Throwable t) {
future.completeExceptionally(t);
}
@Override
public void onComplete() {
future.complete(values);
}
});
return future;
}
public static <T> CompletableFuture<T> fromSinglePublisher(Publisher<T> publisher) {
final CompletableFuture<T> future = new CompletableFuture<>();
publisher.subscribe(new Subscriber<T>() {
private T value;
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(T t) {
Assert.state(value == null, "This publisher should not publish multiple values");
value = t;
}
@Override
public void onError(Throwable t) {
future.completeExceptionally(t);
}
@Override
public void onComplete() {
future.complete(value);
}
});
return future;
}
private static class CompletableFuturePublisher<T> implements Publisher<T> {
private final CompletableFuture<? extends T> future;
private final Publisher<? extends T> futurePublisher;
@SuppressWarnings("unused")
private volatile long requested;
private static final AtomicLongFieldUpdater<CompletableFuturePublisher> REQUESTED =
AtomicLongFieldUpdater.newUpdater(CompletableFuturePublisher.class, "requested");
public CompletableFuturePublisher(CompletableFuture<? extends T> future) {
this.future = future;
this.futurePublisher = Publishers.createWithDemand((n, sub) -> {
if (!BackpressureUtils.checkRequest(n, sub)) {
return;
}
if(BackpressureUtils.getAndAdd(REQUESTED, CompletableFuturePublisher.this, n) > 0) {
return;
}
future.whenComplete((result, error) -> {
if (error != null) {
sub.onError(error);
} else {
sub.onNext(result);
sub.onComplete();
}
});
}, null, nothing -> {
if(!future.isDone()){
future.cancel(true);
}
});
}
@Override
public void subscribe(final Subscriber<? super T> subscriber) {
try {
if (future.isDone()) {
Publishers.just(future.get()).subscribe(subscriber);
}
else if ( future.isCancelled()){
Exceptions.publisher(CancelException.get());
}
else {
futurePublisher.subscribe(subscriber);
}
}
catch (Throwable throwable) {
Exceptions.publisher(throwable);
}
}
}
}

11
spring-web-reactive/src/main/java/org/springframework/reactive/web/dispatch/method/annotation/RequestBodyArgumentResolver.java

@ -26,11 +26,13 @@ import java.util.concurrent.TimeUnit; @@ -26,11 +26,13 @@ import java.util.concurrent.TimeUnit;
import org.reactivestreams.Publisher;
import reactor.Publishers;
import reactor.core.publisher.convert.CompletableFutureConverter;
import reactor.core.publisher.convert.RxJava1Converter;
import reactor.core.publisher.convert.RxJava1SingleConverter;
import reactor.rx.Promise;
import reactor.rx.Stream;
import reactor.rx.Streams;
import rx.Observable;
import rx.RxReactiveStreams;
import rx.Single;
import org.springframework.core.MethodParameter;
@ -38,7 +40,6 @@ import org.springframework.core.ResolvableType; @@ -38,7 +40,6 @@ import org.springframework.core.ResolvableType;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.reactive.codec.decoder.ByteToMessageDecoder;
import org.springframework.reactive.util.CompletableFutureUtils;
import org.springframework.reactive.web.dispatch.method.HandlerMethodArgumentResolver;
import org.springframework.reactive.web.http.ServerHttpRequest;
import org.springframework.web.bind.annotation.RequestBody;
@ -104,13 +105,13 @@ public class RequestBodyArgumentResolver implements HandlerMethodArgumentResolve @@ -104,13 +105,13 @@ public class RequestBodyArgumentResolver implements HandlerMethodArgumentResolve
return Streams.wrap(elementStream).take(1).next();
}
else if (Observable.class.isAssignableFrom(type.getRawClass())) {
return RxReactiveStreams.toObservable(elementStream);
return RxJava1Converter.from(elementStream);
}
else if (Single.class.isAssignableFrom(type.getRawClass())) {
return RxReactiveStreams.toObservable(elementStream).toSingle();
return RxJava1SingleConverter.from(elementStream);
}
else if (CompletableFuture.class.isAssignableFrom(type.getRawClass())) {
return CompletableFutureUtils.fromSinglePublisher(elementStream);
return CompletableFutureConverter.fromSingle(elementStream);
}
else if (Publisher.class.isAssignableFrom(type.getRawClass())) {
return elementStream;

11
spring-web-reactive/src/main/java/org/springframework/reactive/web/dispatch/method/annotation/ResponseBodyResultHandler.java

@ -24,7 +24,6 @@ import org.springframework.core.annotation.AnnotatedElementUtils; @@ -24,7 +24,6 @@ import org.springframework.core.annotation.AnnotatedElementUtils;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.reactive.codec.encoder.MessageToByteEncoder;
import org.springframework.reactive.util.CompletableFutureUtils;
import org.springframework.reactive.web.dispatch.HandlerResult;
import org.springframework.reactive.web.dispatch.HandlerResultHandler;
import org.springframework.reactive.web.http.ServerHttpRequest;
@ -32,9 +31,11 @@ import org.springframework.reactive.web.http.ServerHttpResponse; @@ -32,9 +31,11 @@ import org.springframework.reactive.web.http.ServerHttpResponse;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.method.HandlerMethod;
import reactor.Publishers;
import reactor.core.publisher.convert.CompletableFutureConverter;
import reactor.core.publisher.convert.RxJava1Converter;
import reactor.core.publisher.convert.RxJava1SingleConverter;
import reactor.rx.Promise;
import rx.Observable;
import rx.RxReactiveStreams;
import rx.Single;
import java.lang.reflect.Type;
@ -118,13 +119,13 @@ public class ResponseBodyResultHandler implements HandlerResultHandler, Ordered @@ -118,13 +119,13 @@ public class ResponseBodyResultHandler implements HandlerResultHandler, Ordered
elementStream = ((Promise)value).stream();
}
else if (Observable.class.isAssignableFrom(type.getRawClass())) {
elementStream = RxReactiveStreams.toPublisher((Observable) value);
elementStream = RxJava1Converter.from((Observable) value);
}
else if (Single.class.isAssignableFrom(type.getRawClass())) {
elementStream = RxReactiveStreams.toPublisher(((Single)value).toObservable());
elementStream = RxJava1SingleConverter.from((Single)value);
}
else if (CompletableFuture.class.isAssignableFrom(type.getRawClass())) {
elementStream = CompletableFutureUtils.toPublisher((CompletableFuture) value);
elementStream = CompletableFutureConverter.from((CompletableFuture) value);
}
else if (Publisher.class.isAssignableFrom(type.getRawClass())) {
elementStream = (Publisher)value;

4
spring-web-reactive/src/main/java/org/springframework/reactive/web/http/rxnetty/RequestHandlerAdapter.java

@ -20,8 +20,8 @@ import io.reactivex.netty.protocol.http.server.HttpServerRequest; @@ -20,8 +20,8 @@ import io.reactivex.netty.protocol.http.server.HttpServerRequest;
import io.reactivex.netty.protocol.http.server.HttpServerResponse;
import io.reactivex.netty.protocol.http.server.RequestHandler;
import org.reactivestreams.Publisher;
import reactor.core.publisher.convert.RxJava1Converter;
import rx.Observable;
import rx.RxReactiveStreams;
import org.springframework.reactive.web.http.HttpHandler;
import org.springframework.util.Assert;
@ -44,7 +44,7 @@ public class RequestHandlerAdapter implements RequestHandler<ByteBuf, ByteBuf> { @@ -44,7 +44,7 @@ public class RequestHandlerAdapter implements RequestHandler<ByteBuf, ByteBuf> {
RxNettyServerHttpRequest adaptedRequest = new RxNettyServerHttpRequest(request);
RxNettyServerHttpResponse adaptedResponse = new RxNettyServerHttpResponse(response);
Publisher<Void> result = this.httpHandler.handle(adaptedRequest, adaptedResponse);
return RxReactiveStreams.toObservable(result);
return RxJava1Converter.from(result);
}
}

21
spring-web-reactive/src/main/java/org/springframework/reactive/web/http/rxnetty/RxNettyServerHttpRequest.java

@ -15,22 +15,24 @@ @@ -15,22 +15,24 @@
*/
package org.springframework.reactive.web.http.rxnetty;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import io.netty.buffer.ByteBuf;
import io.reactivex.netty.protocol.http.server.HttpServerRequest;
import org.reactivestreams.Publisher;
import rx.Observable;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.reactive.web.http.ServerHttpRequest;
import org.springframework.util.Assert;
import reactor.core.publisher.convert.RxJava1Converter;
import rx.Observable;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
/**
* @author Rossen Stoyanchev
* @author Stephane Maldini
*/
public class RxNettyServerHttpRequest implements ServerHttpRequest {
@ -76,8 +78,11 @@ public class RxNettyServerHttpRequest implements ServerHttpRequest { @@ -76,8 +78,11 @@ public class RxNettyServerHttpRequest implements ServerHttpRequest {
@Override
public Publisher<ByteBuffer> getBody() {
Observable<ByteBuffer> bytesContent = this.request.getContent().map(byteBuf -> byteBuf.nioBuffer());
return rx.RxReactiveStreams.toPublisher(bytesContent);
Observable<ByteBuffer> bytesContent = this.request.getContent().map(ByteBuf::nioBuffer);
return RxJava1Converter.from(bytesContent);
}
public Observable<ByteBuffer> asObservable() {
return this.request.getContent().map(ByteBuf::nioBuffer);
}
}

21
spring-web-reactive/src/main/java/org/springframework/reactive/web/http/rxnetty/RxNettyServerHttpResponse.java

@ -15,22 +15,23 @@ @@ -15,22 +15,23 @@
*/
package org.springframework.reactive.web.http.rxnetty;
import java.nio.ByteBuffer;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.reactivex.netty.protocol.http.server.HttpServerResponse;
import org.reactivestreams.Publisher;
import reactor.io.buffer.Buffer;
import rx.Observable;
import rx.RxReactiveStreams;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.reactive.web.http.ServerHttpResponse;
import org.springframework.util.Assert;
import reactor.core.publisher.convert.RxJava1Converter;
import reactor.io.buffer.Buffer;
import rx.Observable;
import java.nio.ByteBuffer;
/**
* @author Rossen Stoyanchev
* @author Stephane Maldini
*/
public class RxNettyServerHttpResponse implements ServerHttpResponse {
@ -58,11 +59,15 @@ public class RxNettyServerHttpResponse implements ServerHttpResponse { @@ -58,11 +59,15 @@ public class RxNettyServerHttpResponse implements ServerHttpResponse {
return (this.headersWritten ? HttpHeaders.readOnlyHttpHeaders(this.headers) : this.headers);
}
public Observable<Void> writeWith(Observable<ByteBuffer> contentPublisher) {
return this.response.writeBytes(contentPublisher.map(content -> new Buffer(content).asBytes()));
}
@Override
public Publisher<Void> writeWith(Publisher<ByteBuffer> contentPublisher) {
writeHeaders();
Observable<byte[]> contentObservable = RxReactiveStreams.toObservable(contentPublisher).map(content -> new Buffer(content).asBytes());
return RxReactiveStreams.toPublisher(this.response.writeBytes(contentObservable));
Observable<byte[]> contentObservable = RxJava1Converter.from(contentPublisher).map(content -> new Buffer(content).asBytes());
return RxJava1Converter.from(this.response.writeBytes(contentObservable));
}
private void writeHeaders() {

101
spring-web-reactive/src/test/java/org/springframework/reactive/util/CompletableFutureUtilsTests.java

@ -1,101 +0,0 @@ @@ -1,101 +0,0 @@
/*
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.reactive.util;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.junit.Test;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.rx.Streams;
/**
* @author Sebastien Deleuze
*/
public class CompletableFutureUtilsTests {
private CountDownLatch lock = new CountDownLatch(1);
private final List<Boolean> results = new ArrayList<>();
private final List<Throwable> errors = new ArrayList<>();
@Test
public void fromPublisher() throws InterruptedException {
Publisher<Boolean> publisher = Streams.just(true, false);
CompletableFuture<List<Boolean>> future = CompletableFutureUtils.fromPublisher(publisher);
future.whenComplete((result, error) -> {
if (error != null) {
errors.add(error);
}
else {
results.addAll(result);
}
lock.countDown();
});
lock.await(2000, TimeUnit.MILLISECONDS);
assertEquals("onError not expected: " + errors.toString(), 0, errors.size());
assertEquals(2, results.size());
assertTrue(results.get(0));
assertFalse(results.get(1));
}
@Test
public void fromSinglePublisher() throws InterruptedException {
Publisher<Boolean> publisher = Streams.just(true);
CompletableFuture<Boolean> future = CompletableFutureUtils.fromSinglePublisher(publisher);
future.whenComplete((result, error) -> {
if (error != null) {
errors.add(error);
}
else {
results.add(result);
}
lock.countDown();
});
lock.await(2000, TimeUnit.MILLISECONDS);
assertEquals("onError not expected: " + errors.toString(), 0, errors.size());
assertEquals(1, results.size());
assertTrue(results.get(0));
}
@Test
public void fromSinglePublisherWithMultipleValues() throws InterruptedException {
Publisher<Boolean> publisher = Streams.just(true, false);
CompletableFuture<Boolean> future = CompletableFutureUtils.fromSinglePublisher(publisher);
future.whenComplete((result, error) -> {
if (error != null) {
errors.add(error);
}
else {
results.add(result);
}
lock.countDown();
});
lock.await(2000, TimeUnit.MILLISECONDS);
assertEquals(1, errors.size());
assertEquals(IllegalStateException.class, errors.get(0).getClass());
assertEquals(0, results.size());
}
}
Loading…
Cancel
Save