Browse Source

TaskDecorator callback supported by common TaskExecutor implementations

Issue: SPR-13930
pull/968/head
Juergen Hoeller 9 years ago
parent
commit
25be5e060c
  1. 21
      spring-context-support/src/main/java/org/springframework/scheduling/commonj/WorkManagerTaskExecutor.java
  2. 17
      spring-context/src/main/java/org/springframework/scheduling/concurrent/ConcurrentTaskExecutor.java
  3. 47
      spring-context/src/main/java/org/springframework/scheduling/concurrent/ThreadPoolTaskExecutor.java
  4. 23
      spring-core/src/main/java/org/springframework/core/task/SimpleAsyncTaskExecutor.java
  5. 45
      spring-core/src/main/java/org/springframework/core/task/TaskDecorator.java
  6. 50
      spring-core/src/main/java/org/springframework/core/task/support/TaskExecutorAdapter.java
  7. 21
      spring-tx/src/main/java/org/springframework/jca/work/WorkManagerTaskExecutor.java

21
spring-context-support/src/main/java/org/springframework/scheduling/commonj/WorkManagerTaskExecutor.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -31,6 +31,7 @@ import commonj.work.WorkRejectedException; @@ -31,6 +31,7 @@ import commonj.work.WorkRejectedException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.task.AsyncListenableTaskExecutor;
import org.springframework.core.task.TaskDecorator;
import org.springframework.core.task.TaskRejectedException;
import org.springframework.jndi.JndiLocatorSupport;
import org.springframework.scheduling.SchedulingException;
@ -71,6 +72,8 @@ public class WorkManagerTaskExecutor extends JndiLocatorSupport @@ -71,6 +72,8 @@ public class WorkManagerTaskExecutor extends JndiLocatorSupport
private WorkListener workListener;
private TaskDecorator taskDecorator;
/**
* Specify the CommonJ WorkManager to delegate to.
@ -101,6 +104,20 @@ public class WorkManagerTaskExecutor extends JndiLocatorSupport @@ -101,6 +104,20 @@ public class WorkManagerTaskExecutor extends JndiLocatorSupport
this.workListener = workListener;
}
/**
* Specify a custom {@link TaskDecorator} to be applied to any {@link Runnable}
* about to be executed.
* <p>Note that such a decorator is not necessarily being applied to the
* user-supplied {@code Runnable}/{@code Callable} but rather to the actual
* execution callback (which may be a wrapper around the user-supplied task).
* <p>The primary use case is to set some execution context around the task's
* invocation, or to provide some monitoring/statistics for task execution.
* @since 4.3
*/
public void setTaskDecorator(TaskDecorator taskDecorator) {
this.taskDecorator = taskDecorator;
}
@Override
public void afterPropertiesSet() throws NamingException {
if (this.workManager == null) {
@ -119,7 +136,7 @@ public class WorkManagerTaskExecutor extends JndiLocatorSupport @@ -119,7 +136,7 @@ public class WorkManagerTaskExecutor extends JndiLocatorSupport
@Override
public void execute(Runnable task) {
Assert.state(this.workManager != null, "No WorkManager specified");
Work work = new DelegatingWork(task);
Work work = new DelegatingWork(this.taskDecorator != null ? this.taskDecorator.decorate(task) : task);
try {
if (this.workListener != null) {
this.workManager.schedule(work, this.workListener);

17
spring-context/src/main/java/org/springframework/scheduling/concurrent/ConcurrentTaskExecutor.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -26,6 +26,7 @@ import javax.enterprise.concurrent.ManagedExecutors; @@ -26,6 +26,7 @@ import javax.enterprise.concurrent.ManagedExecutors;
import javax.enterprise.concurrent.ManagedTask;
import org.springframework.core.task.AsyncListenableTaskExecutor;
import org.springframework.core.task.TaskDecorator;
import org.springframework.core.task.support.TaskExecutorAdapter;
import org.springframework.scheduling.SchedulingAwareRunnable;
import org.springframework.scheduling.SchedulingTaskExecutor;
@ -127,6 +128,20 @@ public class ConcurrentTaskExecutor implements AsyncListenableTaskExecutor, Sche @@ -127,6 +128,20 @@ public class ConcurrentTaskExecutor implements AsyncListenableTaskExecutor, Sche
return this.concurrentExecutor;
}
/**
* Specify a custom {@link TaskDecorator} to be applied to any {@link Runnable}
* about to be executed.
* <p>Note that such a decorator is not necessarily being applied to the
* user-supplied {@code Runnable}/{@code Callable} but rather to the actual
* execution callback (which may be a wrapper around the user-supplied task).
* <p>The primary use case is to set some execution context around the task's
* invocation, or to provide some monitoring/statistics for task execution.
* @since 4.3
*/
public final void setTaskDecorator(TaskDecorator taskDecorator) {
this.adaptedExecutor.setTaskDecorator(taskDecorator);
}
@Override
public void execute(Runnable task) {

47
spring-context/src/main/java/org/springframework/scheduling/concurrent/ThreadPoolTaskExecutor.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2014 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -30,6 +30,7 @@ import java.util.concurrent.ThreadPoolExecutor; @@ -30,6 +30,7 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.springframework.core.task.AsyncListenableTaskExecutor;
import org.springframework.core.task.TaskDecorator;
import org.springframework.core.task.TaskRejectedException;
import org.springframework.scheduling.SchedulingTaskExecutor;
import org.springframework.util.Assert;
@ -82,6 +83,8 @@ public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport @@ -82,6 +83,8 @@ public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport
private boolean allowCoreThreadTimeOut = false;
private TaskDecorator taskDecorator;
private ThreadPoolExecutor threadPoolExecutor;
@ -177,15 +180,51 @@ public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport @@ -177,15 +180,51 @@ public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport
this.allowCoreThreadTimeOut = allowCoreThreadTimeOut;
}
/**
* Specify a custom {@link TaskDecorator} to be applied to any {@link Runnable}
* about to be executed.
* <p>Note that such a decorator is not necessarily being applied to the
* user-supplied {@code Runnable}/{@code Callable} but rather to the actual
* execution callback (which may be a wrapper around the user-supplied task).
* <p>The primary use case is to set some execution context around the task's
* invocation, or to provide some monitoring/statistics for task execution.
* @since 4.3
*/
public void setTaskDecorator(TaskDecorator taskDecorator) {
this.taskDecorator = taskDecorator;
}
/**
* Note: This method exposes an {@link ExecutorService} to its base class
* but stores the actual {@link ThreadPoolExecutor} handle internally.
* Do not override this method for replacing the executor, rather just for
* decorating its {@code ExecutorService} handle or storing custom state.
*/
@Override
protected ExecutorService initializeExecutor(
ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);
ThreadPoolExecutor executor = new ThreadPoolExecutor(
this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
queue, threadFactory, rejectedExecutionHandler);
ThreadPoolExecutor executor;
if (this.taskDecorator != null) {
executor = new ThreadPoolExecutor(
this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
queue, threadFactory, rejectedExecutionHandler) {
@Override
public void execute(Runnable command) {
super.execute(taskDecorator.decorate(command));
}
};
}
else {
executor = new ThreadPoolExecutor(
this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
queue, threadFactory, rejectedExecutionHandler);
}
if (this.allowCoreThreadTimeOut) {
executor.allowCoreThreadTimeOut(true);
}

23
spring-core/src/main/java/org/springframework/core/task/SimpleAsyncTaskExecutor.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2012 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -65,6 +65,8 @@ public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator implement @@ -65,6 +65,8 @@ public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator implement
private ThreadFactory threadFactory;
private TaskDecorator taskDecorator;
/**
* Create a new SimpleAsyncTaskExecutor with default thread name prefix.
@ -109,6 +111,20 @@ public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator implement @@ -109,6 +111,20 @@ public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator implement
return this.threadFactory;
}
/**
* Specify a custom {@link TaskDecorator} to be applied to any {@link Runnable}
* about to be executed.
* <p>Note that such a decorator is not necessarily being applied to the
* user-supplied {@code Runnable}/{@code Callable} but rather to the actual
* execution callback (which may be a wrapper around the user-supplied task).
* <p>The primary use case is to set some execution context around the task's
* invocation, or to provide some monitoring/statistics for task execution.
* @since 4.3
*/
public final void setTaskDecorator(TaskDecorator taskDecorator) {
this.taskDecorator = taskDecorator;
}
/**
* Set the maximum number of parallel accesses allowed.
* -1 indicates no concurrency limit at all.
@ -163,12 +179,13 @@ public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator implement @@ -163,12 +179,13 @@ public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator implement
@Override
public void execute(Runnable task, long startTimeout) {
Assert.notNull(task, "Runnable must not be null");
Runnable taskToUse = (this.taskDecorator != null ? this.taskDecorator.decorate(task) : task);
if (isThrottleActive() && startTimeout > TIMEOUT_IMMEDIATE) {
this.concurrencyThrottle.beforeAccess();
doExecute(new ConcurrencyThrottlingRunnable(task));
doExecute(new ConcurrencyThrottlingRunnable(taskToUse));
}
else {
doExecute(task);
doExecute(taskToUse);
}
}

45
spring-core/src/main/java/org/springframework/core/task/TaskDecorator.java

@ -0,0 +1,45 @@ @@ -0,0 +1,45 @@
/*
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.core.task;
/**
* A callback interface for a decorator to be applied to any {@link Runnable}
* about to be executed.
*
* <p>Note that such a decorator is not necessarily being applied to the
* user-supplied {@code Runnable}/{@code Callable} but rather to the actual
* execution callback (which may be a wrapper around the user-supplied task).
*
* <p>The primary use case is to set some execution context around the task's
* invocation, or to provide some monitoring/statistics for task execution.
*
* @author Juergen Hoeller
* @since 4.3
* @see TaskExecutor#execute(Runnable)
* @see SimpleAsyncTaskExecutor#setTaskDecorator
*/
public interface TaskDecorator {
/**
* Decorate the given {@code Runnable}, returning a potentially wrapped
* {@code Runnable} for actual execution.
* @param runnable the original {@code Runnable}
* @return the decorated {@code Runnable}
*/
Runnable decorate(Runnable runnable);
}

50
spring-core/src/main/java/org/springframework/core/task/support/TaskExecutorAdapter.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -24,6 +24,7 @@ import java.util.concurrent.FutureTask; @@ -24,6 +24,7 @@ import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionException;
import org.springframework.core.task.AsyncListenableTaskExecutor;
import org.springframework.core.task.TaskDecorator;
import org.springframework.core.task.TaskRejectedException;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.ListenableFuture;
@ -45,6 +46,8 @@ public class TaskExecutorAdapter implements AsyncListenableTaskExecutor { @@ -45,6 +46,8 @@ public class TaskExecutorAdapter implements AsyncListenableTaskExecutor {
private final Executor concurrentExecutor;
private TaskDecorator taskDecorator;
/**
* Create a new TaskExecutorAdapter,
@ -57,6 +60,21 @@ public class TaskExecutorAdapter implements AsyncListenableTaskExecutor { @@ -57,6 +60,21 @@ public class TaskExecutorAdapter implements AsyncListenableTaskExecutor {
}
/**
* Specify a custom {@link TaskDecorator} to be applied to any {@link Runnable}
* about to be executed.
* <p>Note that such a decorator is not necessarily being applied to the
* user-supplied {@code Runnable}/{@code Callable} but rather to the actual
* execution callback (which may be a wrapper around the user-supplied task).
* <p>The primary use case is to set some execution context around the task's
* invocation, or to provide some monitoring/statistics for task execution.
* @since 4.3
*/
public final void setTaskDecorator(TaskDecorator taskDecorator) {
this.taskDecorator = taskDecorator;
}
/**
* Delegates to the specified JDK concurrent executor.
* @see java.util.concurrent.Executor#execute(Runnable)
@ -64,7 +82,7 @@ public class TaskExecutorAdapter implements AsyncListenableTaskExecutor { @@ -64,7 +82,7 @@ public class TaskExecutorAdapter implements AsyncListenableTaskExecutor {
@Override
public void execute(Runnable task) {
try {
this.concurrentExecutor.execute(task);
doExecute(this.concurrentExecutor, this.taskDecorator, task);
}
catch (RejectedExecutionException ex) {
throw new TaskRejectedException(
@ -80,12 +98,12 @@ public class TaskExecutorAdapter implements AsyncListenableTaskExecutor { @@ -80,12 +98,12 @@ public class TaskExecutorAdapter implements AsyncListenableTaskExecutor {
@Override
public Future<?> submit(Runnable task) {
try {
if (this.concurrentExecutor instanceof ExecutorService) {
if (this.taskDecorator == null && this.concurrentExecutor instanceof ExecutorService) {
return ((ExecutorService) this.concurrentExecutor).submit(task);
}
else {
FutureTask<Object> future = new FutureTask<Object>(task, null);
this.concurrentExecutor.execute(future);
doExecute(this.concurrentExecutor, this.taskDecorator, future);
return future;
}
}
@ -98,12 +116,12 @@ public class TaskExecutorAdapter implements AsyncListenableTaskExecutor { @@ -98,12 +116,12 @@ public class TaskExecutorAdapter implements AsyncListenableTaskExecutor {
@Override
public <T> Future<T> submit(Callable<T> task) {
try {
if (this.concurrentExecutor instanceof ExecutorService) {
if (this.taskDecorator == null && this.concurrentExecutor instanceof ExecutorService) {
return ((ExecutorService) this.concurrentExecutor).submit(task);
}
else {
FutureTask<T> future = new FutureTask<T>(task);
this.concurrentExecutor.execute(future);
doExecute(this.concurrentExecutor, this.taskDecorator, future);
return future;
}
}
@ -117,7 +135,7 @@ public class TaskExecutorAdapter implements AsyncListenableTaskExecutor { @@ -117,7 +135,7 @@ public class TaskExecutorAdapter implements AsyncListenableTaskExecutor {
public ListenableFuture<?> submitListenable(Runnable task) {
try {
ListenableFutureTask<Object> future = new ListenableFutureTask<Object>(task, null);
this.concurrentExecutor.execute(future);
doExecute(this.concurrentExecutor, this.taskDecorator, future);
return future;
}
catch (RejectedExecutionException ex) {
@ -130,7 +148,7 @@ public class TaskExecutorAdapter implements AsyncListenableTaskExecutor { @@ -130,7 +148,7 @@ public class TaskExecutorAdapter implements AsyncListenableTaskExecutor {
public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
try {
ListenableFutureTask<T> future = new ListenableFutureTask<T>(task);
this.concurrentExecutor.execute(future);
doExecute(this.concurrentExecutor, this.taskDecorator, future);
return future;
}
catch (RejectedExecutionException ex) {
@ -139,4 +157,20 @@ public class TaskExecutorAdapter implements AsyncListenableTaskExecutor { @@ -139,4 +157,20 @@ public class TaskExecutorAdapter implements AsyncListenableTaskExecutor {
}
}
/**
* Actually execute the given {@code Runnable} (which may be a user-supplied task
* or a wrapper around a user-supplied task) with the given executor.
* @param concurrentExecutor the underlying JDK concurrent executor to delegate to
* @param taskDecorator the specified decorator to be applied, if any
* @param runnable the runnable to execute
* @throws RejectedExecutionException if the given runnable cannot be accepted
* @since 4.3
*/
protected void doExecute(Executor concurrentExecutor, TaskDecorator taskDecorator, Runnable runnable)
throws RejectedExecutionException{
concurrentExecutor.execute(taskDecorator != null ? taskDecorator.decorate(runnable) : runnable);
}
}

21
spring-tx/src/main/java/org/springframework/jca/work/WorkManagerTaskExecutor.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -30,6 +30,7 @@ import javax.resource.spi.work.WorkRejectedException; @@ -30,6 +30,7 @@ import javax.resource.spi.work.WorkRejectedException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.task.AsyncListenableTaskExecutor;
import org.springframework.core.task.TaskDecorator;
import org.springframework.core.task.TaskRejectedException;
import org.springframework.core.task.TaskTimeoutException;
import org.springframework.jca.context.BootstrapContextAware;
@ -84,6 +85,8 @@ public class WorkManagerTaskExecutor extends JndiLocatorSupport @@ -84,6 +85,8 @@ public class WorkManagerTaskExecutor extends JndiLocatorSupport
private WorkListener workListener;
private TaskDecorator taskDecorator;
/**
* Create a new WorkManagerTaskExecutor, expecting bean-style configuration.
@ -164,6 +167,20 @@ public class WorkManagerTaskExecutor extends JndiLocatorSupport @@ -164,6 +167,20 @@ public class WorkManagerTaskExecutor extends JndiLocatorSupport
this.workListener = workListener;
}
/**
* Specify a custom {@link TaskDecorator} to be applied to any {@link Runnable}
* about to be executed.
* <p>Note that such a decorator is not necessarily being applied to the
* user-supplied {@code Runnable}/{@code Callable} but rather to the actual
* execution callback (which may be a wrapper around the user-supplied task).
* <p>The primary use case is to set some execution context around the task's
* invocation, or to provide some monitoring/statistics for task execution.
* @since 4.3
*/
public void setTaskDecorator(TaskDecorator taskDecorator) {
this.taskDecorator = taskDecorator;
}
@Override
public void afterPropertiesSet() throws NamingException {
if (this.workManager == null) {
@ -199,7 +216,7 @@ public class WorkManagerTaskExecutor extends JndiLocatorSupport @@ -199,7 +216,7 @@ public class WorkManagerTaskExecutor extends JndiLocatorSupport
@Override
public void execute(Runnable task, long startTimeout) {
Assert.state(this.workManager != null, "No WorkManager specified");
Work work = new DelegatingWork(task);
Work work = new DelegatingWork(this.taskDecorator != null ? this.taskDecorator.decorate(task) : task);
try {
if (this.blockUntilCompleted) {
if (startTimeout != TIMEOUT_INDEFINITE || this.workListener != null) {

Loading…
Cancel
Save