From 38b525aa9e1fec1fbd611070c5abea24e378bf7d Mon Sep 17 00:00:00 2001 From: Mattias Severson Date: Mon, 31 Mar 2014 19:33:16 +0200 Subject: [PATCH 1/2] Created SettableListenableFuture with tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A SettableListenableFuture implementation of Spring's ListenableFuture The class is inspired by Google Guava’s com.google.common.util.concurrent.SettableFuture, but this implementation uses ReentrantReadWriteLock and CountDownLatch internally to handle thread synchronization. Issue: SPR-11614 --- .../concurrent/SettableListenableFuture.java | 280 +++++++++++++++ .../SettableListenableFutureTests.java | 338 ++++++++++++++++++ 2 files changed, 618 insertions(+) create mode 100644 spring-core/src/main/java/org/springframework/util/concurrent/SettableListenableFuture.java create mode 100644 spring-core/src/test/java/org/springframework/util/concurrent/SettableListenableFutureTests.java diff --git a/spring-core/src/main/java/org/springframework/util/concurrent/SettableListenableFuture.java b/spring-core/src/main/java/org/springframework/util/concurrent/SettableListenableFuture.java new file mode 100644 index 0000000000..4e6f7498a1 --- /dev/null +++ b/spring-core/src/main/java/org/springframework/util/concurrent/SettableListenableFuture.java @@ -0,0 +1,280 @@ +/* + * Copyright 2002-2014 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.util.concurrent; + +import org.springframework.util.Assert; + +import java.util.concurrent.CancellationException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * A {@link ListenableFuture} whose value can be set by the {@link #set(Object)} or + * {@link #setException(Throwable)}. It may also be cancelled. + * + *

