diff --git a/spring-context-support/src/main/java/org/springframework/scheduling/commonj/WorkManagerTaskExecutor.java b/spring-context-support/src/main/java/org/springframework/scheduling/commonj/WorkManagerTaskExecutor.java index 420329b011..3d153001df 100644 --- a/spring-context-support/src/main/java/org/springframework/scheduling/commonj/WorkManagerTaskExecutor.java +++ b/spring-context-support/src/main/java/org/springframework/scheduling/commonj/WorkManagerTaskExecutor.java @@ -20,7 +20,6 @@ import java.util.Collection; import java.util.concurrent.Callable; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; - import javax.naming.NamingException; import commonj.work.Work; @@ -31,11 +30,14 @@ import commonj.work.WorkManager; import commonj.work.WorkRejectedException; import org.springframework.beans.factory.InitializingBean; +import org.springframework.core.task.AsyncListenableTaskExecutor; import org.springframework.core.task.TaskRejectedException; import org.springframework.jndi.JndiLocatorSupport; import org.springframework.scheduling.SchedulingException; import org.springframework.scheduling.SchedulingTaskExecutor; import org.springframework.util.Assert; +import org.springframework.util.concurrent.ListenableFuture; +import org.springframework.util.concurrent.ListenableFutureTask; /** * TaskExecutor implementation that delegates to a CommonJ WorkManager, @@ -61,7 +63,7 @@ import org.springframework.util.Assert; * @since 2.0 */ public class WorkManagerTaskExecutor extends JndiLocatorSupport - implements SchedulingTaskExecutor, WorkManager, InitializingBean { + implements AsyncListenableTaskExecutor, SchedulingTaskExecutor, WorkManager, InitializingBean { private WorkManager workManager; @@ -153,6 +155,20 @@ public class WorkManagerTaskExecutor extends JndiLocatorSupport return future; } + @Override + public ListenableFuture submitListenable(Runnable task) { + ListenableFutureTask future = new ListenableFutureTask(task, null); + execute(future); + return future; + } + + @Override + public ListenableFuture submitListenable(Callable task) { + ListenableFutureTask future = new ListenableFutureTask(task); + execute(future); + return future; + } + /** * This task executor prefers short-lived work units. */ diff --git a/spring-context-support/src/main/java/org/springframework/scheduling/quartz/SimpleThreadPoolTaskExecutor.java b/spring-context-support/src/main/java/org/springframework/scheduling/quartz/SimpleThreadPoolTaskExecutor.java index 6f5374ab21..0e7b795252 100644 --- a/spring-context-support/src/main/java/org/springframework/scheduling/quartz/SimpleThreadPoolTaskExecutor.java +++ b/spring-context-support/src/main/java/org/springframework/scheduling/quartz/SimpleThreadPoolTaskExecutor.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2012 the original author or authors. + * Copyright 2002-2013 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,9 +25,12 @@ import org.quartz.simpl.SimpleThreadPool; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; +import org.springframework.core.task.AsyncListenableTaskExecutor; import org.springframework.scheduling.SchedulingException; import org.springframework.scheduling.SchedulingTaskExecutor; import org.springframework.util.Assert; +import org.springframework.util.concurrent.ListenableFuture; +import org.springframework.util.concurrent.ListenableFutureTask; /** * Subclass of Quartz's SimpleThreadPool that implements Spring's @@ -45,7 +48,7 @@ import org.springframework.util.Assert; * @see SchedulerFactoryBean#setTaskExecutor */ public class SimpleThreadPoolTaskExecutor extends SimpleThreadPool - implements SchedulingTaskExecutor, InitializingBean, DisposableBean { + implements AsyncListenableTaskExecutor, SchedulingTaskExecutor, InitializingBean, DisposableBean { private boolean waitForJobsToCompleteOnShutdown = false; @@ -92,6 +95,20 @@ public class SimpleThreadPoolTaskExecutor extends SimpleThreadPool return future; } + @Override + public ListenableFuture submitListenable(Runnable task) { + ListenableFutureTask future = new ListenableFutureTask(task, null); + execute(future); + return future; + } + + @Override + public ListenableFuture submitListenable(Callable task) { + ListenableFutureTask future = new ListenableFutureTask(task); + execute(future); + return future; + } + /** * This task executor prefers short-lived work units. */ diff --git a/spring-context/src/main/java/org/springframework/scheduling/SchedulingTaskExecutor.java b/spring-context/src/main/java/org/springframework/scheduling/SchedulingTaskExecutor.java index deef622f85..1c97f7905f 100644 --- a/spring-context/src/main/java/org/springframework/scheduling/SchedulingTaskExecutor.java +++ b/spring-context/src/main/java/org/springframework/scheduling/SchedulingTaskExecutor.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2012 the original author or authors. + * Copyright 2002-2013 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,6 +26,13 @@ import org.springframework.core.task.AsyncTaskExecutor; * {@link Runnable Runnables} that match the exposed preferences * of the {@code TaskExecutor} implementation in use. * + *

Note: {@link SchedulingTaskExecutor} implementations are encouraged to also + * implement the {@link org.springframework.core.task.AsyncListenableTaskExecutor} + * interface. This is not required due to the dependency on Spring 4.0's new + * {@link org.springframework.util.concurrent.ListenableFuture} interface, + * which would make it impossible for third-party executor implementations + * to remain compatible with both Spring 4.0 and Spring 3.x. + * * @author Juergen Hoeller * @since 2.0 * @see SchedulingAwareRunnable diff --git a/spring-context/src/main/java/org/springframework/scheduling/concurrent/ConcurrentTaskExecutor.java b/spring-context/src/main/java/org/springframework/scheduling/concurrent/ConcurrentTaskExecutor.java index 4f0a0ec8eb..e9f2d25528 100644 --- a/spring-context/src/main/java/org/springframework/scheduling/concurrent/ConcurrentTaskExecutor.java +++ b/spring-context/src/main/java/org/springframework/scheduling/concurrent/ConcurrentTaskExecutor.java @@ -25,10 +25,12 @@ import java.util.concurrent.Future; import javax.enterprise.concurrent.ManagedExecutors; import javax.enterprise.concurrent.ManagedTask; +import org.springframework.core.task.AsyncListenableTaskExecutor; import org.springframework.core.task.support.TaskExecutorAdapter; import org.springframework.scheduling.SchedulingAwareRunnable; import org.springframework.scheduling.SchedulingTaskExecutor; import org.springframework.util.ClassUtils; +import org.springframework.util.concurrent.ListenableFuture; /** * Adapter that takes a {@code java.util.concurrent.Executor} and exposes @@ -57,7 +59,7 @@ import org.springframework.util.ClassUtils; * @see DefaultManagedTaskExecutor * @see ThreadPoolTaskExecutor */ -public class ConcurrentTaskExecutor implements SchedulingTaskExecutor { +public class ConcurrentTaskExecutor implements AsyncListenableTaskExecutor, SchedulingTaskExecutor { private static Class managedExecutorServiceClass; @@ -146,6 +148,16 @@ public class ConcurrentTaskExecutor implements SchedulingTaskExecutor { return this.adaptedExecutor.submit(task); } + @Override + public ListenableFuture submitListenable(Runnable task) { + return this.adaptedExecutor.submitListenable(task); + } + + @Override + public ListenableFuture submitListenable(Callable task) { + return this.adaptedExecutor.submitListenable(task); + } + /** * This task executor prefers short-lived work units. */ @@ -181,6 +193,16 @@ public class ConcurrentTaskExecutor implements SchedulingTaskExecutor { public Future submit(Callable task) { return super.submit(ManagedTaskBuilder.buildManagedTask(task, task.toString())); } + + @Override + public ListenableFuture submitListenable(Runnable task) { + return super.submitListenable(ManagedTaskBuilder.buildManagedTask(task, task.toString())); + } + + @Override + public ListenableFuture submitListenable(Callable task) { + return super.submitListenable(ManagedTaskBuilder.buildManagedTask(task, task.toString())); + } } diff --git a/spring-context/src/main/java/org/springframework/scheduling/concurrent/ThreadPoolTaskExecutor.java b/spring-context/src/main/java/org/springframework/scheduling/concurrent/ThreadPoolTaskExecutor.java index d7d5946e10..9d25fe5dfd 100644 --- a/spring-context/src/main/java/org/springframework/scheduling/concurrent/ThreadPoolTaskExecutor.java +++ b/spring-context/src/main/java/org/springframework/scheduling/concurrent/ThreadPoolTaskExecutor.java @@ -29,9 +29,12 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import org.springframework.core.task.AsyncListenableTaskExecutor; import org.springframework.core.task.TaskRejectedException; import org.springframework.scheduling.SchedulingTaskExecutor; import org.springframework.util.Assert; +import org.springframework.util.concurrent.ListenableFuture; +import org.springframework.util.concurrent.ListenableFutureTask; /** * JavaBean that allows for configuring a {@link java.util.concurrent.ThreadPoolExecutor} @@ -64,7 +67,8 @@ import org.springframework.util.Assert; * @see ConcurrentTaskExecutor */ @SuppressWarnings("serial") -public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport implements SchedulingTaskExecutor { +public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport + implements AsyncListenableTaskExecutor, SchedulingTaskExecutor { private final Object poolSizeMonitor = new Object(); @@ -281,6 +285,32 @@ public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport impleme } } + @Override + public ListenableFuture submitListenable(Runnable task) { + ExecutorService executor = getThreadPoolExecutor(); + try { + ListenableFutureTask future = new ListenableFutureTask(task, null); + executor.execute(future); + return future; + } + catch (RejectedExecutionException ex) { + throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex); + } + } + + @Override + public ListenableFuture submitListenable(Callable task) { + ExecutorService executor = getThreadPoolExecutor(); + try { + ListenableFutureTask future = new ListenableFutureTask(task); + executor.execute(future); + return future; + } + catch (RejectedExecutionException ex) { + throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex); + } + } + /** * This task executor prefers short-lived work units. */ diff --git a/spring-context/src/main/java/org/springframework/scheduling/concurrent/ThreadPoolTaskScheduler.java b/spring-context/src/main/java/org/springframework/scheduling/concurrent/ThreadPoolTaskScheduler.java index e9cfddd75f..be5f525df7 100644 --- a/spring-context/src/main/java/org/springframework/scheduling/concurrent/ThreadPoolTaskScheduler.java +++ b/spring-context/src/main/java/org/springframework/scheduling/concurrent/ThreadPoolTaskScheduler.java @@ -29,6 +29,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import org.springframework.core.task.AsyncListenableTaskExecutor; import org.springframework.core.task.TaskRejectedException; import org.springframework.scheduling.SchedulingTaskExecutor; import org.springframework.scheduling.TaskScheduler; @@ -36,6 +37,8 @@ import org.springframework.scheduling.Trigger; import org.springframework.scheduling.support.TaskUtils; import org.springframework.util.Assert; import org.springframework.util.ErrorHandler; +import org.springframework.util.concurrent.ListenableFuture; +import org.springframework.util.concurrent.ListenableFutureTask; /** * Implementation of Spring's {@link TaskScheduler} interface, wrapping @@ -50,7 +53,7 @@ import org.springframework.util.ErrorHandler; */ @SuppressWarnings("serial") public class ThreadPoolTaskScheduler extends ExecutorConfigurationSupport - implements TaskScheduler, SchedulingTaskExecutor { + implements AsyncListenableTaskExecutor, SchedulingTaskExecutor, TaskScheduler { private volatile int poolSize = 1; @@ -190,10 +193,37 @@ public class ThreadPoolTaskScheduler extends ExecutorConfigurationSupport public Future submit(Callable task) { ExecutorService executor = getScheduledExecutor(); try { + Callable taskToUse = task; if (this.errorHandler != null) { - task = new DelegatingErrorHandlingCallable(task, this.errorHandler); + taskToUse = new DelegatingErrorHandlingCallable(task, this.errorHandler); } - return executor.submit(task); + return executor.submit(taskToUse); + } + catch (RejectedExecutionException ex) { + throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex); + } + } + + @Override + public ListenableFuture submitListenable(Runnable task) { + ExecutorService executor = getScheduledExecutor(); + try { + ListenableFutureTask future = new ListenableFutureTask(task, null); + executor.execute(errorHandlingTask(future, false)); + return future; + } + catch (RejectedExecutionException ex) { + throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex); + } + } + + @Override + public ListenableFuture submitListenable(Callable task) { + ExecutorService executor = getScheduledExecutor(); + try { + ListenableFutureTask future = new ListenableFutureTask(task); + executor.execute(errorHandlingTask(future, false)); + return future; } catch (RejectedExecutionException ex) { throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex); @@ -279,6 +309,7 @@ public class ThreadPoolTaskScheduler extends ExecutorConfigurationSupport } } + private Runnable errorHandlingTask(Runnable task, boolean isRepeatingTask) { return TaskUtils.decorateTaskWithErrorHandler(task, this.errorHandler, isRepeatingTask); } @@ -290,7 +321,7 @@ public class ThreadPoolTaskScheduler extends ExecutorConfigurationSupport private final ErrorHandler errorHandler; - DelegatingErrorHandlingCallable(Callable delegate, ErrorHandler errorHandler) { + public DelegatingErrorHandlingCallable(Callable delegate, ErrorHandler errorHandler) { this.delegate = delegate; this.errorHandler = errorHandler; } @@ -298,7 +329,7 @@ public class ThreadPoolTaskScheduler extends ExecutorConfigurationSupport @Override public V call() throws Exception { try { - return delegate.call(); + return this.delegate.call(); } catch (Throwable t) { this.errorHandler.handleError(t); diff --git a/spring-core/src/main/java/org/springframework/core/task/AsyncTaskExecutor.java b/spring-core/src/main/java/org/springframework/core/task/AsyncTaskExecutor.java index 0a68e74615..f650e1d597 100644 --- a/spring-core/src/main/java/org/springframework/core/task/AsyncTaskExecutor.java +++ b/spring-core/src/main/java/org/springframework/core/task/AsyncTaskExecutor.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2012 the original author or authors. + * Copyright 2002-2013 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -67,6 +67,7 @@ public interface AsyncTaskExecutor extends TaskExecutor { * @param task the {@code Runnable} to execute (never {@code null}) * @return a Future representing pending completion of the task * @throws TaskRejectedException if the given task was not accepted + * @since 3.0 */ Future submit(Runnable task); @@ -76,6 +77,7 @@ public interface AsyncTaskExecutor extends TaskExecutor { * @param task the {@code Callable} to execute (never {@code null}) * @return a Future representing pending completion of the task * @throws TaskRejectedException if the given task was not accepted + * @since 3.0 */ Future submit(Callable task); diff --git a/spring-core/src/main/java/org/springframework/core/task/support/TaskExecutorAdapter.java b/spring-core/src/main/java/org/springframework/core/task/support/TaskExecutorAdapter.java index 9247995065..fad9ae09d1 100644 --- a/spring-core/src/main/java/org/springframework/core/task/support/TaskExecutorAdapter.java +++ b/spring-core/src/main/java/org/springframework/core/task/support/TaskExecutorAdapter.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2012 the original author or authors. + * Copyright 2002-2013 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,12 +23,14 @@ import java.util.concurrent.Future; import java.util.concurrent.FutureTask; import java.util.concurrent.RejectedExecutionException; -import org.springframework.core.task.AsyncTaskExecutor; +import org.springframework.core.task.AsyncListenableTaskExecutor; import org.springframework.core.task.TaskRejectedException; import org.springframework.util.Assert; +import org.springframework.util.concurrent.ListenableFuture; +import org.springframework.util.concurrent.ListenableFutureTask; /** - * Adapter that takes a JDK 1.5 {@code java.util.concurrent.Executor} and + * Adapter that takes a JDK {@code java.util.concurrent.Executor} and * exposes a Spring {@link org.springframework.core.task.TaskExecutor} for it. * Also detects an extended {@code java.util.concurrent.ExecutorService}, adapting * the {@link org.springframework.core.task.AsyncTaskExecutor} interface accordingly. @@ -39,15 +41,15 @@ import org.springframework.util.Assert; * @see java.util.concurrent.ExecutorService * @see java.util.concurrent.Executors */ -public class TaskExecutorAdapter implements AsyncTaskExecutor { +public class TaskExecutorAdapter implements AsyncListenableTaskExecutor { - private Executor concurrentExecutor; + private final Executor concurrentExecutor; /** * Create a new TaskExecutorAdapter, - * using the given JDK 1.5 concurrent executor. - * @param concurrentExecutor the JDK 1.5 concurrent executor to delegate to + * using the given JDK concurrent executor. + * @param concurrentExecutor the JDK concurrent executor to delegate to */ public TaskExecutorAdapter(Executor concurrentExecutor) { Assert.notNull(concurrentExecutor, "Executor must not be null"); @@ -56,7 +58,7 @@ public class TaskExecutorAdapter implements AsyncTaskExecutor { /** - * Delegates to the specified JDK 1.5 concurrent executor. + * Delegates to the specified JDK concurrent executor. * @see java.util.concurrent.Executor#execute(Runnable) */ @Override @@ -111,4 +113,30 @@ public class TaskExecutorAdapter implements AsyncTaskExecutor { } } + @Override + public ListenableFuture submitListenable(Runnable task) { + try { + ListenableFutureTask future = new ListenableFutureTask(task, null); + this.concurrentExecutor.execute(future); + return future; + } + catch (RejectedExecutionException ex) { + throw new TaskRejectedException( + "Executor [" + this.concurrentExecutor + "] did not accept task: " + task, ex); + } + } + + @Override + public ListenableFuture submitListenable(Callable task) { + try { + ListenableFutureTask future = new ListenableFutureTask(task); + this.concurrentExecutor.execute(future); + return future; + } + catch (RejectedExecutionException ex) { + throw new TaskRejectedException( + "Executor [" + this.concurrentExecutor + "] did not accept task: " + task, ex); + } + } + } diff --git a/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureTask.java b/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureTask.java index f5f5612cba..b61f885f3c 100644 --- a/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureTask.java +++ b/spring-core/src/main/java/org/springframework/util/concurrent/ListenableFutureTask.java @@ -26,15 +26,14 @@ import java.util.concurrent.FutureTask; * @author Arjen Poutsma * @since 4.0 */ -public class ListenableFutureTask extends FutureTask - implements ListenableFuture { +public class ListenableFutureTask extends FutureTask implements ListenableFuture { + + private final ListenableFutureCallbackRegistry callbacks = new ListenableFutureCallbackRegistry(); - private final ListenableFutureCallbackRegistry callbacks = - new ListenableFutureCallbackRegistry(); /** - * Creates a new {@code ListenableFutureTask} that will, upon running, execute the - * given {@link Callable}. + * Create a new {@code ListenableFutureTask} that will, upon running, + * execute the given {@link Callable}. * @param callable the callable task */ public ListenableFutureTask(Callable callable) { @@ -42,9 +41,9 @@ public class ListenableFutureTask extends FutureTask } /** - * Creates a {@code ListenableFutureTask} that will, upon running, execute the given - * {@link Runnable}, and arrange that {@link #get()} will return the given result on - * successful completion. + * Create a {@code ListenableFutureTask} that will, upon running, + * execute the given {@link Runnable}, and arrange that {@link #get()} + * will return the given result on successful completion. * @param runnable the runnable task * @param result the result to return on successful completion */ @@ -52,9 +51,10 @@ public class ListenableFutureTask extends FutureTask super(runnable, result); } + @Override public void addCallback(ListenableFutureCallback callback) { - callbacks.addCallback(callback); + this.callbacks.addCallback(callback); } @Override @@ -62,7 +62,7 @@ public class ListenableFutureTask extends FutureTask Throwable cause; try { T result = get(); - callbacks.success(result); + this.callbacks.success(result); return; } catch (InterruptedException ex) { @@ -75,9 +75,10 @@ public class ListenableFutureTask extends FutureTask cause = ex; } } - catch (Throwable t) { - cause = t; + catch (Throwable ex) { + cause = ex; } - callbacks.failure(cause); + this.callbacks.failure(cause); } + } diff --git a/spring-tx/src/main/java/org/springframework/jca/work/WorkManagerTaskExecutor.java b/spring-tx/src/main/java/org/springframework/jca/work/WorkManagerTaskExecutor.java index de8f4da2cf..a8391e3dea 100644 --- a/spring-tx/src/main/java/org/springframework/jca/work/WorkManagerTaskExecutor.java +++ b/spring-tx/src/main/java/org/springframework/jca/work/WorkManagerTaskExecutor.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2012 the original author or authors. + * Copyright 2002-2013 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,6 +29,7 @@ import javax.resource.spi.work.WorkManager; import javax.resource.spi.work.WorkRejectedException; import org.springframework.beans.factory.InitializingBean; +import org.springframework.core.task.AsyncListenableTaskExecutor; import org.springframework.core.task.TaskRejectedException; import org.springframework.core.task.TaskTimeoutException; import org.springframework.jca.context.BootstrapContextAware; @@ -36,6 +37,8 @@ import org.springframework.jndi.JndiLocatorSupport; import org.springframework.scheduling.SchedulingException; import org.springframework.scheduling.SchedulingTaskExecutor; import org.springframework.util.Assert; +import org.springframework.util.concurrent.ListenableFuture; +import org.springframework.util.concurrent.ListenableFutureTask; /** * {@link org.springframework.core.task.TaskExecutor} implementation @@ -69,7 +72,7 @@ import org.springframework.util.Assert; * @see javax.resource.spi.work.WorkManager#scheduleWork */ public class WorkManagerTaskExecutor extends JndiLocatorSupport - implements SchedulingTaskExecutor, WorkManager, BootstrapContextAware, InitializingBean { + implements AsyncListenableTaskExecutor, SchedulingTaskExecutor, WorkManager, BootstrapContextAware, InitializingBean { private WorkManager workManager; @@ -250,6 +253,20 @@ public class WorkManagerTaskExecutor extends JndiLocatorSupport return future; } + @Override + public ListenableFuture submitListenable(Runnable task) { + ListenableFutureTask future = new ListenableFutureTask(task, null); + execute(future, TIMEOUT_INDEFINITE); + return future; + } + + @Override + public ListenableFuture submitListenable(Callable task) { + ListenableFutureTask future = new ListenableFutureTask(task); + execute(future, TIMEOUT_INDEFINITE); + return future; + } + /** * This task executor prefers short-lived work units. */