Browse Source

Remove support for RxJava 1.x

Also removes unnecessary flowPublisherPresent check on JDK 9+, depending on Reactor presence for its JDK Flow adapter instead.

Closes gh-27443
pull/23432/head
Juergen Hoeller 3 years ago
parent
commit
3beb074278
  1. 2
      build.gradle
  2. 2
      spring-core/spring-core.gradle
  3. 66
      spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java
  4. 64
      spring-core/src/test/java/org/springframework/core/ReactiveAdapterRegistryTests.java

2
build.gradle

@ -64,8 +64,6 @@ configure(allprojects) { project -> @@ -64,8 +64,6 @@ configure(allprojects) { project ->
entry 'groovy-xml'
}
dependency "io.reactivex:rxjava:1.3.8"
dependency "io.reactivex:rxjava-reactive-streams:1.2.1"
dependency "io.reactivex.rxjava2:rxjava:2.2.21"
dependency "io.reactivex.rxjava3:rxjava:3.1.1"
dependency "io.smallrye.reactive:mutiny:1.0.0"

2
spring-core/spring-core.gradle

@ -44,8 +44,6 @@ dependencies { @@ -44,8 +44,6 @@ dependencies {
optional("org.jetbrains.kotlinx:kotlinx-coroutines-core")
optional("org.jetbrains.kotlinx:kotlinx-coroutines-reactor")
optional("io.projectreactor:reactor-core")
optional("io.reactivex:rxjava")
optional("io.reactivex:rxjava-reactive-streams")
optional("io.reactivex.rxjava2:rxjava")
optional("io.reactivex.rxjava3:rxjava")
optional("io.smallrye.reactive:mutiny")

66
spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java

@ -32,7 +32,6 @@ import reactor.blockhound.BlockHound; @@ -32,7 +32,6 @@ import reactor.blockhound.BlockHound;
import reactor.blockhound.integration.BlockHoundIntegration;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import rx.RxReactiveStreams;
import org.springframework.lang.Nullable;
import org.springframework.util.ClassUtils;
@ -44,15 +43,12 @@ import org.springframework.util.ConcurrentReferenceHashMap; @@ -44,15 +43,12 @@ import org.springframework.util.ConcurrentReferenceHashMap;
* {@code Observable}, and others.
*
* <p>By default, depending on classpath availability, adapters are registered
* for Reactor, RxJava 2/3, or RxJava 1 (+ RxJava Reactive Streams bridge),
* {@link CompletableFuture}, Java 9+ {@code Flow.Publisher}, and Kotlin
* Coroutines' {@code Deferred} and {@code Flow}.
*
* <p><strong>Note:</strong> As of Spring Framework 5.3, support for RxJava 1.x
* is deprecated in favor of RxJava 2 and 3.
* for Reactor, RxJava 2/3, {@link CompletableFuture}, {@code Flow.Publisher},
* and Kotlin Coroutines' {@code Deferred} and {@code Flow}.
*
* @author Rossen Stoyanchev
* @author Sebastien Deleuze
* @author Juergen Hoeller
* @since 5.0
*/
public class ReactiveAdapterRegistry {
@ -62,14 +58,10 @@ public class ReactiveAdapterRegistry { @@ -62,14 +58,10 @@ public class ReactiveAdapterRegistry {
private static final boolean reactorPresent;
private static final boolean rxjava1Present;
private static final boolean rxjava2Present;
private static final boolean rxjava3Present;
private static final boolean flowPublisherPresent;
private static final boolean kotlinCoroutinesPresent;
private static final boolean mutinyPresent;
@ -77,11 +69,8 @@ public class ReactiveAdapterRegistry { @@ -77,11 +69,8 @@ public class ReactiveAdapterRegistry {
static {
ClassLoader classLoader = ReactiveAdapterRegistry.class.getClassLoader();
reactorPresent = ClassUtils.isPresent("reactor.core.publisher.Flux", classLoader);
rxjava1Present = ClassUtils.isPresent("rx.Observable", classLoader) &&
ClassUtils.isPresent("rx.RxReactiveStreams", classLoader);
rxjava2Present = ClassUtils.isPresent("io.reactivex.Flowable", classLoader);
rxjava3Present = ClassUtils.isPresent("io.reactivex.rxjava3.core.Flowable", classLoader);
flowPublisherPresent = ClassUtils.isPresent("java.util.concurrent.Flow.Publisher", classLoader);
kotlinCoroutinesPresent = ClassUtils.isPresent("kotlinx.coroutines.reactor.MonoKt", classLoader);
mutinyPresent = ClassUtils.isPresent("io.smallrye.mutiny.Multi", classLoader);
}
@ -97,29 +86,17 @@ public class ReactiveAdapterRegistry { @@ -97,29 +86,17 @@ public class ReactiveAdapterRegistry {
// Reactor
if (reactorPresent) {
new ReactorRegistrar().registerAdapters(this);
new ReactorJdkFlowAdapterRegistrar().registerAdapter(this);
}
// RxJava1 (deprecated)
if (rxjava1Present) {
new RxJava1Registrar().registerAdapters(this);
}
// RxJava2
// RxJava
if (rxjava2Present) {
new RxJava2Registrar().registerAdapters(this);
}
// RxJava3
if (rxjava3Present) {
new RxJava3Registrar().registerAdapters(this);
}
// Java 9+ Flow.Publisher
if (flowPublisherPresent) {
new ReactorJdkFlowAdapterRegistrar().registerAdapter(this);
}
// If not present, do nothing for the time being...
// We can fall back on "reactive-streams-flow-bridge" (once released)
// Kotlin Coroutines
if (reactorPresent && kotlinCoroutinesPresent) {
new CoroutinesRegistrar().registerAdapters(this);
@ -253,23 +230,14 @@ public class ReactiveAdapterRegistry { @@ -253,23 +230,14 @@ public class ReactiveAdapterRegistry {
}
private static class RxJava1Registrar {
private static class ReactorJdkFlowAdapterRegistrar {
void registerAdapters(ReactiveAdapterRegistry registry) {
registry.registerReactiveType(
ReactiveTypeDescriptor.multiValue(rx.Observable.class, rx.Observable::empty),
source -> RxReactiveStreams.toPublisher((rx.Observable<?>) source),
RxReactiveStreams::toObservable
);
registry.registerReactiveType(
ReactiveTypeDescriptor.singleRequiredValue(rx.Single.class),
source -> RxReactiveStreams.toPublisher((rx.Single<?>) source),
RxReactiveStreams::toSingle
);
void registerAdapter(ReactiveAdapterRegistry registry) {
Flow.Publisher<?> emptyFlow = JdkFlowAdapter.publisherToFlowPublisher(Flux.empty());
registry.registerReactiveType(
ReactiveTypeDescriptor.noValue(rx.Completable.class, rx.Completable::complete),
source -> RxReactiveStreams.toPublisher((rx.Completable) source),
RxReactiveStreams::toCompletable
ReactiveTypeDescriptor.multiValue(Flow.Publisher.class, () -> emptyFlow),
source -> JdkFlowAdapter.flowPublisherToFlux((Flow.Publisher<?>) source),
JdkFlowAdapter::publisherToFlowPublisher
);
}
}
@ -347,18 +315,6 @@ public class ReactiveAdapterRegistry { @@ -347,18 +315,6 @@ public class ReactiveAdapterRegistry {
}
}
private static class ReactorJdkFlowAdapterRegistrar {
void registerAdapter(ReactiveAdapterRegistry registry) {
Flow.Publisher<?> emptyFlow = JdkFlowAdapter.publisherToFlowPublisher(Flux.empty());
registry.registerReactiveType(
ReactiveTypeDescriptor.multiValue(Flow.Publisher.class, () -> emptyFlow),
source -> JdkFlowAdapter.flowPublisherToFlux((Flow.Publisher<?>) source),
JdkFlowAdapter::publisherToFlowPublisher
);
}
}
/**
* ReactiveAdapter variant that wraps adapted Publishers as {@link Flux} or

64
spring-core/src/test/java/org/springframework/core/ReactiveAdapterRegistryTests.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -35,6 +35,7 @@ import static org.assertj.core.api.Assertions.assertThat; @@ -35,6 +35,7 @@ import static org.assertj.core.api.Assertions.assertThat;
/**
* Unit tests for {@link ReactiveAdapterRegistry}.
*
* @author Rossen Stoyanchev
*/
@SuppressWarnings("unchecked")
@ -117,67 +118,6 @@ class ReactiveAdapterRegistryTests { @@ -117,67 +118,6 @@ class ReactiveAdapterRegistryTests {
}
}
@Nested
class RxJava1 {
@Test
void defaultAdapterRegistrations() {
assertThat(getAdapter(rx.Observable.class)).isNotNull();
assertThat(getAdapter(rx.Single.class)).isNotNull();
assertThat(getAdapter(rx.Completable.class)).isNotNull();
}
@Test
void toObservable() {
List<Integer> sequence = Arrays.asList(1, 2, 3);
Publisher<Integer> source = Flux.fromIterable(sequence);
Object target = getAdapter(rx.Observable.class).fromPublisher(source);
assertThat(target instanceof rx.Observable).isTrue();
assertThat(((rx.Observable<?>) target).toList().toBlocking().first()).isEqualTo(sequence);
}
@Test
void toSingle() {
Publisher<Integer> source = Flux.fromArray(new Integer[] {1});
Object target = getAdapter(rx.Single.class).fromPublisher(source);
assertThat(target instanceof rx.Single).isTrue();
assertThat(((rx.Single<Integer>) target).toBlocking().value()).isEqualTo(Integer.valueOf(1));
}
@Test
void toCompletable() {
Publisher<Integer> source = Flux.fromArray(new Integer[] {1, 2, 3});
Object target = getAdapter(rx.Completable.class).fromPublisher(source);
assertThat(target instanceof rx.Completable).isTrue();
assertThat(((rx.Completable) target).get()).isNull();
}
@Test
void fromObservable() {
List<Integer> sequence = Arrays.asList(1, 2, 3);
Object source = rx.Observable.from(sequence);
Object target = getAdapter(rx.Observable.class).toPublisher(source);
assertThat(target instanceof Flux).as("Expected Flux Publisher: " + target.getClass().getName()).isTrue();
assertThat(((Flux<Integer>) target).collectList().block(ONE_SECOND)).isEqualTo(sequence);
}
@Test
void fromSingle() {
Object source = rx.Single.just(1);
Object target = getAdapter(rx.Single.class).toPublisher(source);
assertThat(target instanceof Mono).as("Expected Mono Publisher: " + target.getClass().getName()).isTrue();
assertThat(((Mono<Integer>) target).block(ONE_SECOND)).isEqualTo(Integer.valueOf(1));
}
@Test
void fromCompletable() {
Object source = rx.Completable.complete();
Object target = getAdapter(rx.Completable.class).toPublisher(source);
assertThat(target instanceof Mono).as("Expected Mono Publisher: " + target.getClass().getName()).isTrue();
((Mono<Void>) target).block(ONE_SECOND);
}
}
@Nested
class RxJava2 {

Loading…
Cancel
Save