From 928c5414014ef5f812198422c0e4be03cad936ce Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Tue, 9 Oct 2018 15:22:19 -0400 Subject: [PATCH] Simplify MonoToListenableFutureAdapter Collapse the package private AbstractMonoToListenableFutureAdapter into its only sub-class MonoToListenableFutureAdapter. There is no need for such an abstract class that makes it possible to adapt from one source to a different target type. That's already covered by ListenableFutureAdapter. Follow-up refactoring for SPR-17336. --- ...AbstractMonoToListenableFutureAdapter.java | 121 ------------------ 1 file changed, 121 deletions(-) delete mode 100644 spring-messaging/src/main/java/org/springframework/messaging/support/AbstractMonoToListenableFutureAdapter.java diff --git a/spring-messaging/src/main/java/org/springframework/messaging/support/AbstractMonoToListenableFutureAdapter.java b/spring-messaging/src/main/java/org/springframework/messaging/support/AbstractMonoToListenableFutureAdapter.java deleted file mode 100644 index c093c75c7b..0000000000 --- a/spring-messaging/src/main/java/org/springframework/messaging/support/AbstractMonoToListenableFutureAdapter.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Copyright 2002-2018 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.messaging.support; - -import java.time.Duration; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import reactor.core.publisher.Mono; -import reactor.core.publisher.MonoProcessor; - -import org.springframework.lang.Nullable; -import org.springframework.util.Assert; -import org.springframework.util.concurrent.FailureCallback; -import org.springframework.util.concurrent.ListenableFuture; -import org.springframework.util.concurrent.ListenableFutureCallback; -import org.springframework.util.concurrent.ListenableFutureCallbackRegistry; -import org.springframework.util.concurrent.SuccessCallback; - -/** - * Adapts {@link Mono} to {@link ListenableFuture} optionally converting the - * result Object type {@code } to the expected target type {@code }. - * - * @author Rossen Stoyanchev - * @since 5.0 - * @param the type of object expected from the {@link Mono} - * @param the type of object expected from the {@link ListenableFuture} - */ -abstract class AbstractMonoToListenableFutureAdapter implements ListenableFuture { - - private final MonoProcessor monoProcessor; - - private final ListenableFutureCallbackRegistry registry = new ListenableFutureCallbackRegistry<>(); - - - protected AbstractMonoToListenableFutureAdapter(Mono mono) { - Assert.notNull(mono, "Mono must not be null"); - this.monoProcessor = mono - .doOnSuccess(result -> { - T adapted; - try { - adapted = adapt(result); - } - catch (Throwable ex) { - this.registry.failure(ex); - return; - } - this.registry.success(adapted); - }) - .doOnError(this.registry::failure) - .toProcessor(); - } - - - @Override - @Nullable - public T get() throws InterruptedException { - S result = this.monoProcessor.block(); - return adapt(result); - } - - @Override - @Nullable - public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - Assert.notNull(unit, "TimeUnit must not be null"); - Duration duration = Duration.ofMillis(TimeUnit.MILLISECONDS.convert(timeout, unit)); - S result = this.monoProcessor.block(duration); - return adapt(result); - } - - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - if (isCancelled()) { - return false; - } - this.monoProcessor.cancel(); - // isCancelled may still return false, if mono completed before the cancel - return this.monoProcessor.isCancelled(); - } - - @Override - public boolean isCancelled() { - return this.monoProcessor.isCancelled(); - } - - @Override - public boolean isDone() { - return this.monoProcessor.isTerminated(); - } - - @Override - public void addCallback(ListenableFutureCallback callback) { - this.registry.addCallback(callback); - } - - @Override - public void addCallback(SuccessCallback successCallback, FailureCallback failureCallback) { - this.registry.addSuccessCallback(successCallback); - this.registry.addFailureCallback(failureCallback); - } - - - @Nullable - protected abstract T adapt(@Nullable S result); - -}