diff --git a/spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java b/spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java index f30d9d4485..240a77e4ba 100644 --- a/spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java +++ b/spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java @@ -67,99 +67,81 @@ public class ReactiveAdapterRegistry { // Flux and Mono ahead of Publisher... - registerMonoAdapter(Mono.class, - source -> (Mono) source, source -> source, - ReactiveTypeDescriptor.singleOptionalValue(Mono.class)); + registerReactiveType( + ReactiveTypeDescriptor.singleOptionalValue(Mono.class), + source -> (Mono) source, + source -> source + ); - registerFluxAdapter(Flux.class, - source -> (Flux) source, source -> source); + registerReactiveType(ReactiveTypeDescriptor.multiValue(Flux.class), + source -> (Flux) source, + source -> source); - registerFluxAdapter(Publisher.class, - source -> Flux.from((Publisher) source), source -> source); + registerReactiveType(ReactiveTypeDescriptor.multiValue(Publisher.class), + source -> Flux.from((Publisher) source), + source -> source); - registerMonoAdapter(CompletableFuture.class, - source -> Mono.fromFuture((CompletableFuture) source), Mono::toFuture, - ReactiveTypeDescriptor.singleOptionalValue(CompletableFuture.class) + registerReactiveType( + ReactiveTypeDescriptor.singleOptionalValue(CompletableFuture.class), + source -> Mono.fromFuture((CompletableFuture) source), + source -> Mono.from(source).toFuture() ); if (rxJava1Present && rxJava1Adapter) { - new RxJava1AdapterRegistrar().register(this); + new RxJava1Registrar().registerAdapters(this); } if (rxJava2Present) { - new RxJava2AdapterRegistrar().register(this); + new RxJava2Registrar().registerAdapters(this); } } /** - * Register an adapter for adapting to and from a {@link Mono}. - *

The provided functions can assume that input will never be {@code null} - * and also that any {@link Optional} wrapper is unwrapped. + * Register a reactive type along with functions to adapt to and from a + * Reactive Streams {@link Publisher}. The functions can assume their + * input is never be {@code null} nor {@link Optional}. */ - public void registerMonoAdapter(Class reactiveType, Function> toAdapter, - Function, Object> fromAdapter, ReactiveTypeDescriptor descriptor) { + public void registerReactiveType(ReactiveTypeDescriptor descriptor, + Function> toAdapter, Function, Object> fromAdapter) { - this.adapters.add(new MonoReactiveAdapter(toAdapter, fromAdapter, descriptor)); - } + ReactiveAdapter adapter = (descriptor.isMultiValue() ? + new FluxReactiveAdapter(toAdapter, fromAdapter, descriptor) : + new MonoReactiveAdapter(toAdapter, fromAdapter, descriptor)); - /** - * Register an adapter for adapting to and from a {@link Flux}. - *

The provided functions can assume that input will never be {@code null} - * and also that any {@link Optional} wrapper is unwrapped. - */ - public void registerFluxAdapter(Class reactiveType, Function> toAdapter, - Function, Object> fromAdapter) { - - this.adapters.add(new FluxReactiveAdapter(toAdapter, fromAdapter, - ReactiveTypeDescriptor.multiValue(reactiveType))); + this.adapters.add(adapter); } - /** - * Get the adapter for the given reactive type to adapt from. + * Get the adapter to use to adapt from the given reactive type. */ public ReactiveAdapter getAdapterFrom(Class reactiveType) { return getAdapterFrom(reactiveType, null); } /** - * Get the adapter for the given reactive type to adapt from. - * If the instance is not {@code null} its actual type is used to check. + * Get the adapter to use to adapt from the given reactive type. Or if the + * "source" object is not {@code null} its actual type is used instead. */ - public ReactiveAdapter getAdapterFrom(Class reactiveType, Object adaptee) { - Class actualType = getActualType(reactiveType, adaptee); - return getAdapterInternal(supportedType -> supportedType.isAssignableFrom(actualType)); + public ReactiveAdapter getAdapterFrom(Class reactiveType, Object source) { + source = unwrapOptional(source); + Class clazz = (source != null ? source.getClass() : reactiveType); + return getAdapter(type -> type.isAssignableFrom(clazz)); } /** * Get the adapter for the given reactive type to adapt to. */ public ReactiveAdapter getAdapterTo(Class reactiveType) { - return getAdapterTo(reactiveType, null); - } - - /** - * Get the adapter for the given reactive type to adapt to. - * If the instance is not {@code null} its actual type is used to check. - */ - public ReactiveAdapter getAdapterTo(Class reactiveType, Object adaptee) { - Class actualType = getActualType(reactiveType, adaptee); - return getAdapterInternal(supportedType -> supportedType.equals(actualType)); + return getAdapter(reactiveType::equals); } - private ReactiveAdapter getAdapterInternal(Predicate> predicate) { + private ReactiveAdapter getAdapter(Predicate> predicate) { return this.adapters.stream() .filter(adapter -> predicate.test(adapter.getDescriptor().getReactiveType())) .findFirst() .orElse(null); } - - private static Class getActualType(Class reactiveType, Object adaptee) { - adaptee = unwrapOptional(adaptee); - return (adaptee != null ? adaptee.getClass() : reactiveType); - } - private static Object unwrapOptional(Object value) { return (value instanceof Optional ? ((Optional) value).orElse(null) : value); } @@ -168,14 +150,14 @@ public class ReactiveAdapterRegistry { @SuppressWarnings("unchecked") private static class MonoReactiveAdapter implements ReactiveAdapter { - private final Function> toAdapter; + private final Function> toAdapter; - private final Function, Object> fromAdapter; + private final Function, Object> fromAdapter; private final ReactiveTypeDescriptor descriptor; - MonoReactiveAdapter(Function> to, Function, Object> from, + MonoReactiveAdapter(Function> to, Function, Object> from, ReactiveTypeDescriptor descriptor) { this.toAdapter = to; @@ -194,7 +176,7 @@ public class ReactiveAdapterRegistry { if (source == null) { return Mono.empty(); } - return (Mono) this.toAdapter.apply(source); + return (Mono) Mono.from(this.toAdapter.apply(source)); } @Override @@ -203,7 +185,7 @@ public class ReactiveAdapterRegistry { if (source == null) { return Flux.empty(); } - return (Flux) this.toMono(source).flux(); + return (Flux) toMono(source).flux(); } @Override @@ -213,21 +195,21 @@ public class ReactiveAdapterRegistry { @Override public Object fromPublisher(Publisher source) { - return (source != null ? this.fromAdapter.apply((Mono) source) : null); + return (source != null ? this.fromAdapter.apply(source) : null); } } @SuppressWarnings("unchecked") private static class FluxReactiveAdapter implements ReactiveAdapter { - private final Function> toAdapter; + private final Function> toAdapter; - private final Function, Object> fromAdapter; + private final Function, Object> fromAdapter; private final ReactiveTypeDescriptor descriptor; - FluxReactiveAdapter(Function> to, Function, Object> from, + FluxReactiveAdapter(Function> to, Function, Object> from, ReactiveTypeDescriptor descriptor) { this.descriptor = descriptor; @@ -246,7 +228,7 @@ public class ReactiveAdapterRegistry { if (source == null) { return Mono.empty(); } - return (Mono) this.toAdapter.apply(source).next(); + return (Mono) toFlux(source).next(); } @Override @@ -255,7 +237,7 @@ public class ReactiveAdapterRegistry { if (source == null) { return Flux.empty(); } - return (Flux) this.toAdapter.apply(source); + return (Flux) Flux.from(this.toAdapter.apply(source)); } @Override @@ -265,56 +247,59 @@ public class ReactiveAdapterRegistry { @Override public Object fromPublisher(Publisher source) { - return (source != null ? this.fromAdapter.apply((Flux) source) : null); + return (source != null ? this.fromAdapter.apply(source) : null); } } - private static class RxJava1AdapterRegistrar { + private static class RxJava1Registrar { - public void register(ReactiveAdapterRegistry registry) { - registry.registerFluxAdapter(rx.Observable.class, + public void registerAdapters(ReactiveAdapterRegistry registry) { + registry.registerReactiveType( + ReactiveTypeDescriptor.multiValue(rx.Observable.class), source -> Flux.from(RxReactiveStreams.toPublisher((rx.Observable) source)), RxReactiveStreams::toObservable ); - registry.registerMonoAdapter(rx.Single.class, + registry.registerReactiveType( + ReactiveTypeDescriptor.singleRequiredValue(rx.Single.class), source -> Mono.from(RxReactiveStreams.toPublisher((rx.Single) source)), - RxReactiveStreams::toSingle, - ReactiveTypeDescriptor.singleRequiredValue(rx.Single.class) + RxReactiveStreams::toSingle ); - registry.registerMonoAdapter(rx.Completable.class, + registry.registerReactiveType( + ReactiveTypeDescriptor.noValue(rx.Completable.class), source -> Mono.from(RxReactiveStreams.toPublisher((rx.Completable) source)), - RxReactiveStreams::toCompletable, - ReactiveTypeDescriptor.noValue(rx.Completable.class) + RxReactiveStreams::toCompletable ); } } - private static class RxJava2AdapterRegistrar { + private static class RxJava2Registrar { - public void register(ReactiveAdapterRegistry registry) { - registry.registerFluxAdapter(Flowable.class, + public void registerAdapters(ReactiveAdapterRegistry registry) { + registry.registerReactiveType( + ReactiveTypeDescriptor.multiValue(Flowable.class), source -> Flux.from((Flowable) source), source-> Flowable.fromPublisher(source) ); - registry.registerFluxAdapter(io.reactivex.Observable.class, + registry.registerReactiveType( + ReactiveTypeDescriptor.multiValue(io.reactivex.Observable.class), source -> Flux.from(((io.reactivex.Observable) source).toFlowable(BackpressureStrategy.BUFFER)), source -> Flowable.fromPublisher(source).toObservable() ); - registry.registerMonoAdapter(io.reactivex.Single.class, + registry.registerReactiveType( + ReactiveTypeDescriptor.singleRequiredValue(io.reactivex.Single.class), source -> Mono.from(((io.reactivex.Single) source).toFlowable()), - source -> Flowable.fromPublisher(source).toObservable().singleElement().toSingle(), - ReactiveTypeDescriptor.singleRequiredValue(io.reactivex.Single.class) + source -> Flowable.fromPublisher(source).toObservable().singleElement().toSingle() ); - registry.registerMonoAdapter(Maybe.class, + registry.registerReactiveType( + ReactiveTypeDescriptor.singleOptionalValue(Maybe.class), source -> Mono.from(((Maybe) source).toFlowable()), - source -> Flowable.fromPublisher(source).toObservable().singleElement(), - ReactiveTypeDescriptor.singleOptionalValue(Maybe.class) + source -> Flowable.fromPublisher(source).toObservable().singleElement() ); - registry.registerMonoAdapter(io.reactivex.Completable.class, + registry.registerReactiveType( + ReactiveTypeDescriptor.noValue(io.reactivex.Completable.class), source -> Mono.from(((io.reactivex.Completable) source).toFlowable()), - source -> Flowable.fromPublisher(source).toObservable().ignoreElements(), - ReactiveTypeDescriptor.noValue(io.reactivex.Completable.class) + source -> Flowable.fromPublisher(source).toObservable().ignoreElements() ); } } diff --git a/spring-core/src/main/java/org/springframework/core/ReactiveTypeDescriptor.java b/spring-core/src/main/java/org/springframework/core/ReactiveTypeDescriptor.java index 07e0be2a0e..4808ac93b4 100644 --- a/spring-core/src/main/java/org/springframework/core/ReactiveTypeDescriptor.java +++ b/spring-core/src/main/java/org/springframework/core/ReactiveTypeDescriptor.java @@ -85,6 +85,23 @@ public class ReactiveTypeDescriptor { } + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + if (other == null || getClass() != other.getClass()) { + return false; + } + return this.reactiveType.equals(((ReactiveTypeDescriptor) other).reactiveType); + } + + @Override + public int hashCode() { + return this.reactiveType.hashCode(); + } + + /** * Descriptor for a reactive type that can produce 0..N values. */