Browse Source

ReactiveAdapterRegistry uses Reactor's JdkFlowAdapter

Issue: SPR-16052
pull/1566/head
Rossen Stoyanchev 7 years ago
parent
commit
b4c95bf278
  1. 51
      spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java

51
spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java

@ -16,6 +16,7 @@ @@ -16,6 +16,7 @@
package org.springframework.core;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
@ -29,8 +30,13 @@ import reactor.core.publisher.Mono; @@ -29,8 +30,13 @@ import reactor.core.publisher.Mono;
import rx.RxReactiveStreams;
import org.springframework.lang.Nullable;
import org.springframework.util.ClassUtils;
import org.springframework.util.ReflectionUtils;
import static org.springframework.core.ReactiveTypeDescriptor.*;
import static org.springframework.core.ReactiveTypeDescriptor.multiValue;
import static org.springframework.core.ReactiveTypeDescriptor.noValue;
import static org.springframework.core.ReactiveTypeDescriptor.singleOptionalValue;
import static org.springframework.core.ReactiveTypeDescriptor.singleRequiredValue;
/**
* A registry of adapters to adapt a Reactive Streams {@link Publisher} to/from
@ -38,7 +44,8 @@ import static org.springframework.core.ReactiveTypeDescriptor.*; @@ -38,7 +44,8 @@ import static org.springframework.core.ReactiveTypeDescriptor.*;
* {@code Observable}, and others.
*
* <p>By default, depending on classpath availability, adapters are registered
* for Reactor, RxJava 1, RxJava 2 types, and {@link CompletableFuture}.
* for Reactor, RxJava 1, RxJava 2 types, {@link CompletableFuture}, and Java 9+
* Flow.Publisher.
*
* @author Rossen Stoyanchev
* @author Sebastien Deleuze
@ -82,6 +89,18 @@ public class ReactiveAdapterRegistry { @@ -82,6 +89,18 @@ public class ReactiveAdapterRegistry {
catch (Throwable ex) {
// Ignore
}
// Java 9+ Flow.Publisher
try {
new ReactorJdkFlowAdapterRegistrar().registerAdapter(this);
}
catch (NoSuchMethodException ex) {
throw new IllegalStateException("Failed to find JdkFlowAdapter methods", ex);
}
catch (Throwable ex) {
// Ignore
// We can fall back on "reactive-streams-flow-bridge" (once released)
}
}
@ -232,6 +251,34 @@ public class ReactiveAdapterRegistry { @@ -232,6 +251,34 @@ public class ReactiveAdapterRegistry {
}
private static class ReactorJdkFlowAdapterRegistrar {
// TODO: remove reflection when build requires JDK 9+
void registerAdapter(ReactiveAdapterRegistry registry)
throws NoSuchMethodException, ClassNotFoundException {
String name = "java.util.concurrent.Flow.Publisher";
Class<?> type = ClassUtils.forName(name, getClass().getClassLoader());
Method toFlowMethod = getMethod("publisherToFlowPublisher", Publisher.class);
Method toFluxMethod = getMethod("flowPublisherToFlux", type);
Object emptyFlow = ReflectionUtils.invokeMethod(toFlowMethod, null, Flux.empty());
registry.registerReactiveType(
multiValue(type, () -> emptyFlow),
source -> (Publisher<?>) ReflectionUtils.invokeMethod(toFluxMethod, null, source),
publisher -> ReflectionUtils.invokeMethod(toFlowMethod, null, publisher)
);
}
private static Method getMethod(String name, Class<?> argumentType) throws NoSuchMethodException {
return reactor.adapter.JdkFlowAdapter.class.getMethod(name, argumentType);
}
}
/**
* Extension of ReactiveAdapter that wraps adapted (raw) Publisher's as
* {@link Flux} or {@link Mono} depending on the underlying reactive type's

Loading…
Cancel
Save