Inspired by {@code com.google.common.util.concurrent.SettableFuture}. + * + * @author Mattias Severson + * @since 4.1 + */ +public class SettableListenableFuture implements ListenableFuture { + + private final SettableFuture settableFuture = new SettableFuture(); + private final ListenableFutureCallbackRegistry registry = new ListenableFutureCallbackRegistry(); + + + /** + * Set the value of this future. This method will return {@code true} if + * the value was set successfully, or {@code false} if the future has already + * been set or cancelled. + * + * @param value the value that will be set. + * @return {@code true} if the value was successfully set, else {@code false}. + */ + public boolean set(T value) { + boolean setValue = this.settableFuture.setValue(value); + if (setValue) { + this.registry.success(value); + } + return setValue; + } + + /** + * Set the exception of this future. This method will return {@code true} if + * the exception was set successfully, or {@code false} if the future has already + * been set or cancelled. + * @param exception the value that will be set. + * @return {@code true} if the exception was successfully set, else {@code false}. + */ + public boolean setException(Throwable exception) { + Assert.notNull(exception, "exception must not be null"); + boolean setException = this.settableFuture.setThrowable(exception); + if (setException) { + this.registry.failure(exception); + } + return setException; + } + + @Override + public void addCallback(ListenableFutureCallback callback) { + this.registry.addCallback(callback); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + boolean cancelled = this.settableFuture.cancel(mayInterruptIfRunning); + if (cancelled && mayInterruptIfRunning) { + interruptTask(); + } + return cancelled; + } + + @Override + public boolean isCancelled() { + return this.settableFuture.isCancelled(); + } + + @Override + public boolean isDone() { + return this.settableFuture.isDone(); + } + + /** + * Retrieve the value. + *

Will return the value if it has been set by calling {@link #set(Object)}, throw + * an {@link ExecutionException} if the {@link #setException(Throwable)} has been + * called, throw a {@link CancellationException} if the future has been cancelled, or + * throw an {@link IllegalStateException} if neither a value, nor an exception has + * been set. + * @return The value associated with this future. + */ + @Override + public T get() throws InterruptedException, ExecutionException { + return this.settableFuture.get(); + } + + /** + * Retrieve the value. + *

Will return the value if it has been by calling {@link #set(Object)}, throw an + * {@link ExecutionException} if the {@link #setException(Throwable)} + * has been called, throw a {@link java.util.concurrent.CancellationException} if the + * future has been cancelled. + * @param timeout the maximum time to wait. + * @param unit the time unit of the timeout argument. + * @return The value associated with this future. + */ + @Override + public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return this.settableFuture.get(timeout, unit); + } + + /** + * Subclasses can override this method to implement interruption of the future's + * computation. The method is invoked automatically by a successful call to + * {@link #cancel(boolean) cancel(true)}. + * + *

The default implementation does nothing. + */ + protected void interruptTask() { + } + + + /** + * Helper class that keeps track of the state of this future. + * @param The type of value to be set. + */ + private static class SettableFuture implements Future { + + private final ReadWriteLock lock = new ReentrantReadWriteLock(true); + private final CountDownLatch latch = new CountDownLatch(1); + private T value; + private Throwable throwable; + private State state = State.INITIALIZED; + + + @Override + public T get() throws ExecutionException, InterruptedException { + this.latch.await(); + this.lock.readLock().lock(); + try { + return getValue(); + } + finally { + this.lock.readLock().unlock(); + } + } + + @Override + public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + if (this.latch.await(timeout, unit)) { + this.lock.readLock().lock(); + try { + return getValue(); + } + finally { + this.lock.readLock().unlock(); + } + } + else { + throw new TimeoutException(); + } + } + + private T getValue() throws ExecutionException { + switch (this.state) { + case COMPLETED: + if (this.throwable != null) { + throw new ExecutionException(this.throwable); + } + else { + return this.value; + } + case CANCELLED: + throw new CancellationException("Future has been cancelled."); + default: + throw new IllegalStateException("Invalid state: " + this.state); + } + } + + @Override + public boolean isDone() { + this.lock.readLock().lock(); + try { + switch (this.state) { + case COMPLETED: + case CANCELLED: + return true; + default: + return false; + } + } + finally { + this.lock.readLock().unlock(); + } + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + this.lock.writeLock().lock(); + try { + if (this.state.equals(State.INITIALIZED)) { + this.state = State.CANCELLED; + this.latch.countDown(); + return true; + } + } + finally { + this.lock.writeLock().unlock(); + } + return false; + } + + @Override + public boolean isCancelled() { + this.lock.readLock().lock(); + try { + return this.state.equals(State.CANCELLED); + } + finally { + this.lock.readLock().unlock(); + } + } + + boolean setValue(T value) { + this.lock.writeLock().lock(); + try { + if (this.state.equals(State.INITIALIZED)) { + this.value = value; + this.state = State.COMPLETED; + this.latch.countDown(); + return true; + } + } + finally { + this.lock.writeLock().unlock(); + } + return false; + } + + Throwable getThrowable() { + return this.throwable; + } + + boolean setThrowable(Throwable throwable) { + this.lock.writeLock().lock(); + try { + if (this.state.equals(State.INITIALIZED)) { + this.throwable = throwable; + this.state = State.COMPLETED; + this.latch.countDown(); + return true; + } + } + finally { + this.lock.writeLock().unlock(); + } + return false; + } + + private enum State {INITIALIZED, COMPLETED, CANCELLED} + } +} diff --git a/spring-core/src/test/java/org/springframework/util/concurrent/SettableListenableFutureTests.java b/spring-core/src/test/java/org/springframework/util/concurrent/SettableListenableFutureTests.java new file mode 100644 index 0000000000..731bd14226 --- /dev/null +++ b/spring-core/src/test/java/org/springframework/util/concurrent/SettableListenableFutureTests.java @@ -0,0 +1,338 @@ +/* + * Copyright 2002-2014 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.util.concurrent; + +import org.junit.Before; +import org.junit.Test; + +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.*; + +/** + * @author Mattias Severson + */ +public class SettableListenableFutureTests { + + private SettableListenableFuture settableListenableFuture; + + @Before + public void setUp() { + settableListenableFuture = new SettableListenableFuture(); + } + + @Test + public void validateInitialValues() { + assertFalse(settableListenableFuture.isDone()); + assertFalse(settableListenableFuture.isCancelled()); + } + + @Test + public void returnsSetValue() throws ExecutionException, InterruptedException { + String string = "hello"; + boolean wasSet = settableListenableFuture.set(string); + assertTrue(wasSet); + assertThat(settableListenableFuture.get(), equalTo(string)); + } + + @Test + public void setValueUpdatesDoneStatus() { + settableListenableFuture.set("hello"); + assertTrue(settableListenableFuture.isDone()); + } + + @Test + public void throwsSetExceptionWrappedInExecutionException() throws ExecutionException, InterruptedException { + Throwable exception = new RuntimeException(); + boolean wasSet = settableListenableFuture.setException(exception); + assertTrue(wasSet); + try { + settableListenableFuture.get(); + fail("Expected ExecutionException"); + } + catch (ExecutionException ex) { + assertThat(ex.getCause(), equalTo(exception)); + } + } + + @Test + public void setValueTriggersCallback() { + String string = "hello"; + final String[] callbackHolder = new String[1]; + settableListenableFuture.addCallback(new ListenableFutureCallback() { + @Override + public void onSuccess(String result) { + callbackHolder[0] = result; + } + + @Override + public void onFailure(Throwable t) { + fail("Expected onSuccess() to be called"); + } + }); + settableListenableFuture.set(string); + assertThat(callbackHolder[0], equalTo(string)); + } + + @Test + public void setValueTriggersCallbackOnlyOnce() { + String string = "hello"; + final String[] callbackHolder = new String[1]; + settableListenableFuture.addCallback(new ListenableFutureCallback() { + @Override + public void onSuccess(String result) { + callbackHolder[0] = result; + } + + @Override + public void onFailure(Throwable t) { + fail("Expected onSuccess() to be called"); + } + }); + settableListenableFuture.set(string); + assertFalse(settableListenableFuture.set("good bye")); + assertThat(callbackHolder[0], equalTo(string)); + } + + @Test + public void setExceptionTriggersCallback() { + Throwable exception = new RuntimeException(); + final Throwable[] callbackHolder = new Throwable[1]; + settableListenableFuture.addCallback(new ListenableFutureCallback() { + @Override + public void onSuccess(String result) { + fail("Expected onFailure() to be called"); + } + + @Override + public void onFailure(Throwable t) { + callbackHolder[0] = t; + } + }); + settableListenableFuture.setException(exception); + assertThat(callbackHolder[0], equalTo(exception)); + } + + @Test + public void setExceptionTriggersCallbackOnlyOnce() { + Throwable exception = new RuntimeException(); + final Throwable[] callbackHolder = new Throwable[1]; + settableListenableFuture.addCallback(new ListenableFutureCallback() { + @Override + public void onSuccess(String result) { + fail("Expected onFailure() to be called"); + } + + @Override + public void onFailure(Throwable t) { + callbackHolder[0] = t; + } + }); + settableListenableFuture.setException(exception); + assertFalse(settableListenableFuture.setException(new IllegalArgumentException())); + assertThat(callbackHolder[0], equalTo(exception)); + } + + @Test + public void nullIsAcceptedAsValueToSet() throws ExecutionException, InterruptedException { + settableListenableFuture.set(null); + assertNull(settableListenableFuture.get()); + } + + @Test + public void getWaitsForCompletion() throws ExecutionException, InterruptedException { + final String string = "hello"; + new Thread(new Runnable() { + @Override + public void run() { + try { + Thread.sleep(20L); + settableListenableFuture.set(string); + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + } + } + }).start(); + String value = settableListenableFuture.get(); + assertThat(value, equalTo(string)); + } + + @Test + public void getWithTimeoutThrowsTimeoutException() throws ExecutionException, InterruptedException { + try { + settableListenableFuture.get(1L, TimeUnit.MILLISECONDS); + fail("Expected TimeoutException"); + } catch (TimeoutException ex) { + // expected + } + } + + @Test + public void getWithTimeoutWaitsForCompletion() throws ExecutionException, InterruptedException, TimeoutException { + final String string = "hello"; + new Thread(new Runnable() { + @Override + public void run() { + try { + Thread.sleep(20L); + settableListenableFuture.set(string); + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + } + } + }).start(); + String value = settableListenableFuture.get(100L, TimeUnit.MILLISECONDS); + assertThat(value, equalTo(string)); + } + + @Test + public void cancelPreventsValueFromBeingSet() { + boolean wasCancelled = settableListenableFuture.cancel(true); + assertTrue(wasCancelled); + boolean wasSet = settableListenableFuture.set("hello"); + assertFalse(wasSet); + } + + @Test + public void cancelSetsFutureToDone() { + settableListenableFuture.cancel(true); + assertTrue(settableListenableFuture.isDone()); + } + + @Test + public void cancelWithMayInterruptIfRunningTrueCallsOverridenMethod() { + InterruptableSettableListenableFuture tested = new InterruptableSettableListenableFuture(); + tested.cancel(true); + assertTrue(tested.calledInterruptTask()); + } + + @Test + public void cancelWithMayInterruptIfRunningFalseDoesNotCallOverridenMethod() { + InterruptableSettableListenableFuture tested = new InterruptableSettableListenableFuture(); + tested.cancel(false); + assertFalse(tested.calledInterruptTask()); + } + + @Test + public void setPreventsCancel() { + boolean wasSet = settableListenableFuture.set("hello"); + assertTrue(wasSet); + boolean wasCancelled = settableListenableFuture.cancel(true); + assertFalse(wasCancelled); + } + + @Test + public void cancelPreventsExceptionFromBeingSet() { + boolean wasCancelled = settableListenableFuture.cancel(true); + assertTrue(wasCancelled); + boolean wasSet = settableListenableFuture.setException(new RuntimeException()); + assertFalse(wasSet); + } + + @Test + public void setExceptionPreventsCancel() { + boolean wasSet = settableListenableFuture.setException(new RuntimeException()); + assertTrue(wasSet); + boolean wasCancelled = settableListenableFuture.cancel(true); + assertFalse(wasCancelled); + } + + @Test + public void cancelStateThrowsExceptionWhenCallingGet() throws ExecutionException, InterruptedException { + settableListenableFuture.cancel(true); + try { + settableListenableFuture.get(); + fail("Expected CancellationException"); + } + catch (CancellationException ex) { + // expected + } + } + + @Test + public void cancelStateThrowsExceptionWhenCallingGetWithTimeout() throws ExecutionException, TimeoutException, InterruptedException { + new Thread(new Runnable() { + @Override + public void run() { + try { + Thread.sleep(20L); + settableListenableFuture.cancel(true); + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + } + } + }).start(); + try { + settableListenableFuture.get(100L, TimeUnit.MILLISECONDS); + fail("Expected CancellationException"); + } catch (CancellationException ex) { + // expected + } + } + + @Test + public void cancelDoesNotNotifyCallbacksOnSet() { + settableListenableFuture.addCallback(new ListenableFutureCallback() { + @Override + public void onSuccess(String result) { + fail("onSuccess should not have been called"); + } + + @Override + public void onFailure(Throwable t) { + fail("onFailure should not have been called"); + } + }); + settableListenableFuture.cancel(true); + settableListenableFuture.set("hello"); + } + + @Test + public void cancelDoesNotNotifyCallbacksOnSetException() { + settableListenableFuture.addCallback(new ListenableFutureCallback() { + @Override + public void onSuccess(String result) { + fail("onSuccess should not have been called"); + } + + @Override + public void onFailure(Throwable t) { + fail("onFailure should not have been called"); + } + }); + settableListenableFuture.cancel(true); + settableListenableFuture.setException(new RuntimeException()); + } + + private static class InterruptableSettableListenableFuture extends SettableListenableFuture { + + private boolean interrupted = false; + + @Override + protected void interruptTask() { + interrupted = true; + } + + boolean calledInterruptTask() { + return interrupted; + } + } +} From 0640a32863c9a86511e4a3e0c7fe75e98e0f3f1e Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Wed, 28 May 2014 16:03:10 -0400 Subject: [PATCH 2/2] Revise SettableListenableFuture implementation This change modifies the SettableListenableFuture implementation to use internally a ListenableFutureTask created with a "settable" Callable. Issue: SPR-11614 --- .../concurrent/SettableListenableFuture.java | 244 +++++------------- .../SettableListenableFutureTests.java | 38 ++- 2 files changed, 85 insertions(+), 197 deletions(-) diff --git a/spring-core/src/main/java/org/springframework/util/concurrent/SettableListenableFuture.java b/spring-core/src/main/java/org/springframework/util/concurrent/SettableListenableFuture.java index 4e6f7498a1..7b37673302 100644 --- a/spring-core/src/main/java/org/springframework/util/concurrent/SettableListenableFuture.java +++ b/spring-core/src/main/java/org/springframework/util/concurrent/SettableListenableFuture.java @@ -18,44 +18,48 @@ package org.springframework.util.concurrent; import org.springframework.util.Assert; -import java.util.concurrent.CancellationException; -import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.atomic.AtomicReference; /** - * A {@link ListenableFuture} whose value can be set by the {@link #set(Object)} or + * A {@link org.springframework.util.concurrent.ListenableFuture ListenableFuture} + * whose value can be set via {@link #set(Object)} or * {@link #setException(Throwable)}. It may also be cancelled. * *

Inspired by {@code com.google.common.util.concurrent.SettableFuture}. * * @author Mattias Severson + * @author Rossen Stoyanchev * @since 4.1 */ public class SettableListenableFuture implements ListenableFuture { - private final SettableFuture settableFuture = new SettableFuture(); - private final ListenableFutureCallbackRegistry registry = new ListenableFutureCallbackRegistry(); + private final SettableTask settableTask; + private final ListenableFutureTask listenableFuture; + + + public SettableListenableFuture() { + this.settableTask = new SettableTask(); + this.listenableFuture = new ListenableFutureTask(this.settableTask); + } /** * Set the value of this future. This method will return {@code true} if * the value was set successfully, or {@code false} if the future has already * been set or cancelled. - * * @param value the value that will be set. * @return {@code true} if the value was successfully set, else {@code false}. */ public boolean set(T value) { - boolean setValue = this.settableFuture.setValue(value); - if (setValue) { - this.registry.success(value); + boolean success = this.settableTask.setValue(value); + if (success) { + this.listenableFuture.run(); } - return setValue; + return success; } /** @@ -66,66 +70,66 @@ public class SettableListenableFuture implements ListenableFuture { * @return {@code true} if the exception was successfully set, else {@code false}. */ public boolean setException(Throwable exception) { - Assert.notNull(exception, "exception must not be null"); - boolean setException = this.settableFuture.setThrowable(exception); - if (setException) { - this.registry.failure(exception); + Assert.notNull(exception, "'exception' must not be null"); + boolean success = this.settableTask.setValue(exception); + if (success) { + this.listenableFuture.run(); } - return setException; + return success; } - @Override - public void addCallback(ListenableFutureCallback callback) { - this.registry.addCallback(callback); - } + @Override + public void addCallback(ListenableFutureCallback callback) { + this.listenableFuture.addCallback(callback); + } - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - boolean cancelled = this.settableFuture.cancel(mayInterruptIfRunning); + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + this.settableTask.setCancelled(); + boolean cancelled = this.listenableFuture.cancel(mayInterruptIfRunning); if (cancelled && mayInterruptIfRunning) { interruptTask(); } return cancelled; - } + } - @Override - public boolean isCancelled() { - return this.settableFuture.isCancelled(); - } + @Override + public boolean isCancelled() { + return this.listenableFuture.isCancelled(); + } - @Override - public boolean isDone() { - return this.settableFuture.isDone(); - } + @Override + public boolean isDone() { + return this.listenableFuture.isDone(); + } /** * Retrieve the value. - *

Will return the value if it has been set by calling {@link #set(Object)}, throw - * an {@link ExecutionException} if the {@link #setException(Throwable)} has been - * called, throw a {@link CancellationException} if the future has been cancelled, or - * throw an {@link IllegalStateException} if neither a value, nor an exception has - * been set. + *

Will return the value if it has been set via {@link #set(Object)}, + * throw an {@link java.util.concurrent.ExecutionException} if it has been + * set via {@link #setException(Throwable)} or throw a + * {@link java.util.concurrent.CancellationException} if it has been cancelled. * @return The value associated with this future. */ - @Override - public T get() throws InterruptedException, ExecutionException { - return this.settableFuture.get(); - } + @Override + public T get() throws InterruptedException, ExecutionException { + return this.listenableFuture.get(); + } /** * Retrieve the value. - *

Will return the value if it has been by calling {@link #set(Object)}, throw an - * {@link ExecutionException} if the {@link #setException(Throwable)} - * has been called, throw a {@link java.util.concurrent.CancellationException} if the - * future has been cancelled. + *

Will return the value if it has been set via {@link #set(Object)}, + * throw an {@link java.util.concurrent.ExecutionException} if it has been + * set via {@link #setException(Throwable)} or throw a + * {@link java.util.concurrent.CancellationException} if it has been cancelled. * @param timeout the maximum time to wait. * @param unit the time unit of the timeout argument. * @return The value associated with this future. */ @Override - public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - return this.settableFuture.get(timeout, unit); - } + public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return this.listenableFuture.get(timeout, unit); + } /** * Subclasses can override this method to implement interruption of the future's @@ -138,143 +142,33 @@ public class SettableListenableFuture implements ListenableFuture { } - /** - * Helper class that keeps track of the state of this future. - * @param The type of value to be set. - */ - private static class SettableFuture implements Future { + private static class SettableTask implements Callable { - private final ReadWriteLock lock = new ReentrantReadWriteLock(true); - private final CountDownLatch latch = new CountDownLatch(1); - private T value; - private Throwable throwable; - private State state = State.INITIALIZED; + private static final String NO_VALUE = SettableListenableFuture.class.getName() + ".NO_VALUE"; + private final AtomicReference value = new AtomicReference(NO_VALUE); - @Override - public T get() throws ExecutionException, InterruptedException { - this.latch.await(); - this.lock.readLock().lock(); - try { - return getValue(); - } - finally { - this.lock.readLock().unlock(); - } - } - - @Override - public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - if (this.latch.await(timeout, unit)) { - this.lock.readLock().lock(); - try { - return getValue(); - } - finally { - this.lock.readLock().unlock(); - } - } - else { - throw new TimeoutException(); - } - } + private volatile boolean cancelled = false; - private T getValue() throws ExecutionException { - switch (this.state) { - case COMPLETED: - if (this.throwable != null) { - throw new ExecutionException(this.throwable); - } - else { - return this.value; - } - case CANCELLED: - throw new CancellationException("Future has been cancelled."); - default: - throw new IllegalStateException("Invalid state: " + this.state); - } - } - @Override - public boolean isDone() { - this.lock.readLock().lock(); - try { - switch (this.state) { - case COMPLETED: - case CANCELLED: - return true; - default: - return false; - } - } - finally { - this.lock.readLock().unlock(); + public boolean setValue(Object value) { + if (this.cancelled) { + return false; } + return this.value.compareAndSet(NO_VALUE, value); } - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - this.lock.writeLock().lock(); - try { - if (this.state.equals(State.INITIALIZED)) { - this.state = State.CANCELLED; - this.latch.countDown(); - return true; - } - } - finally { - this.lock.writeLock().unlock(); - } - return false; + public void setCancelled() { + this.cancelled = true; } @Override - public boolean isCancelled() { - this.lock.readLock().lock(); - try { - return this.state.equals(State.CANCELLED); - } - finally { - this.lock.readLock().unlock(); + public T call() throws Exception { + if (value.get() instanceof Exception) { + throw (Exception) value.get(); } + return (T) value.get(); } - - boolean setValue(T value) { - this.lock.writeLock().lock(); - try { - if (this.state.equals(State.INITIALIZED)) { - this.value = value; - this.state = State.COMPLETED; - this.latch.countDown(); - return true; - } - } - finally { - this.lock.writeLock().unlock(); - } - return false; - } - - Throwable getThrowable() { - return this.throwable; - } - - boolean setThrowable(Throwable throwable) { - this.lock.writeLock().lock(); - try { - if (this.state.equals(State.INITIALIZED)) { - this.throwable = throwable; - this.state = State.COMPLETED; - this.latch.countDown(); - return true; - } - } - finally { - this.lock.writeLock().unlock(); - } - return false; - } - - private enum State {INITIALIZED, COMPLETED, CANCELLED} } + } diff --git a/spring-core/src/test/java/org/springframework/util/concurrent/SettableListenableFutureTests.java b/spring-core/src/test/java/org/springframework/util/concurrent/SettableListenableFutureTests.java index 731bd14226..d16d08a77f 100644 --- a/spring-core/src/test/java/org/springframework/util/concurrent/SettableListenableFutureTests.java +++ b/spring-core/src/test/java/org/springframework/util/concurrent/SettableListenableFutureTests.java @@ -26,6 +26,8 @@ import java.util.concurrent.TimeoutException; import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + /** * @author Mattias Severson @@ -290,36 +292,28 @@ public class SettableListenableFutureTests { @Test public void cancelDoesNotNotifyCallbacksOnSet() { - settableListenableFuture.addCallback(new ListenableFutureCallback() { - @Override - public void onSuccess(String result) { - fail("onSuccess should not have been called"); - } - - @Override - public void onFailure(Throwable t) { - fail("onFailure should not have been called"); - } - }); + ListenableFutureCallback callback = mock(ListenableFutureCallback.class); + settableListenableFuture.addCallback(callback); settableListenableFuture.cancel(true); + + verify(callback).onFailure(any(CancellationException.class)); + verifyNoMoreInteractions(callback); + settableListenableFuture.set("hello"); + verifyNoMoreInteractions(callback); } @Test public void cancelDoesNotNotifyCallbacksOnSetException() { - settableListenableFuture.addCallback(new ListenableFutureCallback() { - @Override - public void onSuccess(String result) { - fail("onSuccess should not have been called"); - } - - @Override - public void onFailure(Throwable t) { - fail("onFailure should not have been called"); - } - }); + ListenableFutureCallback callback = mock(ListenableFutureCallback.class); + settableListenableFuture.addCallback(callback); settableListenableFuture.cancel(true); + + verify(callback).onFailure(any(CancellationException.class)); + verifyNoMoreInteractions(callback); + settableListenableFuture.setException(new RuntimeException()); + verifyNoMoreInteractions(callback); } private static class InterruptableSettableListenableFuture extends SettableListenableFuture {