Rossen Stoyanchev
11 years ago
2 changed files with 506 additions and 0 deletions
@ -0,0 +1,174 @@
@@ -0,0 +1,174 @@
|
||||
/* |
||||
* Copyright 2002-2014 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.util.concurrent; |
||||
|
||||
import org.springframework.util.Assert; |
||||
|
||||
import java.util.concurrent.Callable; |
||||
import java.util.concurrent.ExecutionException; |
||||
import java.util.concurrent.TimeUnit; |
||||
import java.util.concurrent.TimeoutException; |
||||
import java.util.concurrent.atomic.AtomicReference; |
||||
|
||||
/** |
||||
* A {@link org.springframework.util.concurrent.ListenableFuture ListenableFuture} |
||||
* whose value can be set via {@link #set(Object)} or |
||||
* {@link #setException(Throwable)}. It may also be cancelled. |
||||
* |
||||
* <p>Inspired by {@code com.google.common.util.concurrent.SettableFuture}. |
||||
* |
||||
* @author Mattias Severson |
||||
* @author Rossen Stoyanchev |
||||
* @since 4.1 |
||||
*/ |
||||
public class SettableListenableFuture<T> implements ListenableFuture<T> { |
||||
|
||||
private final SettableTask<T> settableTask; |
||||
|
||||
private final ListenableFutureTask<T> listenableFuture; |
||||
|
||||
|
||||
public SettableListenableFuture() { |
||||
this.settableTask = new SettableTask<T>(); |
||||
this.listenableFuture = new ListenableFutureTask<T>(this.settableTask); |
||||
} |
||||
|
||||
/** |
||||
* Set the value of this future. This method will return {@code true} if |
||||
* the value was set successfully, or {@code false} if the future has already |
||||
* been set or cancelled. |
||||
* @param value the value that will be set. |
||||
* @return {@code true} if the value was successfully set, else {@code false}. |
||||
*/ |
||||
public boolean set(T value) { |
||||
boolean success = this.settableTask.setValue(value); |
||||
if (success) { |
||||
this.listenableFuture.run(); |
||||
} |
||||
return success; |
||||
} |
||||
|
||||
/** |
||||
* Set the exception of this future. This method will return {@code true} if |
||||
* the exception was set successfully, or {@code false} if the future has already |
||||
* been set or cancelled. |
||||
* @param exception the value that will be set. |
||||
* @return {@code true} if the exception was successfully set, else {@code false}. |
||||
*/ |
||||
public boolean setException(Throwable exception) { |
||||
Assert.notNull(exception, "'exception' must not be null"); |
||||
boolean success = this.settableTask.setValue(exception); |
||||
if (success) { |
||||
this.listenableFuture.run(); |
||||
} |
||||
return success; |
||||
} |
||||
|
||||
@Override |
||||
public void addCallback(ListenableFutureCallback<? super T> callback) { |
||||
this.listenableFuture.addCallback(callback); |
||||
} |
||||
|
||||
@Override |
||||
public boolean cancel(boolean mayInterruptIfRunning) { |
||||
this.settableTask.setCancelled(); |
||||
boolean cancelled = this.listenableFuture.cancel(mayInterruptIfRunning); |
||||
if (cancelled && mayInterruptIfRunning) { |
||||
interruptTask(); |
||||
} |
||||
return cancelled; |
||||
} |
||||
|
||||
@Override |
||||
public boolean isCancelled() { |
||||
return this.listenableFuture.isCancelled(); |
||||
} |
||||
|
||||
@Override |
||||
public boolean isDone() { |
||||
return this.listenableFuture.isDone(); |
||||
} |
||||
|
||||
/** |
||||
* Retrieve the value. |
||||
* <p>Will return the value if it has been set via {@link #set(Object)}, |
||||
* throw an {@link java.util.concurrent.ExecutionException} if it has been |
||||
* set via {@link #setException(Throwable)} or throw a |
||||
* {@link java.util.concurrent.CancellationException} if it has been cancelled. |
||||
* @return The value associated with this future. |
||||
*/ |
||||
@Override |
||||
public T get() throws InterruptedException, ExecutionException { |
||||
return this.listenableFuture.get(); |
||||
} |
||||
|
||||
/** |
||||
* Retrieve the value. |
||||
* <p>Will return the value if it has been set via {@link #set(Object)}, |
||||
* throw an {@link java.util.concurrent.ExecutionException} if it has been |
||||
* set via {@link #setException(Throwable)} or throw a |
||||
* {@link java.util.concurrent.CancellationException} if it has been cancelled. |
||||
* @param timeout the maximum time to wait. |
||||
* @param unit the time unit of the timeout argument. |
||||
* @return The value associated with this future. |
||||
*/ |
||||
@Override |
||||
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { |
||||
return this.listenableFuture.get(timeout, unit); |
||||
} |
||||
|
||||
/** |
||||
* Subclasses can override this method to implement interruption of the future's |
||||
* computation. The method is invoked automatically by a successful call to |
||||
* {@link #cancel(boolean) cancel(true)}. |
||||
* |
||||
* <p>The default implementation does nothing. |
||||
*/ |
||||
protected void interruptTask() { |
||||
} |
||||
|
||||
|
||||
private static class SettableTask<T> implements Callable<T> { |
||||
|
||||
private static final String NO_VALUE = SettableListenableFuture.class.getName() + ".NO_VALUE"; |
||||
|
||||
private final AtomicReference<Object> value = new AtomicReference<Object>(NO_VALUE); |
||||
|
||||
private volatile boolean cancelled = false; |
||||
|
||||
|
||||
public boolean setValue(Object value) { |
||||
if (this.cancelled) { |
||||
return false; |
||||
} |
||||
return this.value.compareAndSet(NO_VALUE, value); |
||||
} |
||||
|
||||
public void setCancelled() { |
||||
this.cancelled = true; |
||||
} |
||||
|
||||
@Override |
||||
public T call() throws Exception { |
||||
if (value.get() instanceof Exception) { |
||||
throw (Exception) value.get(); |
||||
} |
||||
return (T) value.get(); |
||||
} |
||||
} |
||||
|
||||
} |
@ -0,0 +1,332 @@
@@ -0,0 +1,332 @@
|
||||
/* |
||||
* Copyright 2002-2014 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.util.concurrent; |
||||
|
||||
import org.junit.Before; |
||||
import org.junit.Test; |
||||
|
||||
import java.util.concurrent.CancellationException; |
||||
import java.util.concurrent.ExecutionException; |
||||
import java.util.concurrent.TimeUnit; |
||||
import java.util.concurrent.TimeoutException; |
||||
|
||||
import static org.hamcrest.Matchers.equalTo; |
||||
import static org.junit.Assert.*; |
||||
import static org.mockito.Mockito.*; |
||||
|
||||
|
||||
/** |
||||
* @author Mattias Severson |
||||
*/ |
||||
public class SettableListenableFutureTests { |
||||
|
||||
private SettableListenableFuture<String> settableListenableFuture; |
||||
|
||||
@Before |
||||
public void setUp() { |
||||
settableListenableFuture = new SettableListenableFuture<String>(); |
||||
} |
||||
|
||||
@Test |
||||
public void validateInitialValues() { |
||||
assertFalse(settableListenableFuture.isDone()); |
||||
assertFalse(settableListenableFuture.isCancelled()); |
||||
} |
||||
|
||||
@Test |
||||
public void returnsSetValue() throws ExecutionException, InterruptedException { |
||||
String string = "hello"; |
||||
boolean wasSet = settableListenableFuture.set(string); |
||||
assertTrue(wasSet); |
||||
assertThat(settableListenableFuture.get(), equalTo(string)); |
||||
} |
||||
|
||||
@Test |
||||
public void setValueUpdatesDoneStatus() { |
||||
settableListenableFuture.set("hello"); |
||||
assertTrue(settableListenableFuture.isDone()); |
||||
} |
||||
|
||||
@Test |
||||
public void throwsSetExceptionWrappedInExecutionException() throws ExecutionException, InterruptedException { |
||||
Throwable exception = new RuntimeException(); |
||||
boolean wasSet = settableListenableFuture.setException(exception); |
||||
assertTrue(wasSet); |
||||
try { |
||||
settableListenableFuture.get(); |
||||
fail("Expected ExecutionException"); |
||||
} |
||||
catch (ExecutionException ex) { |
||||
assertThat(ex.getCause(), equalTo(exception)); |
||||
} |
||||
} |
||||
|
||||
@Test |
||||
public void setValueTriggersCallback() { |
||||
String string = "hello"; |
||||
final String[] callbackHolder = new String[1]; |
||||
settableListenableFuture.addCallback(new ListenableFutureCallback<String>() { |
||||
@Override |
||||
public void onSuccess(String result) { |
||||
callbackHolder[0] = result; |
||||
} |
||||
|
||||
@Override |
||||
public void onFailure(Throwable t) { |
||||
fail("Expected onSuccess() to be called"); |
||||
} |
||||
}); |
||||
settableListenableFuture.set(string); |
||||
assertThat(callbackHolder[0], equalTo(string)); |
||||
} |
||||
|
||||
@Test |
||||
public void setValueTriggersCallbackOnlyOnce() { |
||||
String string = "hello"; |
||||
final String[] callbackHolder = new String[1]; |
||||
settableListenableFuture.addCallback(new ListenableFutureCallback<String>() { |
||||
@Override |
||||
public void onSuccess(String result) { |
||||
callbackHolder[0] = result; |
||||
} |
||||
|
||||
@Override |
||||
public void onFailure(Throwable t) { |
||||
fail("Expected onSuccess() to be called"); |
||||
} |
||||
}); |
||||
settableListenableFuture.set(string); |
||||
assertFalse(settableListenableFuture.set("good bye")); |
||||
assertThat(callbackHolder[0], equalTo(string)); |
||||
} |
||||
|
||||
@Test |
||||
public void setExceptionTriggersCallback() { |
||||
Throwable exception = new RuntimeException(); |
||||
final Throwable[] callbackHolder = new Throwable[1]; |
||||
settableListenableFuture.addCallback(new ListenableFutureCallback<String>() { |
||||
@Override |
||||
public void onSuccess(String result) { |
||||
fail("Expected onFailure() to be called"); |
||||
} |
||||
|
||||
@Override |
||||
public void onFailure(Throwable t) { |
||||
callbackHolder[0] = t; |
||||
} |
||||
}); |
||||
settableListenableFuture.setException(exception); |
||||
assertThat(callbackHolder[0], equalTo(exception)); |
||||
} |
||||
|
||||
@Test |
||||
public void setExceptionTriggersCallbackOnlyOnce() { |
||||
Throwable exception = new RuntimeException(); |
||||
final Throwable[] callbackHolder = new Throwable[1]; |
||||
settableListenableFuture.addCallback(new ListenableFutureCallback<String>() { |
||||
@Override |
||||
public void onSuccess(String result) { |
||||
fail("Expected onFailure() to be called"); |
||||
} |
||||
|
||||
@Override |
||||
public void onFailure(Throwable t) { |
||||
callbackHolder[0] = t; |
||||
} |
||||
}); |
||||
settableListenableFuture.setException(exception); |
||||
assertFalse(settableListenableFuture.setException(new IllegalArgumentException())); |
||||
assertThat(callbackHolder[0], equalTo(exception)); |
||||
} |
||||
|
||||
@Test |
||||
public void nullIsAcceptedAsValueToSet() throws ExecutionException, InterruptedException { |
||||
settableListenableFuture.set(null); |
||||
assertNull(settableListenableFuture.get()); |
||||
} |
||||
|
||||
@Test |
||||
public void getWaitsForCompletion() throws ExecutionException, InterruptedException { |
||||
final String string = "hello"; |
||||
new Thread(new Runnable() { |
||||
@Override |
||||
public void run() { |
||||
try { |
||||
Thread.sleep(20L); |
||||
settableListenableFuture.set(string); |
||||
} catch (InterruptedException ex) { |
||||
throw new RuntimeException(ex); |
||||
} |
||||
} |
||||
}).start(); |
||||
String value = settableListenableFuture.get(); |
||||
assertThat(value, equalTo(string)); |
||||
} |
||||
|
||||
@Test |
||||
public void getWithTimeoutThrowsTimeoutException() throws ExecutionException, InterruptedException { |
||||
try { |
||||
settableListenableFuture.get(1L, TimeUnit.MILLISECONDS); |
||||
fail("Expected TimeoutException"); |
||||
} catch (TimeoutException ex) { |
||||
// expected
|
||||
} |
||||
} |
||||
|
||||
@Test |
||||
public void getWithTimeoutWaitsForCompletion() throws ExecutionException, InterruptedException, TimeoutException { |
||||
final String string = "hello"; |
||||
new Thread(new Runnable() { |
||||
@Override |
||||
public void run() { |
||||
try { |
||||
Thread.sleep(20L); |
||||
settableListenableFuture.set(string); |
||||
} catch (InterruptedException ex) { |
||||
throw new RuntimeException(ex); |
||||
} |
||||
} |
||||
}).start(); |
||||
String value = settableListenableFuture.get(100L, TimeUnit.MILLISECONDS); |
||||
assertThat(value, equalTo(string)); |
||||
} |
||||
|
||||
@Test |
||||
public void cancelPreventsValueFromBeingSet() { |
||||
boolean wasCancelled = settableListenableFuture.cancel(true); |
||||
assertTrue(wasCancelled); |
||||
boolean wasSet = settableListenableFuture.set("hello"); |
||||
assertFalse(wasSet); |
||||
} |
||||
|
||||
@Test |
||||
public void cancelSetsFutureToDone() { |
||||
settableListenableFuture.cancel(true); |
||||
assertTrue(settableListenableFuture.isDone()); |
||||
} |
||||
|
||||
@Test |
||||
public void cancelWithMayInterruptIfRunningTrueCallsOverridenMethod() { |
||||
InterruptableSettableListenableFuture tested = new InterruptableSettableListenableFuture(); |
||||
tested.cancel(true); |
||||
assertTrue(tested.calledInterruptTask()); |
||||
} |
||||
|
||||
@Test |
||||
public void cancelWithMayInterruptIfRunningFalseDoesNotCallOverridenMethod() { |
||||
InterruptableSettableListenableFuture tested = new InterruptableSettableListenableFuture(); |
||||
tested.cancel(false); |
||||
assertFalse(tested.calledInterruptTask()); |
||||
} |
||||
|
||||
@Test |
||||
public void setPreventsCancel() { |
||||
boolean wasSet = settableListenableFuture.set("hello"); |
||||
assertTrue(wasSet); |
||||
boolean wasCancelled = settableListenableFuture.cancel(true); |
||||
assertFalse(wasCancelled); |
||||
} |
||||
|
||||
@Test |
||||
public void cancelPreventsExceptionFromBeingSet() { |
||||
boolean wasCancelled = settableListenableFuture.cancel(true); |
||||
assertTrue(wasCancelled); |
||||
boolean wasSet = settableListenableFuture.setException(new RuntimeException()); |
||||
assertFalse(wasSet); |
||||
} |
||||
|
||||
@Test |
||||
public void setExceptionPreventsCancel() { |
||||
boolean wasSet = settableListenableFuture.setException(new RuntimeException()); |
||||
assertTrue(wasSet); |
||||
boolean wasCancelled = settableListenableFuture.cancel(true); |
||||
assertFalse(wasCancelled); |
||||
} |
||||
|
||||
@Test |
||||
public void cancelStateThrowsExceptionWhenCallingGet() throws ExecutionException, InterruptedException { |
||||
settableListenableFuture.cancel(true); |
||||
try { |
||||
settableListenableFuture.get(); |
||||
fail("Expected CancellationException"); |
||||
} |
||||
catch (CancellationException ex) { |
||||
// expected
|
||||
} |
||||
} |
||||
|
||||
@Test |
||||
public void cancelStateThrowsExceptionWhenCallingGetWithTimeout() throws ExecutionException, TimeoutException, InterruptedException { |
||||
new Thread(new Runnable() { |
||||
@Override |
||||
public void run() { |
||||
try { |
||||
Thread.sleep(20L); |
||||
settableListenableFuture.cancel(true); |
||||
} catch (InterruptedException ex) { |
||||
throw new RuntimeException(ex); |
||||
} |
||||
} |
||||
}).start(); |
||||
try { |
||||
settableListenableFuture.get(100L, TimeUnit.MILLISECONDS); |
||||
fail("Expected CancellationException"); |
||||
} catch (CancellationException ex) { |
||||
// expected
|
||||
} |
||||
} |
||||
|
||||
@Test |
||||
public void cancelDoesNotNotifyCallbacksOnSet() { |
||||
ListenableFutureCallback callback = mock(ListenableFutureCallback.class); |
||||
settableListenableFuture.addCallback(callback); |
||||
settableListenableFuture.cancel(true); |
||||
|
||||
verify(callback).onFailure(any(CancellationException.class)); |
||||
verifyNoMoreInteractions(callback); |
||||
|
||||
settableListenableFuture.set("hello"); |
||||
verifyNoMoreInteractions(callback); |
||||
} |
||||
|
||||
@Test |
||||
public void cancelDoesNotNotifyCallbacksOnSetException() { |
||||
ListenableFutureCallback callback = mock(ListenableFutureCallback.class); |
||||
settableListenableFuture.addCallback(callback); |
||||
settableListenableFuture.cancel(true); |
||||
|
||||
verify(callback).onFailure(any(CancellationException.class)); |
||||
verifyNoMoreInteractions(callback); |
||||
|
||||
settableListenableFuture.setException(new RuntimeException()); |
||||
verifyNoMoreInteractions(callback); |
||||
} |
||||
|
||||
private static class InterruptableSettableListenableFuture extends SettableListenableFuture<String> { |
||||
|
||||
private boolean interrupted = false; |
||||
|
||||
@Override |
||||
protected void interruptTask() { |
||||
interrupted = true; |
||||
} |
||||
|
||||
boolean calledInterruptTask() { |
||||
return interrupted; |
||||
} |
||||
} |
||||
} |
Loading…
Reference in new issue