Browse Source

Polishing contribution

See gh-27331
pull/27379/head
Rossen Stoyanchev 3 years ago
parent
commit
41ab268733
  1. 21
      spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java
  2. 40
      spring-core/src/test/java/org/springframework/core/ReactiveAdapterRegistryTests.java

21
spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java

@ -24,8 +24,6 @@ import java.util.concurrent.CompletableFuture; @@ -24,8 +24,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import kotlinx.coroutines.CompletableDeferredKt;
import kotlinx.coroutines.Deferred;
import org.reactivestreams.Publisher;
@ -85,7 +83,7 @@ public class ReactiveAdapterRegistry { @@ -85,7 +83,7 @@ public class ReactiveAdapterRegistry {
rxjava3Present = ClassUtils.isPresent("io.reactivex.rxjava3.core.Flowable", classLoader);
flowPublisherPresent = ClassUtils.isPresent("java.util.concurrent.Flow.Publisher", classLoader);
kotlinCoroutinesPresent = ClassUtils.isPresent("kotlinx.coroutines.reactor.MonoKt", classLoader);
mutinyPresent = ClassUtils.isPresent("io.smallrye.mutiny.Uni", classLoader);
mutinyPresent = ClassUtils.isPresent("io.smallrye.mutiny.Multi", classLoader);
}
private final List<ReactiveAdapter> adapters = new ArrayList<>();
@ -427,19 +425,24 @@ public class ReactiveAdapterRegistry { @@ -427,19 +425,24 @@ public class ReactiveAdapterRegistry {
}
}
private static class MutinyRegistrar {
void registerAdapters(ReactiveAdapterRegistry registry) {
registry.registerReactiveType(
ReactiveTypeDescriptor.singleOptionalValue(Uni.class, ()-> Uni.createFrom().nothing()),
uni ->((Uni<?>)uni).convert().toPublisher(),
publisher -> Uni.createFrom().publisher(publisher)
ReactiveTypeDescriptor.singleOptionalValue(
io.smallrye.mutiny.Uni.class,
() -> io.smallrye.mutiny.Uni.createFrom().nothing()),
uni -> ((io.smallrye.mutiny.Uni<?>) uni).convert().toPublisher(),
publisher -> io.smallrye.mutiny.Uni.createFrom().publisher(publisher)
);
registry.registerReactiveType(
ReactiveTypeDescriptor.multiValue(Multi.class, ()-> Multi.createFrom().empty()),
multi -> (Multi<?>) multi,
publisher-> Multi.createFrom().publisher(publisher)
ReactiveTypeDescriptor.multiValue(
io.smallrye.mutiny.Multi.class,
() -> io.smallrye.mutiny.Multi.createFrom().empty()),
multi -> (io.smallrye.mutiny.Multi<?>) multi,
publisher -> io.smallrye.mutiny.Multi.createFrom().publisher(publisher)
);
}
}

40
spring-core/src/test/java/org/springframework/core/ReactiveAdapterRegistryTests.java

@ -40,6 +40,9 @@ import static org.assertj.core.api.Assertions.assertThat; @@ -40,6 +40,9 @@ import static org.assertj.core.api.Assertions.assertThat;
@SuppressWarnings("unchecked")
class ReactiveAdapterRegistryTests {
private static final Duration ONE_SECOND = Duration.ofSeconds(1);
private final ReactiveAdapterRegistry registry = ReactiveAdapterRegistry.getSharedInstance();
@ -85,7 +88,7 @@ class ReactiveAdapterRegistryTests { @@ -85,7 +88,7 @@ class ReactiveAdapterRegistryTests {
Publisher<Integer> source = io.reactivex.rxjava3.core.Flowable.fromIterable(sequence);
Object target = getAdapter(Flux.class).fromPublisher(source);
assertThat(target instanceof Flux).isTrue();
assertThat(((Flux<Integer>) target).collectList().block(Duration.ofMillis(1000))).isEqualTo(sequence);
assertThat(((Flux<Integer>) target).collectList().block(ONE_SECOND)).isEqualTo(sequence);
}
@Test
@ -93,7 +96,7 @@ class ReactiveAdapterRegistryTests { @@ -93,7 +96,7 @@ class ReactiveAdapterRegistryTests {
Publisher<Integer> source = io.reactivex.rxjava3.core.Flowable.fromArray(1, 2, 3);
Object target = getAdapter(Mono.class).fromPublisher(source);
assertThat(target instanceof Mono).isTrue();
assertThat(((Mono<Integer>) target).block(Duration.ofMillis(1000))).isEqualTo(Integer.valueOf(1));
assertThat(((Mono<Integer>) target).block(ONE_SECOND)).isEqualTo(Integer.valueOf(1));
}
@Test
@ -110,7 +113,7 @@ class ReactiveAdapterRegistryTests { @@ -110,7 +113,7 @@ class ReactiveAdapterRegistryTests {
future.complete(1);
Object target = getAdapter(CompletableFuture.class).toPublisher(future);
assertThat(target instanceof Mono).as("Expected Mono Publisher: " + target.getClass().getName()).isTrue();
assertThat(((Mono<Integer>) target).block(Duration.ofMillis(1000))).isEqualTo(Integer.valueOf(1));
assertThat(((Mono<Integer>) target).block(ONE_SECOND)).isEqualTo(Integer.valueOf(1));
}
}
@ -155,7 +158,7 @@ class ReactiveAdapterRegistryTests { @@ -155,7 +158,7 @@ class ReactiveAdapterRegistryTests {
Object source = rx.Observable.from(sequence);
Object target = getAdapter(rx.Observable.class).toPublisher(source);
assertThat(target instanceof Flux).as("Expected Flux Publisher: " + target.getClass().getName()).isTrue();
assertThat(((Flux<Integer>) target).collectList().block(Duration.ofMillis(1000))).isEqualTo(sequence);
assertThat(((Flux<Integer>) target).collectList().block(ONE_SECOND)).isEqualTo(sequence);
}
@Test
@ -163,7 +166,7 @@ class ReactiveAdapterRegistryTests { @@ -163,7 +166,7 @@ class ReactiveAdapterRegistryTests {
Object source = rx.Single.just(1);
Object target = getAdapter(rx.Single.class).toPublisher(source);
assertThat(target instanceof Mono).as("Expected Mono Publisher: " + target.getClass().getName()).isTrue();
assertThat(((Mono<Integer>) target).block(Duration.ofMillis(1000))).isEqualTo(Integer.valueOf(1));
assertThat(((Mono<Integer>) target).block(ONE_SECOND)).isEqualTo(Integer.valueOf(1));
}
@Test
@ -171,7 +174,7 @@ class ReactiveAdapterRegistryTests { @@ -171,7 +174,7 @@ class ReactiveAdapterRegistryTests {
Object source = rx.Completable.complete();
Object target = getAdapter(rx.Completable.class).toPublisher(source);
assertThat(target instanceof Mono).as("Expected Mono Publisher: " + target.getClass().getName()).isTrue();
((Mono<Void>) target).block(Duration.ofMillis(1000));
((Mono<Void>) target).block(ONE_SECOND);
}
}
@ -229,7 +232,7 @@ class ReactiveAdapterRegistryTests { @@ -229,7 +232,7 @@ class ReactiveAdapterRegistryTests {
Object source = io.reactivex.Flowable.fromIterable(sequence);
Object target = getAdapter(io.reactivex.Flowable.class).toPublisher(source);
assertThat(target instanceof Flux).as("Expected Flux Publisher: " + target.getClass().getName()).isTrue();
assertThat(((Flux<Integer>) target).collectList().block(Duration.ofMillis(1000))).isEqualTo(sequence);
assertThat(((Flux<Integer>) target).collectList().block(ONE_SECOND)).isEqualTo(sequence);
}
@Test
@ -238,7 +241,7 @@ class ReactiveAdapterRegistryTests { @@ -238,7 +241,7 @@ class ReactiveAdapterRegistryTests {
Object source = io.reactivex.Observable.fromIterable(sequence);
Object target = getAdapter(io.reactivex.Observable.class).toPublisher(source);
assertThat(target instanceof Flux).as("Expected Flux Publisher: " + target.getClass().getName()).isTrue();
assertThat(((Flux<Integer>) target).collectList().block(Duration.ofMillis(1000))).isEqualTo(sequence);
assertThat(((Flux<Integer>) target).collectList().block(ONE_SECOND)).isEqualTo(sequence);
}
@Test
@ -246,7 +249,7 @@ class ReactiveAdapterRegistryTests { @@ -246,7 +249,7 @@ class ReactiveAdapterRegistryTests {
Object source = io.reactivex.Single.just(1);
Object target = getAdapter(io.reactivex.Single.class).toPublisher(source);
assertThat(target instanceof Mono).as("Expected Mono Publisher: " + target.getClass().getName()).isTrue();
assertThat(((Mono<Integer>) target).block(Duration.ofMillis(1000))).isEqualTo(Integer.valueOf(1));
assertThat(((Mono<Integer>) target).block(ONE_SECOND)).isEqualTo(Integer.valueOf(1));
}
@Test
@ -254,7 +257,7 @@ class ReactiveAdapterRegistryTests { @@ -254,7 +257,7 @@ class ReactiveAdapterRegistryTests {
Object source = io.reactivex.Completable.complete();
Object target = getAdapter(io.reactivex.Completable.class).toPublisher(source);
assertThat(target instanceof Mono).as("Expected Mono Publisher: " + target.getClass().getName()).isTrue();
((Mono<Void>) target).block(Duration.ofMillis(1000));
((Mono<Void>) target).block(ONE_SECOND);
}
}
@ -312,7 +315,7 @@ class ReactiveAdapterRegistryTests { @@ -312,7 +315,7 @@ class ReactiveAdapterRegistryTests {
Object source = io.reactivex.rxjava3.core.Flowable.fromIterable(sequence);
Object target = getAdapter(io.reactivex.rxjava3.core.Flowable.class).toPublisher(source);
assertThat(target instanceof Flux).as("Expected Flux Publisher: " + target.getClass().getName()).isTrue();
assertThat(((Flux<Integer>) target).collectList().block(Duration.ofMillis(1000))).isEqualTo(sequence);
assertThat(((Flux<Integer>) target).collectList().block(ONE_SECOND)).isEqualTo(sequence);
}
@Test
@ -321,7 +324,7 @@ class ReactiveAdapterRegistryTests { @@ -321,7 +324,7 @@ class ReactiveAdapterRegistryTests {
Object source = io.reactivex.rxjava3.core.Observable.fromIterable(sequence);
Object target = getAdapter(io.reactivex.rxjava3.core.Observable.class).toPublisher(source);
assertThat(target instanceof Flux).as("Expected Flux Publisher: " + target.getClass().getName()).isTrue();
assertThat(((Flux<Integer>) target).collectList().block(Duration.ofMillis(1000))).isEqualTo(sequence);
assertThat(((Flux<Integer>) target).collectList().block(ONE_SECOND)).isEqualTo(sequence);
}
@Test
@ -329,7 +332,7 @@ class ReactiveAdapterRegistryTests { @@ -329,7 +332,7 @@ class ReactiveAdapterRegistryTests {
Object source = io.reactivex.rxjava3.core.Single.just(1);
Object target = getAdapter(io.reactivex.rxjava3.core.Single.class).toPublisher(source);
assertThat(target instanceof Mono).as("Expected Mono Publisher: " + target.getClass().getName()).isTrue();
assertThat(((Mono<Integer>) target).block(Duration.ofMillis(1000))).isEqualTo(Integer.valueOf(1));
assertThat(((Mono<Integer>) target).block(ONE_SECOND)).isEqualTo(Integer.valueOf(1));
}
@Test
@ -337,7 +340,7 @@ class ReactiveAdapterRegistryTests { @@ -337,7 +340,7 @@ class ReactiveAdapterRegistryTests {
Object source = io.reactivex.rxjava3.core.Completable.complete();
Object target = getAdapter(io.reactivex.rxjava3.core.Completable.class).toPublisher(source);
assertThat(target instanceof Mono).as("Expected Mono Publisher: " + target.getClass().getName()).isTrue();
((Mono<Void>) target).block(Duration.ofMillis(1000));
((Mono<Void>) target).block(ONE_SECOND);
}
}
@ -359,7 +362,6 @@ class ReactiveAdapterRegistryTests { @@ -359,7 +362,6 @@ class ReactiveAdapterRegistryTests {
}
}
// SmallRye Mutiny
@Nested
class Mutiny {
@ -374,7 +376,7 @@ class ReactiveAdapterRegistryTests { @@ -374,7 +376,7 @@ class ReactiveAdapterRegistryTests {
Publisher<Integer> source = Mono.just(1);
Object target = getAdapter(Uni.class).fromPublisher(source);
assertThat(target).isInstanceOf(Uni.class);
assertThat(((Uni<Integer>) target).await().atMost(Duration.ofMillis(1000))).isEqualTo(Integer.valueOf(1));
assertThat(((Uni<Integer>) target).await().atMost(ONE_SECOND)).isEqualTo(Integer.valueOf(1));
}
@Test
@ -382,7 +384,7 @@ class ReactiveAdapterRegistryTests { @@ -382,7 +384,7 @@ class ReactiveAdapterRegistryTests {
Uni<Integer> source = Uni.createFrom().item(1);
Object target = getAdapter(Uni.class).toPublisher(source);
assertThat(target).isInstanceOf(Mono.class);
assertThat(((Mono<Integer>) target).block(Duration.ofMillis(1000))).isEqualTo(Integer.valueOf(1));
assertThat(((Mono<Integer>) target).block(ONE_SECOND)).isEqualTo(Integer.valueOf(1));
}
@Test
@ -391,7 +393,7 @@ class ReactiveAdapterRegistryTests { @@ -391,7 +393,7 @@ class ReactiveAdapterRegistryTests {
Publisher<Integer> source = Flux.fromIterable(sequence);
Object target = getAdapter(Multi.class).fromPublisher(source);
assertThat(target).isInstanceOf(Multi.class);
assertThat(((Multi<Integer>) target).collect().asList().await().atMost(Duration.ofMillis(1000))).isEqualTo(sequence);
assertThat(((Multi<Integer>) target).collect().asList().await().atMost(ONE_SECOND)).isEqualTo(sequence);
}
@Test
@ -400,7 +402,7 @@ class ReactiveAdapterRegistryTests { @@ -400,7 +402,7 @@ class ReactiveAdapterRegistryTests {
Multi<Integer> source = Multi.createFrom().iterable(sequence);
Object target = getAdapter(Multi.class).toPublisher(source);
assertThat(target).isInstanceOf(Flux.class);
assertThat(((Flux<Integer>) target).blockLast(Duration.ofMillis(1000))).isEqualTo(Integer.valueOf(3));
assertThat(((Flux<Integer>) target).blockLast(ONE_SECOND)).isEqualTo(Integer.valueOf(3));
}
}

Loading…
Cancel
Save