Browse Source

Revise SettableListenableFuture implementation

This change modifies the SettableListenableFuture implementation to use
internally a ListenableFutureTask created with a "settable" Callable.

Issue: SPR-11614
pull/1385/head
Rossen Stoyanchev 11 years ago
parent
commit
0640a32863
  1. 244
      spring-core/src/main/java/org/springframework/util/concurrent/SettableListenableFuture.java
  2. 38
      spring-core/src/test/java/org/springframework/util/concurrent/SettableListenableFutureTests.java

244
spring-core/src/main/java/org/springframework/util/concurrent/SettableListenableFuture.java

@ -18,44 +18,48 @@ package org.springframework.util.concurrent; @@ -18,44 +18,48 @@ package org.springframework.util.concurrent;
import org.springframework.util.Assert;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.atomic.AtomicReference;
/**
* A {@link ListenableFuture} whose value can be set by the {@link #set(Object)} or
* A {@link org.springframework.util.concurrent.ListenableFuture ListenableFuture}
* whose value can be set via {@link #set(Object)} or
* {@link #setException(Throwable)}. It may also be cancelled.
*
* <p>Inspired by {@code com.google.common.util.concurrent.SettableFuture}.
*
* @author Mattias Severson
* @author Rossen Stoyanchev
* @since 4.1
*/
public class SettableListenableFuture<T> implements ListenableFuture<T> {
private final SettableFuture<T> settableFuture = new SettableFuture<T>();
private final ListenableFutureCallbackRegistry<T> registry = new ListenableFutureCallbackRegistry<T>();
private final SettableTask<T> settableTask;
private final ListenableFutureTask<T> listenableFuture;
public SettableListenableFuture() {
this.settableTask = new SettableTask<T>();
this.listenableFuture = new ListenableFutureTask<T>(this.settableTask);
}
/**
* Set the value of this future. This method will return {@code true} if
* the value was set successfully, or {@code false} if the future has already
* been set or cancelled.
*
* @param value the value that will be set.
* @return {@code true} if the value was successfully set, else {@code false}.
*/
public boolean set(T value) {
boolean setValue = this.settableFuture.setValue(value);
if (setValue) {
this.registry.success(value);
boolean success = this.settableTask.setValue(value);
if (success) {
this.listenableFuture.run();
}
return setValue;
return success;
}
/**
@ -66,66 +70,66 @@ public class SettableListenableFuture<T> implements ListenableFuture<T> { @@ -66,66 +70,66 @@ public class SettableListenableFuture<T> implements ListenableFuture<T> {
* @return {@code true} if the exception was successfully set, else {@code false}.
*/
public boolean setException(Throwable exception) {
Assert.notNull(exception, "exception must not be null");
boolean setException = this.settableFuture.setThrowable(exception);
if (setException) {
this.registry.failure(exception);
Assert.notNull(exception, "'exception' must not be null");
boolean success = this.settableTask.setValue(exception);
if (success) {
this.listenableFuture.run();
}
return setException;
return success;
}
@Override
public void addCallback(ListenableFutureCallback<? super T> callback) {
this.registry.addCallback(callback);
}
@Override
public void addCallback(ListenableFutureCallback<? super T> callback) {
this.listenableFuture.addCallback(callback);
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
boolean cancelled = this.settableFuture.cancel(mayInterruptIfRunning);
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
this.settableTask.setCancelled();
boolean cancelled = this.listenableFuture.cancel(mayInterruptIfRunning);
if (cancelled && mayInterruptIfRunning) {
interruptTask();
}
return cancelled;
}
}
@Override
public boolean isCancelled() {
return this.settableFuture.isCancelled();
}
@Override
public boolean isCancelled() {
return this.listenableFuture.isCancelled();
}
@Override
public boolean isDone() {
return this.settableFuture.isDone();
}
@Override
public boolean isDone() {
return this.listenableFuture.isDone();
}
/**
* Retrieve the value.
* <p>Will return the value if it has been set by calling {@link #set(Object)}, throw
* an {@link ExecutionException} if the {@link #setException(Throwable)} has been
* called, throw a {@link CancellationException} if the future has been cancelled, or
* throw an {@link IllegalStateException} if neither a value, nor an exception has
* been set.
* <p>Will return the value if it has been set via {@link #set(Object)},
* throw an {@link java.util.concurrent.ExecutionException} if it has been
* set via {@link #setException(Throwable)} or throw a
* {@link java.util.concurrent.CancellationException} if it has been cancelled.
* @return The value associated with this future.
*/
@Override
public T get() throws InterruptedException, ExecutionException {
return this.settableFuture.get();
}
@Override
public T get() throws InterruptedException, ExecutionException {
return this.listenableFuture.get();
}
/**
* Retrieve the value.
* <p>Will return the value if it has been by calling {@link #set(Object)}, throw an
* {@link ExecutionException} if the {@link #setException(Throwable)}
* has been called, throw a {@link java.util.concurrent.CancellationException} if the
* future has been cancelled.
* <p>Will return the value if it has been set via {@link #set(Object)},
* throw an {@link java.util.concurrent.ExecutionException} if it has been
* set via {@link #setException(Throwable)} or throw a
* {@link java.util.concurrent.CancellationException} if it has been cancelled.
* @param timeout the maximum time to wait.
* @param unit the time unit of the timeout argument.
* @return The value associated with this future.
*/
@Override
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return this.settableFuture.get(timeout, unit);
}
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return this.listenableFuture.get(timeout, unit);
}
/**
* Subclasses can override this method to implement interruption of the future's
@ -138,143 +142,33 @@ public class SettableListenableFuture<T> implements ListenableFuture<T> { @@ -138,143 +142,33 @@ public class SettableListenableFuture<T> implements ListenableFuture<T> {
}
/**
* Helper class that keeps track of the state of this future.
* @param <T> The type of value to be set.
*/
private static class SettableFuture<T> implements Future<T> {
private static class SettableTask<T> implements Callable<T> {
private final ReadWriteLock lock = new ReentrantReadWriteLock(true);
private final CountDownLatch latch = new CountDownLatch(1);
private T value;
private Throwable throwable;
private State state = State.INITIALIZED;
private static final String NO_VALUE = SettableListenableFuture.class.getName() + ".NO_VALUE";
private final AtomicReference<Object> value = new AtomicReference<Object>(NO_VALUE);
@Override
public T get() throws ExecutionException, InterruptedException {
this.latch.await();
this.lock.readLock().lock();
try {
return getValue();
}
finally {
this.lock.readLock().unlock();
}
}
@Override
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
if (this.latch.await(timeout, unit)) {
this.lock.readLock().lock();
try {
return getValue();
}
finally {
this.lock.readLock().unlock();
}
}
else {
throw new TimeoutException();
}
}
private volatile boolean cancelled = false;
private T getValue() throws ExecutionException {
switch (this.state) {
case COMPLETED:
if (this.throwable != null) {
throw new ExecutionException(this.throwable);
}
else {
return this.value;
}
case CANCELLED:
throw new CancellationException("Future has been cancelled.");
default:
throw new IllegalStateException("Invalid state: " + this.state);
}
}
@Override
public boolean isDone() {
this.lock.readLock().lock();
try {
switch (this.state) {
case COMPLETED:
case CANCELLED:
return true;
default:
return false;
}
}
finally {
this.lock.readLock().unlock();
public boolean setValue(Object value) {
if (this.cancelled) {
return false;
}
return this.value.compareAndSet(NO_VALUE, value);
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
this.lock.writeLock().lock();
try {
if (this.state.equals(State.INITIALIZED)) {
this.state = State.CANCELLED;
this.latch.countDown();
return true;
}
}
finally {
this.lock.writeLock().unlock();
}
return false;
public void setCancelled() {
this.cancelled = true;
}
@Override
public boolean isCancelled() {
this.lock.readLock().lock();
try {
return this.state.equals(State.CANCELLED);
}
finally {
this.lock.readLock().unlock();
public T call() throws Exception {
if (value.get() instanceof Exception) {
throw (Exception) value.get();
}
return (T) value.get();
}
boolean setValue(T value) {
this.lock.writeLock().lock();
try {
if (this.state.equals(State.INITIALIZED)) {
this.value = value;
this.state = State.COMPLETED;
this.latch.countDown();
return true;
}
}
finally {
this.lock.writeLock().unlock();
}
return false;
}
Throwable getThrowable() {
return this.throwable;
}
boolean setThrowable(Throwable throwable) {
this.lock.writeLock().lock();
try {
if (this.state.equals(State.INITIALIZED)) {
this.throwable = throwable;
this.state = State.COMPLETED;
this.latch.countDown();
return true;
}
}
finally {
this.lock.writeLock().unlock();
}
return false;
}
private enum State {INITIALIZED, COMPLETED, CANCELLED}
}
}

38
spring-core/src/test/java/org/springframework/util/concurrent/SettableListenableFutureTests.java

@ -26,6 +26,8 @@ import java.util.concurrent.TimeoutException; @@ -26,6 +26,8 @@ import java.util.concurrent.TimeoutException;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
/**
* @author Mattias Severson
@ -290,36 +292,28 @@ public class SettableListenableFutureTests { @@ -290,36 +292,28 @@ public class SettableListenableFutureTests {
@Test
public void cancelDoesNotNotifyCallbacksOnSet() {
settableListenableFuture.addCallback(new ListenableFutureCallback<String>() {
@Override
public void onSuccess(String result) {
fail("onSuccess should not have been called");
}
@Override
public void onFailure(Throwable t) {
fail("onFailure should not have been called");
}
});
ListenableFutureCallback callback = mock(ListenableFutureCallback.class);
settableListenableFuture.addCallback(callback);
settableListenableFuture.cancel(true);
verify(callback).onFailure(any(CancellationException.class));
verifyNoMoreInteractions(callback);
settableListenableFuture.set("hello");
verifyNoMoreInteractions(callback);
}
@Test
public void cancelDoesNotNotifyCallbacksOnSetException() {
settableListenableFuture.addCallback(new ListenableFutureCallback<String>() {
@Override
public void onSuccess(String result) {
fail("onSuccess should not have been called");
}
@Override
public void onFailure(Throwable t) {
fail("onFailure should not have been called");
}
});
ListenableFutureCallback callback = mock(ListenableFutureCallback.class);
settableListenableFuture.addCallback(callback);
settableListenableFuture.cancel(true);
verify(callback).onFailure(any(CancellationException.class));
verifyNoMoreInteractions(callback);
settableListenableFuture.setException(new RuntimeException());
verifyNoMoreInteractions(callback);
}
private static class InterruptableSettableListenableFuture extends SettableListenableFuture<String> {

Loading…
Cancel
Save