From 38b525aa9e1fec1fbd611070c5abea24e378bf7d Mon Sep 17 00:00:00 2001 From: Mattias Severson Date: Mon, 31 Mar 2014 19:33:16 +0200 Subject: [PATCH] Created SettableListenableFuture with tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A SettableListenableFuture implementation of Spring's ListenableFuture The class is inspired by Google Guava’s com.google.common.util.concurrent.SettableFuture, but this implementation uses ReentrantReadWriteLock and CountDownLatch internally to handle thread synchronization. Issue: SPR-11614 --- .../concurrent/SettableListenableFuture.java | 280 +++++++++++++++ .../SettableListenableFutureTests.java | 338 ++++++++++++++++++ 2 files changed, 618 insertions(+) create mode 100644 spring-core/src/main/java/org/springframework/util/concurrent/SettableListenableFuture.java create mode 100644 spring-core/src/test/java/org/springframework/util/concurrent/SettableListenableFutureTests.java diff --git a/spring-core/src/main/java/org/springframework/util/concurrent/SettableListenableFuture.java b/spring-core/src/main/java/org/springframework/util/concurrent/SettableListenableFuture.java new file mode 100644 index 0000000000..4e6f7498a1 --- /dev/null +++ b/spring-core/src/main/java/org/springframework/util/concurrent/SettableListenableFuture.java @@ -0,0 +1,280 @@ +/* + * Copyright 2002-2014 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.util.concurrent; + +import org.springframework.util.Assert; + +import java.util.concurrent.CancellationException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * A {@link ListenableFuture} whose value can be set by the {@link #set(Object)} or + * {@link #setException(Throwable)}. It may also be cancelled. + * + *

