|
|
|
@ -77,11 +77,11 @@ public class ReactiveAdapterRegistry {
@@ -77,11 +77,11 @@ public class ReactiveAdapterRegistry {
|
|
|
|
|
static { |
|
|
|
|
ClassLoader classLoader = ReactiveAdapterRegistry.class.getClassLoader(); |
|
|
|
|
reactorPresent = ClassUtils.isPresent("reactor.core.publisher.Flux", classLoader); |
|
|
|
|
flowPublisherPresent = ClassUtils.isPresent("java.util.concurrent.Flow.Publisher", classLoader); |
|
|
|
|
rxjava1Present = ClassUtils.isPresent("rx.Observable", classLoader) && |
|
|
|
|
ClassUtils.isPresent("rx.RxReactiveStreams", classLoader); |
|
|
|
|
rxjava2Present = ClassUtils.isPresent("io.reactivex.Flowable", classLoader); |
|
|
|
|
rxjava3Present = ClassUtils.isPresent("io.reactivex.rxjava3.core.Flowable", classLoader); |
|
|
|
|
flowPublisherPresent = ClassUtils.isPresent("java.util.concurrent.Flow.Publisher", classLoader); |
|
|
|
|
kotlinCoroutinesPresent = ClassUtils.isPresent("kotlinx.coroutines.reactor.MonoKt", classLoader); |
|
|
|
|
mutinyPresent = ClassUtils.isPresent("io.smallrye.mutiny.Multi", classLoader); |
|
|
|
|
} |
|
|
|
@ -97,13 +97,16 @@ public class ReactiveAdapterRegistry {
@@ -97,13 +97,16 @@ public class ReactiveAdapterRegistry {
|
|
|
|
|
// Reactor
|
|
|
|
|
if (reactorPresent) { |
|
|
|
|
new ReactorRegistrar().registerAdapters(this); |
|
|
|
|
if (flowPublisherPresent) { |
|
|
|
|
// Java 9+ Flow.Publisher
|
|
|
|
|
new ReactorJdkFlowAdapterRegistrar().registerAdapter(this); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// RxJava1 (deprecated)
|
|
|
|
|
if (rxjava1Present) { |
|
|
|
|
new RxJava1Registrar().registerAdapters(this); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// RxJava2
|
|
|
|
|
if (rxjava2Present) { |
|
|
|
|
new RxJava2Registrar().registerAdapters(this); |
|
|
|
@ -113,13 +116,6 @@ public class ReactiveAdapterRegistry {
@@ -113,13 +116,6 @@ public class ReactiveAdapterRegistry {
|
|
|
|
|
new RxJava3Registrar().registerAdapters(this); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Java 9+ Flow.Publisher
|
|
|
|
|
if (flowPublisherPresent) { |
|
|
|
|
new ReactorJdkFlowAdapterRegistrar().registerAdapter(this); |
|
|
|
|
} |
|
|
|
|
// If not present, do nothing for the time being...
|
|
|
|
|
// We can fall back on "reactive-streams-flow-bridge" (once released)
|
|
|
|
|
|
|
|
|
|
// Kotlin Coroutines
|
|
|
|
|
if (reactorPresent && kotlinCoroutinesPresent) { |
|
|
|
|
new CoroutinesRegistrar().registerAdapters(this); |
|
|
|
@ -253,6 +249,35 @@ public class ReactiveAdapterRegistry {
@@ -253,6 +249,35 @@ public class ReactiveAdapterRegistry {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private static class ReactorJdkFlowAdapterRegistrar { |
|
|
|
|
|
|
|
|
|
void registerAdapter(ReactiveAdapterRegistry registry) { |
|
|
|
|
// Reflectively access optional JDK 9+ API (for runtime compatibility with JDK 8)
|
|
|
|
|
|
|
|
|
|
try { |
|
|
|
|
String publisherName = "java.util.concurrent.Flow.Publisher"; |
|
|
|
|
Class<?> publisherClass = ClassUtils.forName(publisherName, getClass().getClassLoader()); |
|
|
|
|
|
|
|
|
|
String adapterName = "reactor.adapter.JdkFlowAdapter"; |
|
|
|
|
Class<?> flowAdapterClass = ClassUtils.forName(adapterName, getClass().getClassLoader()); |
|
|
|
|
|
|
|
|
|
Method toFluxMethod = flowAdapterClass.getMethod("flowPublisherToFlux", publisherClass); |
|
|
|
|
Method toFlowMethod = flowAdapterClass.getMethod("publisherToFlowPublisher", Publisher.class); |
|
|
|
|
Object emptyFlow = ReflectionUtils.invokeMethod(toFlowMethod, null, Flux.empty()); |
|
|
|
|
|
|
|
|
|
registry.registerReactiveType( |
|
|
|
|
ReactiveTypeDescriptor.multiValue(publisherClass, () -> emptyFlow), |
|
|
|
|
source -> (Publisher<?>) ReflectionUtils.invokeMethod(toFluxMethod, null, source), |
|
|
|
|
publisher -> ReflectionUtils.invokeMethod(toFlowMethod, null, publisher) |
|
|
|
|
); |
|
|
|
|
} |
|
|
|
|
catch (Throwable ex) { |
|
|
|
|
// Ignore
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private static class RxJava1Registrar { |
|
|
|
|
|
|
|
|
|
void registerAdapters(ReactiveAdapterRegistry registry) { |
|
|
|
@ -307,6 +332,7 @@ public class ReactiveAdapterRegistry {
@@ -307,6 +332,7 @@ public class ReactiveAdapterRegistry {
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private static class RxJava3Registrar { |
|
|
|
|
|
|
|
|
|
void registerAdapters(ReactiveAdapterRegistry registry) { |
|
|
|
@ -347,34 +373,6 @@ public class ReactiveAdapterRegistry {
@@ -347,34 +373,6 @@ public class ReactiveAdapterRegistry {
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private static class ReactorJdkFlowAdapterRegistrar { |
|
|
|
|
|
|
|
|
|
void registerAdapter(ReactiveAdapterRegistry registry) { |
|
|
|
|
// TODO: remove reflection when build requires JDK 9+
|
|
|
|
|
|
|
|
|
|
try { |
|
|
|
|
String publisherName = "java.util.concurrent.Flow.Publisher"; |
|
|
|
|
Class<?> publisherClass = ClassUtils.forName(publisherName, getClass().getClassLoader()); |
|
|
|
|
|
|
|
|
|
String adapterName = "reactor.adapter.JdkFlowAdapter"; |
|
|
|
|
Class<?> flowAdapterClass = ClassUtils.forName(adapterName, getClass().getClassLoader()); |
|
|
|
|
|
|
|
|
|
Method toFluxMethod = flowAdapterClass.getMethod("flowPublisherToFlux", publisherClass); |
|
|
|
|
Method toFlowMethod = flowAdapterClass.getMethod("publisherToFlowPublisher", Publisher.class); |
|
|
|
|
Object emptyFlow = ReflectionUtils.invokeMethod(toFlowMethod, null, Flux.empty()); |
|
|
|
|
|
|
|
|
|
registry.registerReactiveType( |
|
|
|
|
ReactiveTypeDescriptor.multiValue(publisherClass, () -> emptyFlow), |
|
|
|
|
source -> (Publisher<?>) ReflectionUtils.invokeMethod(toFluxMethod, null, source), |
|
|
|
|
publisher -> ReflectionUtils.invokeMethod(toFlowMethod, null, publisher) |
|
|
|
|
); |
|
|
|
|
} |
|
|
|
|
catch (Throwable ex) { |
|
|
|
|
// Ignore
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* ReactiveAdapter variant that wraps adapted Publishers as {@link Flux} or |
|
|
|
|