diff --git a/spring-web-reactive/build.gradle b/spring-web-reactive/build.gradle index 8d0d6709f5..f3682d0902 100644 --- a/spring-web-reactive/build.gradle +++ b/spring-web-reactive/build.gradle @@ -40,7 +40,6 @@ dependencies { optional "com.fasterxml.jackson.core:jackson-databind:2.6.1" optional "io.reactivex:rxnetty:0.5.0-SNAPSHOT" - optional "io.reactivex:rxjava-reactive-streams:1.0.1" optional "io.projectreactor:reactor-net:2.1.0.BUILD-SNAPSHOT" diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/util/CompletableFutureUtils.java b/spring-web-reactive/src/main/java/org/springframework/reactive/util/CompletableFutureUtils.java deleted file mode 100644 index ff5e19e1ad..0000000000 --- a/spring-web-reactive/src/main/java/org/springframework/reactive/util/CompletableFutureUtils.java +++ /dev/null @@ -1,156 +0,0 @@ -/* - * Copyright 2002-2015 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.reactive.util; - -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; -import org.springframework.util.Assert; -import reactor.Publishers; -import reactor.core.error.CancelException; -import reactor.core.error.Exceptions; -import reactor.core.support.BackpressureUtils; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicLongFieldUpdater; - -/** - * @author Sebastien Deleuze - * @author Stephane Maldini - */ -public class CompletableFutureUtils { - - public static Publisher toPublisher(CompletableFuture future) { - return new CompletableFuturePublisher(future); - } - - public static CompletableFuture> fromPublisher(Publisher publisher) { - final CompletableFuture> future = new CompletableFuture<>(); - publisher.subscribe(new Subscriber() { - private final List values = new ArrayList<>(); - - @Override - public void onSubscribe(Subscription s) { - s.request(Long.MAX_VALUE); - } - - @Override - public void onNext(T t) { - values.add(t); - } - - @Override - public void onError(Throwable t) { - future.completeExceptionally(t); - } - - @Override - public void onComplete() { - future.complete(values); - } - }); - return future; - } - - public static CompletableFuture fromSinglePublisher(Publisher publisher) { - final CompletableFuture future = new CompletableFuture<>(); - publisher.subscribe(new Subscriber() { - private T value; - - @Override - public void onSubscribe(Subscription s) { - s.request(Long.MAX_VALUE); - } - - @Override - public void onNext(T t) { - Assert.state(value == null, "This publisher should not publish multiple values"); - value = t; - } - - @Override - public void onError(Throwable t) { - future.completeExceptionally(t); - } - - @Override - public void onComplete() { - future.complete(value); - } - }); - return future; - } - - private static class CompletableFuturePublisher implements Publisher { - - private final CompletableFuture future; - private final Publisher futurePublisher; - - @SuppressWarnings("unused") - private volatile long requested; - private static final AtomicLongFieldUpdater REQUESTED = - AtomicLongFieldUpdater.newUpdater(CompletableFuturePublisher.class, "requested"); - - public CompletableFuturePublisher(CompletableFuture future) { - this.future = future; - this.futurePublisher = Publishers.createWithDemand((n, sub) -> { - - if (!BackpressureUtils.checkRequest(n, sub)) { - return; - } - - if(BackpressureUtils.getAndAdd(REQUESTED, CompletableFuturePublisher.this, n) > 0) { - return; - } - - future.whenComplete((result, error) -> { - if (error != null) { - sub.onError(error); - } else { - sub.onNext(result); - sub.onComplete(); - } - }); - }, null, nothing -> { - if(!future.isDone()){ - future.cancel(true); - } - }); - } - - @Override - public void subscribe(final Subscriber subscriber) { - try { - if (future.isDone()) { - Publishers.just(future.get()).subscribe(subscriber); - } - else if ( future.isCancelled()){ - Exceptions.publisher(CancelException.get()); - } - else { - futurePublisher.subscribe(subscriber); - } - } - catch (Throwable throwable) { - Exceptions.publisher(throwable); - } - } - } - -} \ No newline at end of file diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/web/dispatch/method/annotation/RequestBodyArgumentResolver.java b/spring-web-reactive/src/main/java/org/springframework/reactive/web/dispatch/method/annotation/RequestBodyArgumentResolver.java index f0607240ac..8a77f0b298 100644 --- a/spring-web-reactive/src/main/java/org/springframework/reactive/web/dispatch/method/annotation/RequestBodyArgumentResolver.java +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/web/dispatch/method/annotation/RequestBodyArgumentResolver.java @@ -26,11 +26,13 @@ import java.util.concurrent.TimeUnit; import org.reactivestreams.Publisher; import reactor.Publishers; +import reactor.core.publisher.convert.CompletableFutureConverter; +import reactor.core.publisher.convert.RxJava1Converter; +import reactor.core.publisher.convert.RxJava1SingleConverter; import reactor.rx.Promise; import reactor.rx.Stream; import reactor.rx.Streams; import rx.Observable; -import rx.RxReactiveStreams; import rx.Single; import org.springframework.core.MethodParameter; @@ -38,7 +40,6 @@ import org.springframework.core.ResolvableType; import org.springframework.http.HttpHeaders; import org.springframework.http.MediaType; import org.springframework.reactive.codec.decoder.ByteToMessageDecoder; -import org.springframework.reactive.util.CompletableFutureUtils; import org.springframework.reactive.web.dispatch.method.HandlerMethodArgumentResolver; import org.springframework.reactive.web.http.ServerHttpRequest; import org.springframework.web.bind.annotation.RequestBody; @@ -104,13 +105,13 @@ public class RequestBodyArgumentResolver implements HandlerMethodArgumentResolve return Streams.wrap(elementStream).take(1).next(); } else if (Observable.class.isAssignableFrom(type.getRawClass())) { - return RxReactiveStreams.toObservable(elementStream); + return RxJava1Converter.from(elementStream); } else if (Single.class.isAssignableFrom(type.getRawClass())) { - return RxReactiveStreams.toObservable(elementStream).toSingle(); + return RxJava1SingleConverter.from(elementStream); } else if (CompletableFuture.class.isAssignableFrom(type.getRawClass())) { - return CompletableFutureUtils.fromSinglePublisher(elementStream); + return CompletableFutureConverter.fromSingle(elementStream); } else if (Publisher.class.isAssignableFrom(type.getRawClass())) { return elementStream; diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/web/dispatch/method/annotation/ResponseBodyResultHandler.java b/spring-web-reactive/src/main/java/org/springframework/reactive/web/dispatch/method/annotation/ResponseBodyResultHandler.java index 43b55f5d44..eee28f723e 100644 --- a/spring-web-reactive/src/main/java/org/springframework/reactive/web/dispatch/method/annotation/ResponseBodyResultHandler.java +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/web/dispatch/method/annotation/ResponseBodyResultHandler.java @@ -24,7 +24,6 @@ import org.springframework.core.annotation.AnnotatedElementUtils; import org.springframework.http.HttpHeaders; import org.springframework.http.MediaType; import org.springframework.reactive.codec.encoder.MessageToByteEncoder; -import org.springframework.reactive.util.CompletableFutureUtils; import org.springframework.reactive.web.dispatch.HandlerResult; import org.springframework.reactive.web.dispatch.HandlerResultHandler; import org.springframework.reactive.web.http.ServerHttpRequest; @@ -32,9 +31,11 @@ import org.springframework.reactive.web.http.ServerHttpResponse; import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.method.HandlerMethod; import reactor.Publishers; +import reactor.core.publisher.convert.CompletableFutureConverter; +import reactor.core.publisher.convert.RxJava1Converter; +import reactor.core.publisher.convert.RxJava1SingleConverter; import reactor.rx.Promise; import rx.Observable; -import rx.RxReactiveStreams; import rx.Single; import java.lang.reflect.Type; @@ -118,13 +119,13 @@ public class ResponseBodyResultHandler implements HandlerResultHandler, Ordered elementStream = ((Promise)value).stream(); } else if (Observable.class.isAssignableFrom(type.getRawClass())) { - elementStream = RxReactiveStreams.toPublisher((Observable) value); + elementStream = RxJava1Converter.from((Observable) value); } else if (Single.class.isAssignableFrom(type.getRawClass())) { - elementStream = RxReactiveStreams.toPublisher(((Single)value).toObservable()); + elementStream = RxJava1SingleConverter.from((Single)value); } else if (CompletableFuture.class.isAssignableFrom(type.getRawClass())) { - elementStream = CompletableFutureUtils.toPublisher((CompletableFuture) value); + elementStream = CompletableFutureConverter.from((CompletableFuture) value); } else if (Publisher.class.isAssignableFrom(type.getRawClass())) { elementStream = (Publisher)value; diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/rxnetty/RequestHandlerAdapter.java b/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/rxnetty/RequestHandlerAdapter.java index 0b22686e0d..4149dce5ea 100644 --- a/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/rxnetty/RequestHandlerAdapter.java +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/rxnetty/RequestHandlerAdapter.java @@ -20,8 +20,8 @@ import io.reactivex.netty.protocol.http.server.HttpServerRequest; import io.reactivex.netty.protocol.http.server.HttpServerResponse; import io.reactivex.netty.protocol.http.server.RequestHandler; import org.reactivestreams.Publisher; +import reactor.core.publisher.convert.RxJava1Converter; import rx.Observable; -import rx.RxReactiveStreams; import org.springframework.reactive.web.http.HttpHandler; import org.springframework.util.Assert; @@ -44,7 +44,7 @@ public class RequestHandlerAdapter implements RequestHandler { RxNettyServerHttpRequest adaptedRequest = new RxNettyServerHttpRequest(request); RxNettyServerHttpResponse adaptedResponse = new RxNettyServerHttpResponse(response); Publisher result = this.httpHandler.handle(adaptedRequest, adaptedResponse); - return RxReactiveStreams.toObservable(result); + return RxJava1Converter.from(result); } } diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/rxnetty/RxNettyServerHttpRequest.java b/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/rxnetty/RxNettyServerHttpRequest.java index e422bb7502..7aa05a50cd 100644 --- a/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/rxnetty/RxNettyServerHttpRequest.java +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/rxnetty/RxNettyServerHttpRequest.java @@ -15,22 +15,24 @@ */ package org.springframework.reactive.web.http.rxnetty; -import java.net.URI; -import java.net.URISyntaxException; -import java.nio.ByteBuffer; - import io.netty.buffer.ByteBuf; import io.reactivex.netty.protocol.http.server.HttpServerRequest; import org.reactivestreams.Publisher; -import rx.Observable; - import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; import org.springframework.reactive.web.http.ServerHttpRequest; import org.springframework.util.Assert; +import reactor.core.publisher.convert.RxJava1Converter; +import rx.Observable; + +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.ByteBuffer; + /** * @author Rossen Stoyanchev + * @author Stephane Maldini */ public class RxNettyServerHttpRequest implements ServerHttpRequest { @@ -76,8 +78,11 @@ public class RxNettyServerHttpRequest implements ServerHttpRequest { @Override public Publisher getBody() { - Observable bytesContent = this.request.getContent().map(byteBuf -> byteBuf.nioBuffer()); - return rx.RxReactiveStreams.toPublisher(bytesContent); + Observable bytesContent = this.request.getContent().map(ByteBuf::nioBuffer); + return RxJava1Converter.from(bytesContent); } + public Observable asObservable() { + return this.request.getContent().map(ByteBuf::nioBuffer); + } } diff --git a/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/rxnetty/RxNettyServerHttpResponse.java b/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/rxnetty/RxNettyServerHttpResponse.java index f8e941d639..161589cdcd 100644 --- a/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/rxnetty/RxNettyServerHttpResponse.java +++ b/spring-web-reactive/src/main/java/org/springframework/reactive/web/http/rxnetty/RxNettyServerHttpResponse.java @@ -15,22 +15,23 @@ */ package org.springframework.reactive.web.http.rxnetty; -import java.nio.ByteBuffer; - import io.netty.handler.codec.http.HttpResponseStatus; import io.reactivex.netty.protocol.http.server.HttpServerResponse; import org.reactivestreams.Publisher; -import reactor.io.buffer.Buffer; -import rx.Observable; -import rx.RxReactiveStreams; - import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; import org.springframework.reactive.web.http.ServerHttpResponse; import org.springframework.util.Assert; +import reactor.core.publisher.convert.RxJava1Converter; +import reactor.io.buffer.Buffer; +import rx.Observable; + +import java.nio.ByteBuffer; + /** * @author Rossen Stoyanchev + * @author Stephane Maldini */ public class RxNettyServerHttpResponse implements ServerHttpResponse { @@ -58,11 +59,15 @@ public class RxNettyServerHttpResponse implements ServerHttpResponse { return (this.headersWritten ? HttpHeaders.readOnlyHttpHeaders(this.headers) : this.headers); } + public Observable writeWith(Observable contentPublisher) { + return this.response.writeBytes(contentPublisher.map(content -> new Buffer(content).asBytes())); + } + @Override public Publisher writeWith(Publisher contentPublisher) { writeHeaders(); - Observable contentObservable = RxReactiveStreams.toObservable(contentPublisher).map(content -> new Buffer(content).asBytes()); - return RxReactiveStreams.toPublisher(this.response.writeBytes(contentObservable)); + Observable contentObservable = RxJava1Converter.from(contentPublisher).map(content -> new Buffer(content).asBytes()); + return RxJava1Converter.from(this.response.writeBytes(contentObservable)); } private void writeHeaders() { diff --git a/spring-web-reactive/src/test/java/org/springframework/reactive/util/CompletableFutureUtilsTests.java b/spring-web-reactive/src/test/java/org/springframework/reactive/util/CompletableFutureUtilsTests.java deleted file mode 100644 index 2ea5a305e4..0000000000 --- a/spring-web-reactive/src/test/java/org/springframework/reactive/util/CompletableFutureUtilsTests.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Copyright 2002-2015 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.reactive.util; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import org.junit.Test; -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; -import reactor.rx.Streams; - -/** - * @author Sebastien Deleuze - */ -public class CompletableFutureUtilsTests { - - private CountDownLatch lock = new CountDownLatch(1); - private final List results = new ArrayList<>(); - private final List errors = new ArrayList<>(); - - @Test - public void fromPublisher() throws InterruptedException { - Publisher publisher = Streams.just(true, false); - CompletableFuture> future = CompletableFutureUtils.fromPublisher(publisher); - future.whenComplete((result, error) -> { - if (error != null) { - errors.add(error); - } - else { - results.addAll(result); - } - lock.countDown(); - }); - lock.await(2000, TimeUnit.MILLISECONDS); - assertEquals("onError not expected: " + errors.toString(), 0, errors.size()); - assertEquals(2, results.size()); - assertTrue(results.get(0)); - assertFalse(results.get(1)); - } - - @Test - public void fromSinglePublisher() throws InterruptedException { - Publisher publisher = Streams.just(true); - CompletableFuture future = CompletableFutureUtils.fromSinglePublisher(publisher); - future.whenComplete((result, error) -> { - if (error != null) { - errors.add(error); - } - else { - results.add(result); - } - lock.countDown(); - }); - lock.await(2000, TimeUnit.MILLISECONDS); - assertEquals("onError not expected: " + errors.toString(), 0, errors.size()); - assertEquals(1, results.size()); - assertTrue(results.get(0)); - } - - @Test - public void fromSinglePublisherWithMultipleValues() throws InterruptedException { - Publisher publisher = Streams.just(true, false); - CompletableFuture future = CompletableFutureUtils.fromSinglePublisher(publisher); - future.whenComplete((result, error) -> { - if (error != null) { - errors.add(error); - } - else { - results.add(result); - } - lock.countDown(); - }); - lock.await(2000, TimeUnit.MILLISECONDS); - assertEquals(1, errors.size()); - assertEquals(IllegalStateException.class, errors.get(0).getClass()); - assertEquals(0, results.size()); - } - -} \ No newline at end of file