diff --git a/spring-core/src/main/java/org/springframework/util/concurrent/SettableListenableFuture.java b/spring-core/src/main/java/org/springframework/util/concurrent/SettableListenableFuture.java new file mode 100644 index 0000000000..7b37673302 --- /dev/null +++ b/spring-core/src/main/java/org/springframework/util/concurrent/SettableListenableFuture.java @@ -0,0 +1,174 @@ +/* + * Copyright 2002-2014 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.util.concurrent; + +import org.springframework.util.Assert; + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; + +/** + * A {@link org.springframework.util.concurrent.ListenableFuture ListenableFuture} + * whose value can be set via {@link #set(Object)} or + * {@link #setException(Throwable)}. It may also be cancelled. + * + *

Inspired by {@code com.google.common.util.concurrent.SettableFuture}. + * + * @author Mattias Severson + * @author Rossen Stoyanchev + * @since 4.1 + */ +public class SettableListenableFuture implements ListenableFuture { + + private final SettableTask settableTask; + + private final ListenableFutureTask listenableFuture; + + + public SettableListenableFuture() { + this.settableTask = new SettableTask(); + this.listenableFuture = new ListenableFutureTask(this.settableTask); + } + + /** + * Set the value of this future. This method will return {@code true} if + * the value was set successfully, or {@code false} if the future has already + * been set or cancelled. + * @param value the value that will be set. + * @return {@code true} if the value was successfully set, else {@code false}. + */ + public boolean set(T value) { + boolean success = this.settableTask.setValue(value); + if (success) { + this.listenableFuture.run(); + } + return success; + } + + /** + * Set the exception of this future. This method will return {@code true} if + * the exception was set successfully, or {@code false} if the future has already + * been set or cancelled. + * @param exception the value that will be set. + * @return {@code true} if the exception was successfully set, else {@code false}. + */ + public boolean setException(Throwable exception) { + Assert.notNull(exception, "'exception' must not be null"); + boolean success = this.settableTask.setValue(exception); + if (success) { + this.listenableFuture.run(); + } + return success; + } + + @Override + public void addCallback(ListenableFutureCallback callback) { + this.listenableFuture.addCallback(callback); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + this.settableTask.setCancelled(); + boolean cancelled = this.listenableFuture.cancel(mayInterruptIfRunning); + if (cancelled && mayInterruptIfRunning) { + interruptTask(); + } + return cancelled; + } + + @Override + public boolean isCancelled() { + return this.listenableFuture.isCancelled(); + } + + @Override + public boolean isDone() { + return this.listenableFuture.isDone(); + } + + /** + * Retrieve the value. + *

Will return the value if it has been set via {@link #set(Object)}, + * throw an {@link java.util.concurrent.ExecutionException} if it has been + * set via {@link #setException(Throwable)} or throw a + * {@link java.util.concurrent.CancellationException} if it has been cancelled. + * @return The value associated with this future. + */ + @Override + public T get() throws InterruptedException, ExecutionException { + return this.listenableFuture.get(); + } + + /** + * Retrieve the value. + *

Will return the value if it has been set via {@link #set(Object)}, + * throw an {@link java.util.concurrent.ExecutionException} if it has been + * set via {@link #setException(Throwable)} or throw a + * {@link java.util.concurrent.CancellationException} if it has been cancelled. + * @param timeout the maximum time to wait. + * @param unit the time unit of the timeout argument. + * @return The value associated with this future. + */ + @Override + public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return this.listenableFuture.get(timeout, unit); + } + + /** + * Subclasses can override this method to implement interruption of the future's + * computation. The method is invoked automatically by a successful call to + * {@link #cancel(boolean) cancel(true)}. + * + *

The default implementation does nothing. + */ + protected void interruptTask() { + } + + + private static class SettableTask implements Callable { + + private static final String NO_VALUE = SettableListenableFuture.class.getName() + ".NO_VALUE"; + + private final AtomicReference value = new AtomicReference(NO_VALUE); + + private volatile boolean cancelled = false; + + + public boolean setValue(Object value) { + if (this.cancelled) { + return false; + } + return this.value.compareAndSet(NO_VALUE, value); + } + + public void setCancelled() { + this.cancelled = true; + } + + @Override + public T call() throws Exception { + if (value.get() instanceof Exception) { + throw (Exception) value.get(); + } + return (T) value.get(); + } + } + +} diff --git a/spring-core/src/test/java/org/springframework/util/concurrent/SettableListenableFutureTests.java b/spring-core/src/test/java/org/springframework/util/concurrent/SettableListenableFutureTests.java new file mode 100644 index 0000000000..d16d08a77f --- /dev/null +++ b/spring-core/src/test/java/org/springframework/util/concurrent/SettableListenableFutureTests.java @@ -0,0 +1,332 @@ +/* + * Copyright 2002-2014 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.util.concurrent; + +import org.junit.Before; +import org.junit.Test; + +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + + +/** + * @author Mattias Severson + */ +public class SettableListenableFutureTests { + + private SettableListenableFuture settableListenableFuture; + + @Before + public void setUp() { + settableListenableFuture = new SettableListenableFuture(); + } + + @Test + public void validateInitialValues() { + assertFalse(settableListenableFuture.isDone()); + assertFalse(settableListenableFuture.isCancelled()); + } + + @Test + public void returnsSetValue() throws ExecutionException, InterruptedException { + String string = "hello"; + boolean wasSet = settableListenableFuture.set(string); + assertTrue(wasSet); + assertThat(settableListenableFuture.get(), equalTo(string)); + } + + @Test + public void setValueUpdatesDoneStatus() { + settableListenableFuture.set("hello"); + assertTrue(settableListenableFuture.isDone()); + } + + @Test + public void throwsSetExceptionWrappedInExecutionException() throws ExecutionException, InterruptedException { + Throwable exception = new RuntimeException(); + boolean wasSet = settableListenableFuture.setException(exception); + assertTrue(wasSet); + try { + settableListenableFuture.get(); + fail("Expected ExecutionException"); + } + catch (ExecutionException ex) { + assertThat(ex.getCause(), equalTo(exception)); + } + } + + @Test + public void setValueTriggersCallback() { + String string = "hello"; + final String[] callbackHolder = new String[1]; + settableListenableFuture.addCallback(new ListenableFutureCallback() { + @Override + public void onSuccess(String result) { + callbackHolder[0] = result; + } + + @Override + public void onFailure(Throwable t) { + fail("Expected onSuccess() to be called"); + } + }); + settableListenableFuture.set(string); + assertThat(callbackHolder[0], equalTo(string)); + } + + @Test + public void setValueTriggersCallbackOnlyOnce() { + String string = "hello"; + final String[] callbackHolder = new String[1]; + settableListenableFuture.addCallback(new ListenableFutureCallback() { + @Override + public void onSuccess(String result) { + callbackHolder[0] = result; + } + + @Override + public void onFailure(Throwable t) { + fail("Expected onSuccess() to be called"); + } + }); + settableListenableFuture.set(string); + assertFalse(settableListenableFuture.set("good bye")); + assertThat(callbackHolder[0], equalTo(string)); + } + + @Test + public void setExceptionTriggersCallback() { + Throwable exception = new RuntimeException(); + final Throwable[] callbackHolder = new Throwable[1]; + settableListenableFuture.addCallback(new ListenableFutureCallback() { + @Override + public void onSuccess(String result) { + fail("Expected onFailure() to be called"); + } + + @Override + public void onFailure(Throwable t) { + callbackHolder[0] = t; + } + }); + settableListenableFuture.setException(exception); + assertThat(callbackHolder[0], equalTo(exception)); + } + + @Test + public void setExceptionTriggersCallbackOnlyOnce() { + Throwable exception = new RuntimeException(); + final Throwable[] callbackHolder = new Throwable[1]; + settableListenableFuture.addCallback(new ListenableFutureCallback() { + @Override + public void onSuccess(String result) { + fail("Expected onFailure() to be called"); + } + + @Override + public void onFailure(Throwable t) { + callbackHolder[0] = t; + } + }); + settableListenableFuture.setException(exception); + assertFalse(settableListenableFuture.setException(new IllegalArgumentException())); + assertThat(callbackHolder[0], equalTo(exception)); + } + + @Test + public void nullIsAcceptedAsValueToSet() throws ExecutionException, InterruptedException { + settableListenableFuture.set(null); + assertNull(settableListenableFuture.get()); + } + + @Test + public void getWaitsForCompletion() throws ExecutionException, InterruptedException { + final String string = "hello"; + new Thread(new Runnable() { + @Override + public void run() { + try { + Thread.sleep(20L); + settableListenableFuture.set(string); + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + } + } + }).start(); + String value = settableListenableFuture.get(); + assertThat(value, equalTo(string)); + } + + @Test + public void getWithTimeoutThrowsTimeoutException() throws ExecutionException, InterruptedException { + try { + settableListenableFuture.get(1L, TimeUnit.MILLISECONDS); + fail("Expected TimeoutException"); + } catch (TimeoutException ex) { + // expected + } + } + + @Test + public void getWithTimeoutWaitsForCompletion() throws ExecutionException, InterruptedException, TimeoutException { + final String string = "hello"; + new Thread(new Runnable() { + @Override + public void run() { + try { + Thread.sleep(20L); + settableListenableFuture.set(string); + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + } + } + }).start(); + String value = settableListenableFuture.get(100L, TimeUnit.MILLISECONDS); + assertThat(value, equalTo(string)); + } + + @Test + public void cancelPreventsValueFromBeingSet() { + boolean wasCancelled = settableListenableFuture.cancel(true); + assertTrue(wasCancelled); + boolean wasSet = settableListenableFuture.set("hello"); + assertFalse(wasSet); + } + + @Test + public void cancelSetsFutureToDone() { + settableListenableFuture.cancel(true); + assertTrue(settableListenableFuture.isDone()); + } + + @Test + public void cancelWithMayInterruptIfRunningTrueCallsOverridenMethod() { + InterruptableSettableListenableFuture tested = new InterruptableSettableListenableFuture(); + tested.cancel(true); + assertTrue(tested.calledInterruptTask()); + } + + @Test + public void cancelWithMayInterruptIfRunningFalseDoesNotCallOverridenMethod() { + InterruptableSettableListenableFuture tested = new InterruptableSettableListenableFuture(); + tested.cancel(false); + assertFalse(tested.calledInterruptTask()); + } + + @Test + public void setPreventsCancel() { + boolean wasSet = settableListenableFuture.set("hello"); + assertTrue(wasSet); + boolean wasCancelled = settableListenableFuture.cancel(true); + assertFalse(wasCancelled); + } + + @Test + public void cancelPreventsExceptionFromBeingSet() { + boolean wasCancelled = settableListenableFuture.cancel(true); + assertTrue(wasCancelled); + boolean wasSet = settableListenableFuture.setException(new RuntimeException()); + assertFalse(wasSet); + } + + @Test + public void setExceptionPreventsCancel() { + boolean wasSet = settableListenableFuture.setException(new RuntimeException()); + assertTrue(wasSet); + boolean wasCancelled = settableListenableFuture.cancel(true); + assertFalse(wasCancelled); + } + + @Test + public void cancelStateThrowsExceptionWhenCallingGet() throws ExecutionException, InterruptedException { + settableListenableFuture.cancel(true); + try { + settableListenableFuture.get(); + fail("Expected CancellationException"); + } + catch (CancellationException ex) { + // expected + } + } + + @Test + public void cancelStateThrowsExceptionWhenCallingGetWithTimeout() throws ExecutionException, TimeoutException, InterruptedException { + new Thread(new Runnable() { + @Override + public void run() { + try { + Thread.sleep(20L); + settableListenableFuture.cancel(true); + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + } + } + }).start(); + try { + settableListenableFuture.get(100L, TimeUnit.MILLISECONDS); + fail("Expected CancellationException"); + } catch (CancellationException ex) { + // expected + } + } + + @Test + public void cancelDoesNotNotifyCallbacksOnSet() { + ListenableFutureCallback callback = mock(ListenableFutureCallback.class); + settableListenableFuture.addCallback(callback); + settableListenableFuture.cancel(true); + + verify(callback).onFailure(any(CancellationException.class)); + verifyNoMoreInteractions(callback); + + settableListenableFuture.set("hello"); + verifyNoMoreInteractions(callback); + } + + @Test + public void cancelDoesNotNotifyCallbacksOnSetException() { + ListenableFutureCallback callback = mock(ListenableFutureCallback.class); + settableListenableFuture.addCallback(callback); + settableListenableFuture.cancel(true); + + verify(callback).onFailure(any(CancellationException.class)); + verifyNoMoreInteractions(callback); + + settableListenableFuture.setException(new RuntimeException()); + verifyNoMoreInteractions(callback); + } + + private static class InterruptableSettableListenableFuture extends SettableListenableFuture { + + private boolean interrupted = false; + + @Override + protected void interruptTask() { + interrupted = true; + } + + boolean calledInterruptTask() { + return interrupted; + } + } +}