|
|
|
@ -280,22 +280,25 @@ public class ReactiveAdapterRegistry {
@@ -280,22 +280,25 @@ public class ReactiveAdapterRegistry {
|
|
|
|
|
private static class ReactorJdkFlowAdapterRegistrar { |
|
|
|
|
|
|
|
|
|
void registerAdapter(ReactiveAdapterRegistry registry) throws Exception { |
|
|
|
|
|
|
|
|
|
// TODO: remove reflection when build requires JDK 9+
|
|
|
|
|
Class<?> type = ClassUtils.forName("java.util.concurrent.Flow.Publisher", getClass().getClassLoader()); |
|
|
|
|
Method toFluxMethod = getMethod("flowPublisherToFlux", type); |
|
|
|
|
Method toFlowMethod = getMethod("publisherToFlowPublisher", Publisher.class); |
|
|
|
|
|
|
|
|
|
String publisherName = "java.util.concurrent.Flow.Publisher"; |
|
|
|
|
Class<?> publisherClass = ClassUtils.forName(publisherName, getClass().getClassLoader()); |
|
|
|
|
|
|
|
|
|
String adapterName = "reactor.adapter.JdkFlowAdapter"; |
|
|
|
|
Class<?> flowAdapterClass = ClassUtils.forName(adapterName, getClass().getClassLoader()); |
|
|
|
|
|
|
|
|
|
Method toFluxMethod = flowAdapterClass.getMethod("flowPublisherToFlux", publisherClass); |
|
|
|
|
Method toFlowMethod = flowAdapterClass.getMethod("publisherToFlowPublisher", Publisher.class); |
|
|
|
|
Object emptyFlow = ReflectionUtils.invokeMethod(toFlowMethod, null, Flux.empty()); |
|
|
|
|
|
|
|
|
|
registry.registerReactiveType( |
|
|
|
|
ReactiveTypeDescriptor.multiValue(type, () -> emptyFlow), |
|
|
|
|
ReactiveTypeDescriptor.multiValue(publisherClass, () -> emptyFlow), |
|
|
|
|
source -> (Publisher<?>) ReflectionUtils.invokeMethod(toFluxMethod, null, source), |
|
|
|
|
publisher -> ReflectionUtils.invokeMethod(toFlowMethod, null, publisher) |
|
|
|
|
); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private static Method getMethod(String name, Class<?> argumentType) throws NoSuchMethodException { |
|
|
|
|
return reactor.adapter.JdkFlowAdapter.class.getMethod(name, argumentType); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|