|
|
|
@ -24,6 +24,8 @@ import org.reactivestreams.Publisher;
@@ -24,6 +24,8 @@ import org.reactivestreams.Publisher;
|
|
|
|
|
import org.reactivestreams.Subscriber; |
|
|
|
|
import org.reactivestreams.Subscription; |
|
|
|
|
import reactor.core.error.Exceptions; |
|
|
|
|
import reactor.core.error.SpecificationExceptions; |
|
|
|
|
import reactor.core.support.BackpressureUtils; |
|
|
|
|
import reactor.rx.Stream; |
|
|
|
|
import reactor.rx.action.Action; |
|
|
|
|
import reactor.rx.subscription.ReactiveSubscription; |
|
|
|
@ -111,15 +113,19 @@ public class CompletableFutureUtils {
@@ -111,15 +113,19 @@ public class CompletableFutureUtils {
|
|
|
|
|
|
|
|
|
|
@Override |
|
|
|
|
public void request(long elements) { |
|
|
|
|
Action.checkRequest(elements); |
|
|
|
|
try{ |
|
|
|
|
BackpressureUtils.checkRequest(elements); |
|
|
|
|
}catch(SpecificationExceptions.Spec309_NullOrNegativeRequest iae){ |
|
|
|
|
subscriber.onError(iae); |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
if (isComplete()) return; |
|
|
|
|
|
|
|
|
|
try { |
|
|
|
|
future.whenComplete((result, error) -> { |
|
|
|
|
if (error != null) { |
|
|
|
|
onError(error); |
|
|
|
|
} |
|
|
|
|
else { |
|
|
|
|
} else { |
|
|
|
|
subscriber.onNext(result); |
|
|
|
|
onComplete(); |
|
|
|
|
} |
|
|
|
|