diff --git a/spring-core/src/main/java/org/springframework/util/concurrent/SettableListenableFuture.java b/spring-core/src/main/java/org/springframework/util/concurrent/SettableListenableFuture.java
new file mode 100644
index 0000000000..4e6f7498a1
--- /dev/null
+++ b/spring-core/src/main/java/org/springframework/util/concurrent/SettableListenableFuture.java
@@ -0,0 +1,280 @@
+/*
+ * Copyright 2002-2014 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.util.concurrent;
+
+import org.springframework.util.Assert;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * A {@link ListenableFuture} whose value can be set by the {@link #set(Object)} or
+ * {@link #setException(Throwable)}. It may also be cancelled.
+ *
+ *
Inspired by {@code com.google.common.util.concurrent.SettableFuture}.
+ *
+ * @author Mattias Severson
+ * @since 4.1
+ */
+public class SettableListenableFuture implements ListenableFuture {
+
+ private final SettableFuture settableFuture = new SettableFuture();
+ private final ListenableFutureCallbackRegistry registry = new ListenableFutureCallbackRegistry();
+
+
+ /**
+ * Set the value of this future. This method will return {@code true} if
+ * the value was set successfully, or {@code false} if the future has already
+ * been set or cancelled.
+ *
+ * @param value the value that will be set.
+ * @return {@code true} if the value was successfully set, else {@code false}.
+ */
+ public boolean set(T value) {
+ boolean setValue = this.settableFuture.setValue(value);
+ if (setValue) {
+ this.registry.success(value);
+ }
+ return setValue;
+ }
+
+ /**
+ * Set the exception of this future. This method will return {@code true} if
+ * the exception was set successfully, or {@code false} if the future has already
+ * been set or cancelled.
+ * @param exception the value that will be set.
+ * @return {@code true} if the exception was successfully set, else {@code false}.
+ */
+ public boolean setException(Throwable exception) {
+ Assert.notNull(exception, "exception must not be null");
+ boolean setException = this.settableFuture.setThrowable(exception);
+ if (setException) {
+ this.registry.failure(exception);
+ }
+ return setException;
+ }
+
+ @Override
+ public void addCallback(ListenableFutureCallback super T> callback) {
+ this.registry.addCallback(callback);
+ }
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ boolean cancelled = this.settableFuture.cancel(mayInterruptIfRunning);
+ if (cancelled && mayInterruptIfRunning) {
+ interruptTask();
+ }
+ return cancelled;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return this.settableFuture.isCancelled();
+ }
+
+ @Override
+ public boolean isDone() {
+ return this.settableFuture.isDone();
+ }
+
+ /**
+ * Retrieve the value.
+ * Will return the value if it has been set by calling {@link #set(Object)}, throw
+ * an {@link ExecutionException} if the {@link #setException(Throwable)} has been
+ * called, throw a {@link CancellationException} if the future has been cancelled, or
+ * throw an {@link IllegalStateException} if neither a value, nor an exception has
+ * been set.
+ * @return The value associated with this future.
+ */
+ @Override
+ public T get() throws InterruptedException, ExecutionException {
+ return this.settableFuture.get();
+ }
+
+ /**
+ * Retrieve the value.
+ *
Will return the value if it has been by calling {@link #set(Object)}, throw an
+ * {@link ExecutionException} if the {@link #setException(Throwable)}
+ * has been called, throw a {@link java.util.concurrent.CancellationException} if the
+ * future has been cancelled.
+ * @param timeout the maximum time to wait.
+ * @param unit the time unit of the timeout argument.
+ * @return The value associated with this future.
+ */
+ @Override
+ public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ return this.settableFuture.get(timeout, unit);
+ }
+
+ /**
+ * Subclasses can override this method to implement interruption of the future's
+ * computation. The method is invoked automatically by a successful call to
+ * {@link #cancel(boolean) cancel(true)}.
+ *
+ *
The default implementation does nothing.
+ */
+ protected void interruptTask() {
+ }
+
+
+ /**
+ * Helper class that keeps track of the state of this future.
+ * @param The type of value to be set.
+ */
+ private static class SettableFuture implements Future {
+
+ private final ReadWriteLock lock = new ReentrantReadWriteLock(true);
+ private final CountDownLatch latch = new CountDownLatch(1);
+ private T value;
+ private Throwable throwable;
+ private State state = State.INITIALIZED;
+
+
+ @Override
+ public T get() throws ExecutionException, InterruptedException {
+ this.latch.await();
+ this.lock.readLock().lock();
+ try {
+ return getValue();
+ }
+ finally {
+ this.lock.readLock().unlock();
+ }
+ }
+
+ @Override
+ public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ if (this.latch.await(timeout, unit)) {
+ this.lock.readLock().lock();
+ try {
+ return getValue();
+ }
+ finally {
+ this.lock.readLock().unlock();
+ }
+ }
+ else {
+ throw new TimeoutException();
+ }
+ }
+
+ private T getValue() throws ExecutionException {
+ switch (this.state) {
+ case COMPLETED:
+ if (this.throwable != null) {
+ throw new ExecutionException(this.throwable);
+ }
+ else {
+ return this.value;
+ }
+ case CANCELLED:
+ throw new CancellationException("Future has been cancelled.");
+ default:
+ throw new IllegalStateException("Invalid state: " + this.state);
+ }
+ }
+
+ @Override
+ public boolean isDone() {
+ this.lock.readLock().lock();
+ try {
+ switch (this.state) {
+ case COMPLETED:
+ case CANCELLED:
+ return true;
+ default:
+ return false;
+ }
+ }
+ finally {
+ this.lock.readLock().unlock();
+ }
+ }
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ this.lock.writeLock().lock();
+ try {
+ if (this.state.equals(State.INITIALIZED)) {
+ this.state = State.CANCELLED;
+ this.latch.countDown();
+ return true;
+ }
+ }
+ finally {
+ this.lock.writeLock().unlock();
+ }
+ return false;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ this.lock.readLock().lock();
+ try {
+ return this.state.equals(State.CANCELLED);
+ }
+ finally {
+ this.lock.readLock().unlock();
+ }
+ }
+
+ boolean setValue(T value) {
+ this.lock.writeLock().lock();
+ try {
+ if (this.state.equals(State.INITIALIZED)) {
+ this.value = value;
+ this.state = State.COMPLETED;
+ this.latch.countDown();
+ return true;
+ }
+ }
+ finally {
+ this.lock.writeLock().unlock();
+ }
+ return false;
+ }
+
+ Throwable getThrowable() {
+ return this.throwable;
+ }
+
+ boolean setThrowable(Throwable throwable) {
+ this.lock.writeLock().lock();
+ try {
+ if (this.state.equals(State.INITIALIZED)) {
+ this.throwable = throwable;
+ this.state = State.COMPLETED;
+ this.latch.countDown();
+ return true;
+ }
+ }
+ finally {
+ this.lock.writeLock().unlock();
+ }
+ return false;
+ }
+
+ private enum State {INITIALIZED, COMPLETED, CANCELLED}
+ }
+}
diff --git a/spring-core/src/test/java/org/springframework/util/concurrent/SettableListenableFutureTests.java b/spring-core/src/test/java/org/springframework/util/concurrent/SettableListenableFutureTests.java
new file mode 100644
index 0000000000..731bd14226
--- /dev/null
+++ b/spring-core/src/test/java/org/springframework/util/concurrent/SettableListenableFutureTests.java
@@ -0,0 +1,338 @@
+/*
+ * Copyright 2002-2014 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.util.concurrent;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.*;
+
+/**
+ * @author Mattias Severson
+ */
+public class SettableListenableFutureTests {
+
+ private SettableListenableFuture settableListenableFuture;
+
+ @Before
+ public void setUp() {
+ settableListenableFuture = new SettableListenableFuture();
+ }
+
+ @Test
+ public void validateInitialValues() {
+ assertFalse(settableListenableFuture.isDone());
+ assertFalse(settableListenableFuture.isCancelled());
+ }
+
+ @Test
+ public void returnsSetValue() throws ExecutionException, InterruptedException {
+ String string = "hello";
+ boolean wasSet = settableListenableFuture.set(string);
+ assertTrue(wasSet);
+ assertThat(settableListenableFuture.get(), equalTo(string));
+ }
+
+ @Test
+ public void setValueUpdatesDoneStatus() {
+ settableListenableFuture.set("hello");
+ assertTrue(settableListenableFuture.isDone());
+ }
+
+ @Test
+ public void throwsSetExceptionWrappedInExecutionException() throws ExecutionException, InterruptedException {
+ Throwable exception = new RuntimeException();
+ boolean wasSet = settableListenableFuture.setException(exception);
+ assertTrue(wasSet);
+ try {
+ settableListenableFuture.get();
+ fail("Expected ExecutionException");
+ }
+ catch (ExecutionException ex) {
+ assertThat(ex.getCause(), equalTo(exception));
+ }
+ }
+
+ @Test
+ public void setValueTriggersCallback() {
+ String string = "hello";
+ final String[] callbackHolder = new String[1];
+ settableListenableFuture.addCallback(new ListenableFutureCallback() {
+ @Override
+ public void onSuccess(String result) {
+ callbackHolder[0] = result;
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ fail("Expected onSuccess() to be called");
+ }
+ });
+ settableListenableFuture.set(string);
+ assertThat(callbackHolder[0], equalTo(string));
+ }
+
+ @Test
+ public void setValueTriggersCallbackOnlyOnce() {
+ String string = "hello";
+ final String[] callbackHolder = new String[1];
+ settableListenableFuture.addCallback(new ListenableFutureCallback() {
+ @Override
+ public void onSuccess(String result) {
+ callbackHolder[0] = result;
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ fail("Expected onSuccess() to be called");
+ }
+ });
+ settableListenableFuture.set(string);
+ assertFalse(settableListenableFuture.set("good bye"));
+ assertThat(callbackHolder[0], equalTo(string));
+ }
+
+ @Test
+ public void setExceptionTriggersCallback() {
+ Throwable exception = new RuntimeException();
+ final Throwable[] callbackHolder = new Throwable[1];
+ settableListenableFuture.addCallback(new ListenableFutureCallback() {
+ @Override
+ public void onSuccess(String result) {
+ fail("Expected onFailure() to be called");
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ callbackHolder[0] = t;
+ }
+ });
+ settableListenableFuture.setException(exception);
+ assertThat(callbackHolder[0], equalTo(exception));
+ }
+
+ @Test
+ public void setExceptionTriggersCallbackOnlyOnce() {
+ Throwable exception = new RuntimeException();
+ final Throwable[] callbackHolder = new Throwable[1];
+ settableListenableFuture.addCallback(new ListenableFutureCallback() {
+ @Override
+ public void onSuccess(String result) {
+ fail("Expected onFailure() to be called");
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ callbackHolder[0] = t;
+ }
+ });
+ settableListenableFuture.setException(exception);
+ assertFalse(settableListenableFuture.setException(new IllegalArgumentException()));
+ assertThat(callbackHolder[0], equalTo(exception));
+ }
+
+ @Test
+ public void nullIsAcceptedAsValueToSet() throws ExecutionException, InterruptedException {
+ settableListenableFuture.set(null);
+ assertNull(settableListenableFuture.get());
+ }
+
+ @Test
+ public void getWaitsForCompletion() throws ExecutionException, InterruptedException {
+ final String string = "hello";
+ new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(20L);
+ settableListenableFuture.set(string);
+ } catch (InterruptedException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+ }).start();
+ String value = settableListenableFuture.get();
+ assertThat(value, equalTo(string));
+ }
+
+ @Test
+ public void getWithTimeoutThrowsTimeoutException() throws ExecutionException, InterruptedException {
+ try {
+ settableListenableFuture.get(1L, TimeUnit.MILLISECONDS);
+ fail("Expected TimeoutException");
+ } catch (TimeoutException ex) {
+ // expected
+ }
+ }
+
+ @Test
+ public void getWithTimeoutWaitsForCompletion() throws ExecutionException, InterruptedException, TimeoutException {
+ final String string = "hello";
+ new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(20L);
+ settableListenableFuture.set(string);
+ } catch (InterruptedException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+ }).start();
+ String value = settableListenableFuture.get(100L, TimeUnit.MILLISECONDS);
+ assertThat(value, equalTo(string));
+ }
+
+ @Test
+ public void cancelPreventsValueFromBeingSet() {
+ boolean wasCancelled = settableListenableFuture.cancel(true);
+ assertTrue(wasCancelled);
+ boolean wasSet = settableListenableFuture.set("hello");
+ assertFalse(wasSet);
+ }
+
+ @Test
+ public void cancelSetsFutureToDone() {
+ settableListenableFuture.cancel(true);
+ assertTrue(settableListenableFuture.isDone());
+ }
+
+ @Test
+ public void cancelWithMayInterruptIfRunningTrueCallsOverridenMethod() {
+ InterruptableSettableListenableFuture tested = new InterruptableSettableListenableFuture();
+ tested.cancel(true);
+ assertTrue(tested.calledInterruptTask());
+ }
+
+ @Test
+ public void cancelWithMayInterruptIfRunningFalseDoesNotCallOverridenMethod() {
+ InterruptableSettableListenableFuture tested = new InterruptableSettableListenableFuture();
+ tested.cancel(false);
+ assertFalse(tested.calledInterruptTask());
+ }
+
+ @Test
+ public void setPreventsCancel() {
+ boolean wasSet = settableListenableFuture.set("hello");
+ assertTrue(wasSet);
+ boolean wasCancelled = settableListenableFuture.cancel(true);
+ assertFalse(wasCancelled);
+ }
+
+ @Test
+ public void cancelPreventsExceptionFromBeingSet() {
+ boolean wasCancelled = settableListenableFuture.cancel(true);
+ assertTrue(wasCancelled);
+ boolean wasSet = settableListenableFuture.setException(new RuntimeException());
+ assertFalse(wasSet);
+ }
+
+ @Test
+ public void setExceptionPreventsCancel() {
+ boolean wasSet = settableListenableFuture.setException(new RuntimeException());
+ assertTrue(wasSet);
+ boolean wasCancelled = settableListenableFuture.cancel(true);
+ assertFalse(wasCancelled);
+ }
+
+ @Test
+ public void cancelStateThrowsExceptionWhenCallingGet() throws ExecutionException, InterruptedException {
+ settableListenableFuture.cancel(true);
+ try {
+ settableListenableFuture.get();
+ fail("Expected CancellationException");
+ }
+ catch (CancellationException ex) {
+ // expected
+ }
+ }
+
+ @Test
+ public void cancelStateThrowsExceptionWhenCallingGetWithTimeout() throws ExecutionException, TimeoutException, InterruptedException {
+ new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(20L);
+ settableListenableFuture.cancel(true);
+ } catch (InterruptedException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+ }).start();
+ try {
+ settableListenableFuture.get(100L, TimeUnit.MILLISECONDS);
+ fail("Expected CancellationException");
+ } catch (CancellationException ex) {
+ // expected
+ }
+ }
+
+ @Test
+ public void cancelDoesNotNotifyCallbacksOnSet() {
+ settableListenableFuture.addCallback(new ListenableFutureCallback() {
+ @Override
+ public void onSuccess(String result) {
+ fail("onSuccess should not have been called");
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ fail("onFailure should not have been called");
+ }
+ });
+ settableListenableFuture.cancel(true);
+ settableListenableFuture.set("hello");
+ }
+
+ @Test
+ public void cancelDoesNotNotifyCallbacksOnSetException() {
+ settableListenableFuture.addCallback(new ListenableFutureCallback() {
+ @Override
+ public void onSuccess(String result) {
+ fail("onSuccess should not have been called");
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ fail("onFailure should not have been called");
+ }
+ });
+ settableListenableFuture.cancel(true);
+ settableListenableFuture.setException(new RuntimeException());
+ }
+
+ private static class InterruptableSettableListenableFuture extends SettableListenableFuture {
+
+ private boolean interrupted = false;
+
+ @Override
+ protected void interruptTask() {
+ interrupted = true;
+ }
+
+ boolean calledInterruptTask() {
+ return interrupted;
+ }
+ }
+}