From 25be5e060c26ad67d023620d3776c5d35e2d3b91 Mon Sep 17 00:00:00 2001 From: Juergen Hoeller Date: Wed, 17 Feb 2016 16:58:02 +0100 Subject: [PATCH] TaskDecorator callback supported by common TaskExecutor implementations Issue: SPR-13930 --- .../commonj/WorkManagerTaskExecutor.java | 21 +++++++- .../concurrent/ConcurrentTaskExecutor.java | 17 ++++++- .../concurrent/ThreadPoolTaskExecutor.java | 47 +++++++++++++++-- .../core/task/SimpleAsyncTaskExecutor.java | 23 +++++++-- .../core/task/TaskDecorator.java | 45 +++++++++++++++++ .../task/support/TaskExecutorAdapter.java | 50 ++++++++++++++++--- .../jca/work/WorkManagerTaskExecutor.java | 21 +++++++- 7 files changed, 204 insertions(+), 20 deletions(-) create mode 100644 spring-core/src/main/java/org/springframework/core/task/TaskDecorator.java diff --git a/spring-context-support/src/main/java/org/springframework/scheduling/commonj/WorkManagerTaskExecutor.java b/spring-context-support/src/main/java/org/springframework/scheduling/commonj/WorkManagerTaskExecutor.java index 3d153001df..f39d726292 100644 --- a/spring-context-support/src/main/java/org/springframework/scheduling/commonj/WorkManagerTaskExecutor.java +++ b/spring-context-support/src/main/java/org/springframework/scheduling/commonj/WorkManagerTaskExecutor.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2013 the original author or authors. + * Copyright 2002-2016 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -31,6 +31,7 @@ import commonj.work.WorkRejectedException; import org.springframework.beans.factory.InitializingBean; import org.springframework.core.task.AsyncListenableTaskExecutor; +import org.springframework.core.task.TaskDecorator; import org.springframework.core.task.TaskRejectedException; import org.springframework.jndi.JndiLocatorSupport; import org.springframework.scheduling.SchedulingException; @@ -71,6 +72,8 @@ public class WorkManagerTaskExecutor extends JndiLocatorSupport private WorkListener workListener; + private TaskDecorator taskDecorator; + /** * Specify the CommonJ WorkManager to delegate to. @@ -101,6 +104,20 @@ public class WorkManagerTaskExecutor extends JndiLocatorSupport this.workListener = workListener; } + /** + * Specify a custom {@link TaskDecorator} to be applied to any {@link Runnable} + * about to be executed. + *

Note that such a decorator is not necessarily being applied to the + * user-supplied {@code Runnable}/{@code Callable} but rather to the actual + * execution callback (which may be a wrapper around the user-supplied task). + *

The primary use case is to set some execution context around the task's + * invocation, or to provide some monitoring/statistics for task execution. + * @since 4.3 + */ + public void setTaskDecorator(TaskDecorator taskDecorator) { + this.taskDecorator = taskDecorator; + } + @Override public void afterPropertiesSet() throws NamingException { if (this.workManager == null) { @@ -119,7 +136,7 @@ public class WorkManagerTaskExecutor extends JndiLocatorSupport @Override public void execute(Runnable task) { Assert.state(this.workManager != null, "No WorkManager specified"); - Work work = new DelegatingWork(task); + Work work = new DelegatingWork(this.taskDecorator != null ? this.taskDecorator.decorate(task) : task); try { if (this.workListener != null) { this.workManager.schedule(work, this.workListener); diff --git a/spring-context/src/main/java/org/springframework/scheduling/concurrent/ConcurrentTaskExecutor.java b/spring-context/src/main/java/org/springframework/scheduling/concurrent/ConcurrentTaskExecutor.java index e9f2d25528..b888092dd9 100644 --- a/spring-context/src/main/java/org/springframework/scheduling/concurrent/ConcurrentTaskExecutor.java +++ b/spring-context/src/main/java/org/springframework/scheduling/concurrent/ConcurrentTaskExecutor.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2013 the original author or authors. + * Copyright 2002-2016 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,6 +26,7 @@ import javax.enterprise.concurrent.ManagedExecutors; import javax.enterprise.concurrent.ManagedTask; import org.springframework.core.task.AsyncListenableTaskExecutor; +import org.springframework.core.task.TaskDecorator; import org.springframework.core.task.support.TaskExecutorAdapter; import org.springframework.scheduling.SchedulingAwareRunnable; import org.springframework.scheduling.SchedulingTaskExecutor; @@ -127,6 +128,20 @@ public class ConcurrentTaskExecutor implements AsyncListenableTaskExecutor, Sche return this.concurrentExecutor; } + /** + * Specify a custom {@link TaskDecorator} to be applied to any {@link Runnable} + * about to be executed. + *

Note that such a decorator is not necessarily being applied to the + * user-supplied {@code Runnable}/{@code Callable} but rather to the actual + * execution callback (which may be a wrapper around the user-supplied task). + *

The primary use case is to set some execution context around the task's + * invocation, or to provide some monitoring/statistics for task execution. + * @since 4.3 + */ + public final void setTaskDecorator(TaskDecorator taskDecorator) { + this.adaptedExecutor.setTaskDecorator(taskDecorator); + } + @Override public void execute(Runnable task) { diff --git a/spring-context/src/main/java/org/springframework/scheduling/concurrent/ThreadPoolTaskExecutor.java b/spring-context/src/main/java/org/springframework/scheduling/concurrent/ThreadPoolTaskExecutor.java index 6cd5ccf929..3391276b6c 100644 --- a/spring-context/src/main/java/org/springframework/scheduling/concurrent/ThreadPoolTaskExecutor.java +++ b/spring-context/src/main/java/org/springframework/scheduling/concurrent/ThreadPoolTaskExecutor.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2014 the original author or authors. + * Copyright 2002-2016 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,6 +30,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.springframework.core.task.AsyncListenableTaskExecutor; +import org.springframework.core.task.TaskDecorator; import org.springframework.core.task.TaskRejectedException; import org.springframework.scheduling.SchedulingTaskExecutor; import org.springframework.util.Assert; @@ -82,6 +83,8 @@ public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport private boolean allowCoreThreadTimeOut = false; + private TaskDecorator taskDecorator; + private ThreadPoolExecutor threadPoolExecutor; @@ -177,15 +180,51 @@ public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport this.allowCoreThreadTimeOut = allowCoreThreadTimeOut; } + /** + * Specify a custom {@link TaskDecorator} to be applied to any {@link Runnable} + * about to be executed. + *

Note that such a decorator is not necessarily being applied to the + * user-supplied {@code Runnable}/{@code Callable} but rather to the actual + * execution callback (which may be a wrapper around the user-supplied task). + *

The primary use case is to set some execution context around the task's + * invocation, or to provide some monitoring/statistics for task execution. + * @since 4.3 + */ + public void setTaskDecorator(TaskDecorator taskDecorator) { + this.taskDecorator = taskDecorator; + } + + /** + * Note: This method exposes an {@link ExecutorService} to its base class + * but stores the actual {@link ThreadPoolExecutor} handle internally. + * Do not override this method for replacing the executor, rather just for + * decorating its {@code ExecutorService} handle or storing custom state. + */ @Override protected ExecutorService initializeExecutor( ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) { BlockingQueue queue = createQueue(this.queueCapacity); - ThreadPoolExecutor executor = new ThreadPoolExecutor( - this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS, - queue, threadFactory, rejectedExecutionHandler); + + ThreadPoolExecutor executor; + if (this.taskDecorator != null) { + executor = new ThreadPoolExecutor( + this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS, + queue, threadFactory, rejectedExecutionHandler) { + @Override + public void execute(Runnable command) { + super.execute(taskDecorator.decorate(command)); + } + }; + } + else { + executor = new ThreadPoolExecutor( + this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS, + queue, threadFactory, rejectedExecutionHandler); + + } + if (this.allowCoreThreadTimeOut) { executor.allowCoreThreadTimeOut(true); } diff --git a/spring-core/src/main/java/org/springframework/core/task/SimpleAsyncTaskExecutor.java b/spring-core/src/main/java/org/springframework/core/task/SimpleAsyncTaskExecutor.java index 05e759ab87..0483932ae7 100644 --- a/spring-core/src/main/java/org/springframework/core/task/SimpleAsyncTaskExecutor.java +++ b/spring-core/src/main/java/org/springframework/core/task/SimpleAsyncTaskExecutor.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2012 the original author or authors. + * Copyright 2002-2016 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -65,6 +65,8 @@ public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator implement private ThreadFactory threadFactory; + private TaskDecorator taskDecorator; + /** * Create a new SimpleAsyncTaskExecutor with default thread name prefix. @@ -109,6 +111,20 @@ public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator implement return this.threadFactory; } + /** + * Specify a custom {@link TaskDecorator} to be applied to any {@link Runnable} + * about to be executed. + *

Note that such a decorator is not necessarily being applied to the + * user-supplied {@code Runnable}/{@code Callable} but rather to the actual + * execution callback (which may be a wrapper around the user-supplied task). + *

The primary use case is to set some execution context around the task's + * invocation, or to provide some monitoring/statistics for task execution. + * @since 4.3 + */ + public final void setTaskDecorator(TaskDecorator taskDecorator) { + this.taskDecorator = taskDecorator; + } + /** * Set the maximum number of parallel accesses allowed. * -1 indicates no concurrency limit at all. @@ -163,12 +179,13 @@ public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator implement @Override public void execute(Runnable task, long startTimeout) { Assert.notNull(task, "Runnable must not be null"); + Runnable taskToUse = (this.taskDecorator != null ? this.taskDecorator.decorate(task) : task); if (isThrottleActive() && startTimeout > TIMEOUT_IMMEDIATE) { this.concurrencyThrottle.beforeAccess(); - doExecute(new ConcurrencyThrottlingRunnable(task)); + doExecute(new ConcurrencyThrottlingRunnable(taskToUse)); } else { - doExecute(task); + doExecute(taskToUse); } } diff --git a/spring-core/src/main/java/org/springframework/core/task/TaskDecorator.java b/spring-core/src/main/java/org/springframework/core/task/TaskDecorator.java new file mode 100644 index 0000000000..d36a3ab391 --- /dev/null +++ b/spring-core/src/main/java/org/springframework/core/task/TaskDecorator.java @@ -0,0 +1,45 @@ +/* + * Copyright 2002-2016 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.core.task; + +/** + * A callback interface for a decorator to be applied to any {@link Runnable} + * about to be executed. + * + *

Note that such a decorator is not necessarily being applied to the + * user-supplied {@code Runnable}/{@code Callable} but rather to the actual + * execution callback (which may be a wrapper around the user-supplied task). + * + *

The primary use case is to set some execution context around the task's + * invocation, or to provide some monitoring/statistics for task execution. + * + * @author Juergen Hoeller + * @since 4.3 + * @see TaskExecutor#execute(Runnable) + * @see SimpleAsyncTaskExecutor#setTaskDecorator + */ +public interface TaskDecorator { + + /** + * Decorate the given {@code Runnable}, returning a potentially wrapped + * {@code Runnable} for actual execution. + * @param runnable the original {@code Runnable} + * @return the decorated {@code Runnable} + */ + Runnable decorate(Runnable runnable); + +} diff --git a/spring-core/src/main/java/org/springframework/core/task/support/TaskExecutorAdapter.java b/spring-core/src/main/java/org/springframework/core/task/support/TaskExecutorAdapter.java index fad9ae09d1..165533dfad 100644 --- a/spring-core/src/main/java/org/springframework/core/task/support/TaskExecutorAdapter.java +++ b/spring-core/src/main/java/org/springframework/core/task/support/TaskExecutorAdapter.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2013 the original author or authors. + * Copyright 2002-2016 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,6 +24,7 @@ import java.util.concurrent.FutureTask; import java.util.concurrent.RejectedExecutionException; import org.springframework.core.task.AsyncListenableTaskExecutor; +import org.springframework.core.task.TaskDecorator; import org.springframework.core.task.TaskRejectedException; import org.springframework.util.Assert; import org.springframework.util.concurrent.ListenableFuture; @@ -45,6 +46,8 @@ public class TaskExecutorAdapter implements AsyncListenableTaskExecutor { private final Executor concurrentExecutor; + private TaskDecorator taskDecorator; + /** * Create a new TaskExecutorAdapter, @@ -57,6 +60,21 @@ public class TaskExecutorAdapter implements AsyncListenableTaskExecutor { } + /** + * Specify a custom {@link TaskDecorator} to be applied to any {@link Runnable} + * about to be executed. + *

Note that such a decorator is not necessarily being applied to the + * user-supplied {@code Runnable}/{@code Callable} but rather to the actual + * execution callback (which may be a wrapper around the user-supplied task). + *

The primary use case is to set some execution context around the task's + * invocation, or to provide some monitoring/statistics for task execution. + * @since 4.3 + */ + public final void setTaskDecorator(TaskDecorator taskDecorator) { + this.taskDecorator = taskDecorator; + } + + /** * Delegates to the specified JDK concurrent executor. * @see java.util.concurrent.Executor#execute(Runnable) @@ -64,7 +82,7 @@ public class TaskExecutorAdapter implements AsyncListenableTaskExecutor { @Override public void execute(Runnable task) { try { - this.concurrentExecutor.execute(task); + doExecute(this.concurrentExecutor, this.taskDecorator, task); } catch (RejectedExecutionException ex) { throw new TaskRejectedException( @@ -80,12 +98,12 @@ public class TaskExecutorAdapter implements AsyncListenableTaskExecutor { @Override public Future submit(Runnable task) { try { - if (this.concurrentExecutor instanceof ExecutorService) { + if (this.taskDecorator == null && this.concurrentExecutor instanceof ExecutorService) { return ((ExecutorService) this.concurrentExecutor).submit(task); } else { FutureTask future = new FutureTask(task, null); - this.concurrentExecutor.execute(future); + doExecute(this.concurrentExecutor, this.taskDecorator, future); return future; } } @@ -98,12 +116,12 @@ public class TaskExecutorAdapter implements AsyncListenableTaskExecutor { @Override public Future submit(Callable task) { try { - if (this.concurrentExecutor instanceof ExecutorService) { + if (this.taskDecorator == null && this.concurrentExecutor instanceof ExecutorService) { return ((ExecutorService) this.concurrentExecutor).submit(task); } else { FutureTask future = new FutureTask(task); - this.concurrentExecutor.execute(future); + doExecute(this.concurrentExecutor, this.taskDecorator, future); return future; } } @@ -117,7 +135,7 @@ public class TaskExecutorAdapter implements AsyncListenableTaskExecutor { public ListenableFuture submitListenable(Runnable task) { try { ListenableFutureTask future = new ListenableFutureTask(task, null); - this.concurrentExecutor.execute(future); + doExecute(this.concurrentExecutor, this.taskDecorator, future); return future; } catch (RejectedExecutionException ex) { @@ -130,7 +148,7 @@ public class TaskExecutorAdapter implements AsyncListenableTaskExecutor { public ListenableFuture submitListenable(Callable task) { try { ListenableFutureTask future = new ListenableFutureTask(task); - this.concurrentExecutor.execute(future); + doExecute(this.concurrentExecutor, this.taskDecorator, future); return future; } catch (RejectedExecutionException ex) { @@ -139,4 +157,20 @@ public class TaskExecutorAdapter implements AsyncListenableTaskExecutor { } } + + /** + * Actually execute the given {@code Runnable} (which may be a user-supplied task + * or a wrapper around a user-supplied task) with the given executor. + * @param concurrentExecutor the underlying JDK concurrent executor to delegate to + * @param taskDecorator the specified decorator to be applied, if any + * @param runnable the runnable to execute + * @throws RejectedExecutionException if the given runnable cannot be accepted + * @since 4.3 + */ + protected void doExecute(Executor concurrentExecutor, TaskDecorator taskDecorator, Runnable runnable) + throws RejectedExecutionException{ + + concurrentExecutor.execute(taskDecorator != null ? taskDecorator.decorate(runnable) : runnable); + } + } diff --git a/spring-tx/src/main/java/org/springframework/jca/work/WorkManagerTaskExecutor.java b/spring-tx/src/main/java/org/springframework/jca/work/WorkManagerTaskExecutor.java index a8391e3dea..532ea23e7c 100644 --- a/spring-tx/src/main/java/org/springframework/jca/work/WorkManagerTaskExecutor.java +++ b/spring-tx/src/main/java/org/springframework/jca/work/WorkManagerTaskExecutor.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2013 the original author or authors. + * Copyright 2002-2016 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,6 +30,7 @@ import javax.resource.spi.work.WorkRejectedException; import org.springframework.beans.factory.InitializingBean; import org.springframework.core.task.AsyncListenableTaskExecutor; +import org.springframework.core.task.TaskDecorator; import org.springframework.core.task.TaskRejectedException; import org.springframework.core.task.TaskTimeoutException; import org.springframework.jca.context.BootstrapContextAware; @@ -84,6 +85,8 @@ public class WorkManagerTaskExecutor extends JndiLocatorSupport private WorkListener workListener; + private TaskDecorator taskDecorator; + /** * Create a new WorkManagerTaskExecutor, expecting bean-style configuration. @@ -164,6 +167,20 @@ public class WorkManagerTaskExecutor extends JndiLocatorSupport this.workListener = workListener; } + /** + * Specify a custom {@link TaskDecorator} to be applied to any {@link Runnable} + * about to be executed. + *

Note that such a decorator is not necessarily being applied to the + * user-supplied {@code Runnable}/{@code Callable} but rather to the actual + * execution callback (which may be a wrapper around the user-supplied task). + *

The primary use case is to set some execution context around the task's + * invocation, or to provide some monitoring/statistics for task execution. + * @since 4.3 + */ + public void setTaskDecorator(TaskDecorator taskDecorator) { + this.taskDecorator = taskDecorator; + } + @Override public void afterPropertiesSet() throws NamingException { if (this.workManager == null) { @@ -199,7 +216,7 @@ public class WorkManagerTaskExecutor extends JndiLocatorSupport @Override public void execute(Runnable task, long startTimeout) { Assert.state(this.workManager != null, "No WorkManager specified"); - Work work = new DelegatingWork(task); + Work work = new DelegatingWork(this.taskDecorator != null ? this.taskDecorator.decorate(task) : task); try { if (this.blockUntilCompleted) { if (startTimeout != TIMEOUT_INDEFINITE || this.workListener != null) {