@ -23,10 +23,6 @@ import java.util.concurrent.CompletableFuture;
@@ -23,10 +23,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.function.Function ;
import io.reactivex.BackpressureStrategy ;
import io.reactivex.Completable ;
import io.reactivex.Flowable ;
import io.reactivex.Maybe ;
import io.reactivex.Observable ;
import org.reactivestreams.Publisher ;
import reactor.core.publisher.Flux ;
import reactor.core.publisher.Mono ;
@ -34,6 +30,11 @@ import rx.RxReactiveStreams;
@@ -34,6 +30,11 @@ import rx.RxReactiveStreams;
import org.springframework.util.ClassUtils ;
import static org.springframework.core.ReactiveTypeDescriptor.multiValue ;
import static org.springframework.core.ReactiveTypeDescriptor.noValue ;
import static org.springframework.core.ReactiveTypeDescriptor.singleOptionalValue ;
import static org.springframework.core.ReactiveTypeDescriptor.singleRequiredValue ;
/ * *
* A registry of adapters to adapt a Reactive Streams { @link Publisher } to / from
* various async / reactive types such as { @code CompletableFuture } , RxJava
@ -135,21 +136,21 @@ public class ReactiveAdapterRegistry {
@@ -135,21 +136,21 @@ public class ReactiveAdapterRegistry {
// Flux and Mono ahead of Publisher...
registry . registerReactiveType (
ReactiveTypeDescriptor . singleOptionalValue ( Mono . class , Mono : : empty ) ,
singleOptionalValue ( Mono . class , Mono : : empty ) ,
source - > ( Mono < ? > ) source ,
Mono : : from
) ;
registry . registerReactiveType ( ReactiveTypeDescriptor . multiValue ( Flux . class , Flux : : empty ) ,
registry . registerReactiveType ( multiValue ( Flux . class , Flux : : empty ) ,
source - > ( Flux < ? > ) source ,
Flux : : from ) ;
registry . registerReactiveType ( ReactiveTypeDescriptor . multiValue ( Publisher . class , Flux : : empty ) ,
registry . registerReactiveType ( multiValue ( Publisher . class , Flux : : empty ) ,
source - > ( Publisher < ? > ) source ,
source - > source ) ;
registry . registerReactiveType (
ReactiveTypeDescriptor . singleOptionalValue ( CompletableFuture . class , ( ) - > {
singleOptionalValue ( CompletableFuture . class , ( ) - > {
CompletableFuture < ? > empty = new CompletableFuture < > ( ) ;
empty . complete ( null ) ;
return empty ;
@ -164,17 +165,17 @@ public class ReactiveAdapterRegistry {
@@ -164,17 +165,17 @@ public class ReactiveAdapterRegistry {
void registerAdapters ( ReactiveAdapterRegistry registry ) {
registry . registerReactiveType (
ReactiveTypeDescriptor . multiValue ( rx . Observable . class , rx . Observable : : empty ) ,
multiValue ( rx . Observable . class , rx . Observable : : empty ) ,
source - > RxReactiveStreams . toPublisher ( ( rx . Observable < ? > ) source ) ,
RxReactiveStreams : : toObservable
) ;
registry . registerReactiveType (
ReactiveTypeDescriptor . singleRequiredValue ( rx . Single . class ) ,
singleRequiredValue ( rx . Single . class ) ,
source - > RxReactiveStreams . toPublisher ( ( rx . Single < ? > ) source ) ,
RxReactiveStreams : : toSingle
) ;
registry . registerReactiveType (
ReactiveTypeDescriptor . noValue ( rx . Completable . class , Completable : : complete ) ,
noValue ( rx . Completable . class , rx . Completable : : complete ) ,
source - > RxReactiveStreams . toPublisher ( ( rx . Completable ) source ) ,
RxReactiveStreams : : toCompletable
) ;
@ -185,29 +186,29 @@ public class ReactiveAdapterRegistry {
@@ -185,29 +186,29 @@ public class ReactiveAdapterRegistry {
void registerAdapters ( ReactiveAdapterRegistry registry ) {
registry . registerReactiveType (
ReactiveTypeDescriptor . multiValue ( Flowable . class , Flowable : : empty ) ,
source - > ( Flowable < ? > ) source ,
source - > Flowable . fromPublisher ( source )
multiValue ( io . reactivex . Flowable . class , io . reactivex . Flowable : : empty ) ,
source - > ( io . reactivex . Flowable < ? > ) source ,
source - > io . reactivex . Flowable . fromPublisher ( source )
) ;
registry . registerReactiveType (
ReactiveTypeDescriptor . multiValue ( Observable . class , Observable : : empty ) ,
source - > ( ( Observable < ? > ) source ) . toFlowable ( BackpressureStrategy . BUFFER ) ,
source - > Flowable . fromPublisher ( source ) . toObservable ( )
multiValue ( io . reactivex . Observable . class , io . reactivex . Observable : : empty ) ,
source - > ( ( io . reactivex . Observable < ? > ) source ) . toFlowable ( BackpressureStrategy . BUFFER ) ,
source - > io . reactivex . Flowable . fromPublisher ( source ) . toObservable ( )
) ;
registry . registerReactiveType (
ReactiveTypeDescriptor . singleRequiredValue ( io . reactivex . Single . class ) ,
singleRequiredValue ( io . reactivex . Single . class ) ,
source - > ( ( io . reactivex . Single < ? > ) source ) . toFlowable ( ) ,
source - > Flowable . fromPublisher ( source ) . toObservable ( ) . singleElement ( ) . toSingle ( )
source - > io . reactivex . Flowable . fromPublisher ( source ) . toObservable ( ) . singleElement ( ) . toSingle ( )
) ;
registry . registerReactiveType (
ReactiveTypeDescriptor . singleOptionalValue ( Maybe . class , Maybe : : empty ) ,
source - > ( ( Maybe < ? > ) source ) . toFlowable ( ) ,
source - > Flowable . fromPublisher ( source ) . toObservable ( ) . singleElement ( )
singleOptionalValue ( io . reactivex . Maybe . class , io . reactivex . Maybe : : empty ) ,
source - > ( ( io . reactivex . Maybe < ? > ) source ) . toFlowable ( ) ,
source - > io . reactivex . Flowable . fromPublisher ( source ) . toObservable ( ) . singleElement ( )
) ;
registry . registerReactiveType (
ReactiveTypeDescriptor . noValue ( Completable . class , Completable : : complete ) ,
source - > ( ( Completable ) source ) . toFlowable ( ) ,
source - > Flowable . fromPublisher ( source ) . toObservable ( ) . ignoreElements ( )
noValue ( io . reactivex . Completable . class , io . reactivex . Completable : : complete ) ,
source - > ( ( io . reactivex . Completable ) source ) . toFlowable ( ) ,
source - > io . reactivex . Flowable . fromPublisher ( source ) . toObservable ( ) . ignoreElements ( )
) ;
}
}