Inspired by {@code com.google.common.util.concurrent.SettableFuture}. + * + * @author Mattias Severson + * @since 4.1 + */ +public class SettableListenableFuture implements ListenableFuture { + + private final SettableFuture settableFuture = new SettableFuture(); + private final ListenableFutureCallbackRegistry registry = new ListenableFutureCallbackRegistry(); + + + /** + * Set the value of this future. This method will return {@code true} if + * the value was set successfully, or {@code false} if the future has already + * been set or cancelled. + * + * @param value the value that will be set. + * @return {@code true} if the value was successfully set, else {@code false}. + */ + public boolean set(T value) { + boolean setValue = this.settableFuture.setValue(value); + if (setValue) { + this.registry.success(value); + } + return setValue; + } + + /** + * Set the exception of this future. This method will return {@code true} if + * the exception was set successfully, or {@code false} if the future has already + * been set or cancelled. + * @param exception the value that will be set. + * @return {@code true} if the exception was successfully set, else {@code false}. + */ + public boolean setException(Throwable exception) { + Assert.notNull(exception, "exception must not be null"); + boolean setException = this.settableFuture.setThrowable(exception); + if (setException) { + this.registry.failure(exception); + } + return setException; + } + + @Override + public void addCallback(ListenableFutureCallback callback) { + this.registry.addCallback(callback); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + boolean cancelled = this.settableFuture.cancel(mayInterruptIfRunning); + if (cancelled && mayInterruptIfRunning) { + interruptTask(); + } + return cancelled; + } + + @Override + public boolean isCancelled() { + return this.settableFuture.isCancelled(); + } + + @Override + public boolean isDone() { + return this.settableFuture.isDone(); + } + + /** + * Retrieve the value. + *

Will return the value if it has been set by calling {@link #set(Object)}, throw + * an {@link ExecutionException} if the {@link #setException(Throwable)} has been + * called, throw a {@link CancellationException} if the future has been cancelled, or + * throw an {@link IllegalStateException} if neither a value, nor an exception has + * been set. + * @return The value associated with this future. + */ + @Override + public T get() throws InterruptedException, ExecutionException { + return this.settableFuture.get(); + } + + /** + * Retrieve the value. + *

Will return the value if it has been by calling {@link #set(Object)}, throw an + * {@link ExecutionException} if the {@link #setException(Throwable)} + * has been called, throw a {@link java.util.concurrent.CancellationException} if the + * future has been cancelled. + * @param timeout the maximum time to wait. + * @param unit the time unit of the timeout argument. + * @return The value associated with this future. + */ + @Override + public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return this.settableFuture.get(timeout, unit); + } + + /** + * Subclasses can override this method to implement interruption of the future's + * computation. The method is invoked automatically by a successful call to + * {@link #cancel(boolean) cancel(true)}. + * + *

The default implementation does nothing. + */ + protected void interruptTask() { + } + + + /** + * Helper class that keeps track of the state of this future. + * @param The type of value to be set. + */ + private static class SettableFuture implements Future { + + private final ReadWriteLock lock = new ReentrantReadWriteLock(true); + private final CountDownLatch latch = new CountDownLatch(1); + private T value; + private Throwable throwable; + private State state = State.INITIALIZED; + + + @Override + public T get() throws ExecutionException, InterruptedException { + this.latch.await(); + this.lock.readLock().lock(); + try { + return getValue(); + } + finally { + this.lock.readLock().unlock(); + } + } + + @Override + public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + if (this.latch.await(timeout, unit)) { + this.lock.readLock().lock(); + try { + return getValue(); + } + finally { + this.lock.readLock().unlock(); + } + } + else { + throw new TimeoutException(); + } + } + + private T getValue() throws ExecutionException { + switch (this.state) { + case COMPLETED: + if (this.throwable != null) { + throw new ExecutionException(this.throwable); + } + else { + return this.value; + } + case CANCELLED: + throw new CancellationException("Future has been cancelled."); + default: + throw new IllegalStateException("Invalid state: " + this.state); + } + } + + @Override + public boolean isDone() { + this.lock.readLock().lock(); + try { + switch (this.state) { + case COMPLETED: + case CANCELLED: + return true; + default: + return false; + } + } + finally { + this.lock.readLock().unlock(); + } + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + this.lock.writeLock().lock(); + try { + if (this.state.equals(State.INITIALIZED)) { + this.state = State.CANCELLED; + this.latch.countDown(); + return true; + } + } + finally { + this.lock.writeLock().unlock(); + } + return false; + } + + @Override + public boolean isCancelled() { + this.lock.readLock().lock(); + try { + return this.state.equals(State.CANCELLED); + } + finally { + this.lock.readLock().unlock(); + } + } + + boolean setValue(T value) { + this.lock.writeLock().lock(); + try { + if (this.state.equals(State.INITIALIZED)) { + this.value = value; + this.state = State.COMPLETED; + this.latch.countDown(); + return true; + } + } + finally { + this.lock.writeLock().unlock(); + } + return false; + } + + Throwable getThrowable() { + return this.throwable; + } + + boolean setThrowable(Throwable throwable) { + this.lock.writeLock().lock(); + try { + if (this.state.equals(State.INITIALIZED)) { + this.throwable = throwable; + this.state = State.COMPLETED; + this.latch.countDown(); + return true; + } + } + finally { + this.lock.writeLock().unlock(); + } + return false; + } + + private enum State {INITIALIZED, COMPLETED, CANCELLED} + } +} diff --git a/spring-core/src/test/java/org/springframework/util/concurrent/SettableListenableFutureTests.java b/spring-core/src/test/java/org/springframework/util/concurrent/SettableListenableFutureTests.java new file mode 100644 index 0000000000..731bd14226 --- /dev/null +++ b/spring-core/src/test/java/org/springframework/util/concurrent/SettableListenableFutureTests.java @@ -0,0 +1,338 @@ +/* + * Copyright 2002-2014 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.util.concurrent; + +import org.junit.Before; +import org.junit.Test; + +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.*; + +/** + * @author Mattias Severson + */ +public class SettableListenableFutureTests { + + private SettableListenableFuture settableListenableFuture; + + @Before + public void setUp() { + settableListenableFuture = new SettableListenableFuture(); + } + + @Test + public void validateInitialValues() { + assertFalse(settableListenableFuture.isDone()); + assertFalse(settableListenableFuture.isCancelled()); + } + + @Test + public void returnsSetValue() throws ExecutionException, InterruptedException { + String string = "hello"; + boolean wasSet = settableListenableFuture.set(string); + assertTrue(wasSet); + assertThat(settableListenableFuture.get(), equalTo(string)); + } + + @Test + public void setValueUpdatesDoneStatus() { + settableListenableFuture.set("hello"); + assertTrue(settableListenableFuture.isDone()); + } + + @Test + public void throwsSetExceptionWrappedInExecutionException() throws ExecutionException, InterruptedException { + Throwable exception = new RuntimeException(); + boolean wasSet = settableListenableFuture.setException(exception); + assertTrue(wasSet); + try { + settableListenableFuture.get(); + fail("Expected ExecutionException"); + } + catch (ExecutionException ex) { + assertThat(ex.getCause(), equalTo(exception)); + } + } + + @Test + public void setValueTriggersCallback() { + String string = "hello"; + final String[] callbackHolder = new String[1]; + settableListenableFuture.addCallback(new ListenableFutureCallback() { + @Override + public void onSuccess(String result) { + callbackHolder[0] = result; + } + + @Override + public void onFailure(Throwable t) { + fail("Expected onSuccess() to be called"); + } + }); + settableListenableFuture.set(string); + assertThat(callbackHolder[0], equalTo(string)); + } + + @Test + public void setValueTriggersCallbackOnlyOnce() { + String string = "hello"; + final String[] callbackHolder = new String[1]; + settableListenableFuture.addCallback(new ListenableFutureCallback() { + @Override + public void onSuccess(String result) { + callbackHolder[0] = result; + } + + @Override + public void onFailure(Throwable t) { + fail("Expected onSuccess() to be called"); + } + }); + settableListenableFuture.set(string); + assertFalse(settableListenableFuture.set("good bye")); + assertThat(callbackHolder[0], equalTo(string)); + } + + @Test + public void setExceptionTriggersCallback() { + Throwable exception = new RuntimeException(); + final Throwable[] callbackHolder = new Throwable[1]; + settableListenableFuture.addCallback(new ListenableFutureCallback() { + @Override + public void onSuccess(String result) { + fail("Expected onFailure() to be called"); + } + + @Override + public void onFailure(Throwable t) { + callbackHolder[0] = t; + } + }); + settableListenableFuture.setException(exception); + assertThat(callbackHolder[0], equalTo(exception)); + } + + @Test + public void setExceptionTriggersCallbackOnlyOnce() { + Throwable exception = new RuntimeException(); + final Throwable[] callbackHolder = new Throwable[1]; + settableListenableFuture.addCallback(new ListenableFutureCallback() { + @Override + public void onSuccess(String result) { + fail("Expected onFailure() to be called"); + } + + @Override + public void onFailure(Throwable t) { + callbackHolder[0] = t; + } + }); + settableListenableFuture.setException(exception); + assertFalse(settableListenableFuture.setException(new IllegalArgumentException())); + assertThat(callbackHolder[0], equalTo(exception)); + } + + @Test + public void nullIsAcceptedAsValueToSet() throws ExecutionException, InterruptedException { + settableListenableFuture.set(null); + assertNull(settableListenableFuture.get()); + } + + @Test + public void getWaitsForCompletion() throws ExecutionException, InterruptedException { + final String string = "hello"; + new Thread(new Runnable() { + @Override + public void run() { + try { + Thread.sleep(20L); + settableListenableFuture.set(string); + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + } + } + }).start(); + String value = settableListenableFuture.get(); + assertThat(value, equalTo(string)); + } + + @Test + public void getWithTimeoutThrowsTimeoutException() throws ExecutionException, InterruptedException { + try { + settableListenableFuture.get(1L, TimeUnit.MILLISECONDS); + fail("Expected TimeoutException"); + } catch (TimeoutException ex) { + // expected + } + } + + @Test + public void getWithTimeoutWaitsForCompletion() throws ExecutionException, InterruptedException, TimeoutException { + final String string = "hello"; + new Thread(new Runnable() { + @Override + public void run() { + try { + Thread.sleep(20L); + settableListenableFuture.set(string); + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + } + } + }).start(); + String value = settableListenableFuture.get(100L, TimeUnit.MILLISECONDS); + assertThat(value, equalTo(string)); + } + + @Test + public void cancelPreventsValueFromBeingSet() { + boolean wasCancelled = settableListenableFuture.cancel(true); + assertTrue(wasCancelled); + boolean wasSet = settableListenableFuture.set("hello"); + assertFalse(wasSet); + } + + @Test + public void cancelSetsFutureToDone() { + settableListenableFuture.cancel(true); + assertTrue(settableListenableFuture.isDone()); + } + + @Test + public void cancelWithMayInterruptIfRunningTrueCallsOverridenMethod() { + InterruptableSettableListenableFuture tested = new InterruptableSettableListenableFuture(); + tested.cancel(true); + assertTrue(tested.calledInterruptTask()); + } + + @Test + public void cancelWithMayInterruptIfRunningFalseDoesNotCallOverridenMethod() { + InterruptableSettableListenableFuture tested = new InterruptableSettableListenableFuture(); + tested.cancel(false); + assertFalse(tested.calledInterruptTask()); + } + + @Test + public void setPreventsCancel() { + boolean wasSet = settableListenableFuture.set("hello"); + assertTrue(wasSet); + boolean wasCancelled = settableListenableFuture.cancel(true); + assertFalse(wasCancelled); + } + + @Test + public void cancelPreventsExceptionFromBeingSet() { + boolean wasCancelled = settableListenableFuture.cancel(true); + assertTrue(wasCancelled); + boolean wasSet = settableListenableFuture.setException(new RuntimeException()); + assertFalse(wasSet); + } + + @Test + public void setExceptionPreventsCancel() { + boolean wasSet = settableListenableFuture.setException(new RuntimeException()); + assertTrue(wasSet); + boolean wasCancelled = settableListenableFuture.cancel(true); + assertFalse(wasCancelled); + } + + @Test + public void cancelStateThrowsExceptionWhenCallingGet() throws ExecutionException, InterruptedException { + settableListenableFuture.cancel(true); + try { + settableListenableFuture.get(); + fail("Expected CancellationException"); + } + catch (CancellationException ex) { + // expected + } + } + + @Test + public void cancelStateThrowsExceptionWhenCallingGetWithTimeout() throws ExecutionException, TimeoutException, InterruptedException { + new Thread(new Runnable() { + @Override + public void run() { + try { + Thread.sleep(20L); + settableListenableFuture.cancel(true); + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + } + } + }).start(); + try { + settableListenableFuture.get(100L, TimeUnit.MILLISECONDS); + fail("Expected CancellationException"); + } catch (CancellationException ex) { + // expected + } + } + + @Test + public void cancelDoesNotNotifyCallbacksOnSet() { + settableListenableFuture.addCallback(new ListenableFutureCallback() { + @Override + public void onSuccess(String result) { + fail("onSuccess should not have been called"); + } + + @Override + public void onFailure(Throwable t) { + fail("onFailure should not have been called"); + } + }); + settableListenableFuture.cancel(true); + settableListenableFuture.set("hello"); + } + + @Test + public void cancelDoesNotNotifyCallbacksOnSetException() { + settableListenableFuture.addCallback(new ListenableFutureCallback() { + @Override + public void onSuccess(String result) { + fail("onSuccess should not have been called"); + } + + @Override + public void onFailure(Throwable t) { + fail("onFailure should not have been called"); + } + }); + settableListenableFuture.cancel(true); + settableListenableFuture.setException(new RuntimeException()); + } + + private static class InterruptableSettableListenableFuture extends SettableListenableFuture { + + private boolean interrupted = false; + + @Override + protected void interruptTask() { + interrupted = true; + } + + boolean calledInterruptTask() { + return interrupted; + } + } +}