Browse Source

Add option for graceful shutdown (setTaskTerminationTimeout)

See gh-30956
pull/29932/merge
Juergen Hoeller 1 year ago
parent
commit
ce80637891
  1. 34
      framework-docs/modules/ROOT/pages/integration/scheduling.adoc
  2. 20
      spring-context/src/main/java/org/springframework/scheduling/concurrent/SimpleAsyncTaskScheduler.java
  3. 53
      spring-context/src/test/java/org/springframework/scheduling/annotation/EnableSchedulingTests.java
  4. 139
      spring-core/src/main/java/org/springframework/core/task/SimpleAsyncTaskExecutor.java

34
framework-docs/modules/ROOT/pages/integration/scheduling.adoc

@ -62,22 +62,27 @@ The variants that Spring provides are as follows: @@ -62,22 +62,27 @@ The variants that Spring provides are as follows:
`ConcurrentTaskExecutor` directly. However, if the `ThreadPoolTaskExecutor` is not
flexible enough for your needs, `ConcurrentTaskExecutor` is an alternative.
* `ThreadPoolTaskExecutor`:
This implementation is most commonly used. It exposes bean properties for
configuring a `java.util.concurrent.ThreadPoolExecutor` and wraps it in a `TaskExecutor`.
If you need to adapt to a different kind of `java.util.concurrent.Executor`, we
recommend that you use a `ConcurrentTaskExecutor` instead.
This implementation is most commonly used. It exposes bean properties for configuring
a `java.util.concurrent.ThreadPoolExecutor` and wraps it in a `TaskExecutor`.
If you need to adapt to a different kind of `java.util.concurrent.Executor`,
we recommend that you use a `ConcurrentTaskExecutor` instead.
* `DefaultManagedTaskExecutor`:
This implementation uses a JNDI-obtained `ManagedExecutorService` in a JSR-236
compatible runtime environment (such as a Jakarta EE application server),
replacing a CommonJ WorkManager for that purpose.
As of 6.1, `ThreadPoolTaskExecutor` provides a pause/resume capability and graceful
shutdown through Spring's lifecycle management. There is also a new "virtualThreads"
option on `SimpleAsyncTaskExecutor` which is aligned with JDK 21's Virtual Threads,
as well as a graceful shutdown capability for `SimpleAsyncTaskExecutor` as well.
[[scheduling-task-executor-usage]]
=== Using a `TaskExecutor`
Spring's `TaskExecutor` implementations are used as simple JavaBeans. In the following example,
we define a bean that uses the `ThreadPoolTaskExecutor` to asynchronously print
out a set of messages:
Spring's `TaskExecutor` implementations are commonly used with dependency injection.
In the following example, we define a bean that uses the `ThreadPoolTaskExecutor`
to asynchronously print out a set of messages:
[source,java,indent=0,subs="verbatim,quotes"]
----
@ -227,8 +232,8 @@ fixed delay, those methods should be used directly whenever possible. The value @@ -227,8 +232,8 @@ fixed delay, those methods should be used directly whenever possible. The value
`PeriodicTrigger` implementation is that you can use it within components that rely on
the `Trigger` abstraction. For example, it may be convenient to allow periodic triggers,
cron-based triggers, and even custom trigger implementations to be used interchangeably.
Such a component could take advantage of dependency injection so that you can configure such `Triggers`
externally and, therefore, easily modify or extend them.
Such a component could take advantage of dependency injection so that you can configure
such `Triggers` externally and, therefore, easily modify or extend them.
[[scheduling-task-scheduler-implementations]]
@ -238,10 +243,8 @@ As with Spring's `TaskExecutor` abstraction, the primary benefit of the `TaskSch @@ -238,10 +243,8 @@ As with Spring's `TaskExecutor` abstraction, the primary benefit of the `TaskSch
arrangement is that an application's scheduling needs are decoupled from the deployment
environment. This abstraction level is particularly relevant when deploying to an
application server environment where threads should not be created directly by the
application itself. For such scenarios, Spring provides a `TimerManagerTaskScheduler`
that delegates to a CommonJ `TimerManager` on WebLogic or WebSphere as well as a more recent
`DefaultManagedTaskScheduler` that delegates to a JSR-236 `ManagedScheduledExecutorService`
in a Jakarta EE environment. Both are typically configured with a JNDI lookup.
application itself. For such scenarios, Spring provides a `DefaultManagedTaskScheduler`
that delegates to a JSR-236 `ManagedScheduledExecutorService` in a Jakarta EE environment.
Whenever external thread management is not a requirement, a simpler alternative is
a local `ScheduledExecutorService` setup within the application, which can be adapted
@ -251,6 +254,11 @@ to provide common bean-style configuration along the lines of `ThreadPoolTaskExe @@ -251,6 +254,11 @@ to provide common bean-style configuration along the lines of `ThreadPoolTaskExe
These variants work perfectly fine for locally embedded thread pool setups in lenient
application server environments, as well -- in particular on Tomcat and Jetty.
As of 6.1, `ThreadPoolTaskScheduler` provides a pause/resume capability and graceful
shutdown through Spring's lifecycle management. There is also a new option called
`SimpleAsyncTaskScheduler` which is aligned with JDK 21's Virtual Threads, using a
single scheduler thread but firing up a new thread for every scheduled task execution.
[[scheduling-annotation-support]]

20
spring-context/src/main/java/org/springframework/scheduling/concurrent/SimpleAsyncTaskScheduler.java

@ -44,7 +44,19 @@ import org.springframework.util.ErrorHandler; @@ -44,7 +44,19 @@ import org.springframework.util.ErrorHandler;
* A simple implementation of Spring's {@link TaskScheduler} interface, using
* a single scheduler thread and executing every scheduled task in an individual
* separate thread. This is an attractive choice with virtual threads on JDK 21,
* so it is commonly used with {@link #setVirtualThreads setVirtualThreads(true)}.
* expecting common usage with {@link #setVirtualThreads setVirtualThreads(true)}.
*
* <p>Supports a graceful shutdown through {@link #setTaskTerminationTimeout},
* at the expense of task tracking overhead per execution thread at runtime.
* Supports limiting concurrent threads through {@link #setConcurrencyLimit}.
* By default, the number of concurrent task executions is unlimited.
* This allows for dynamic concurrency of scheduled task executions, in contrast
* to {@link ThreadPoolTaskScheduler} which requires a fixed pool size.
*
* <p><b>NOTE: This implementation does not reuse threads!</b> Consider a
* thread-pooling TaskScheduler implementation instead, in particular for
* scheduling a large number of short-lived tasks. Alternatively, on JDK 21,
* consider setting {@link #setVirtualThreads} to {@code true}.
*
* <p>Extends {@link SimpleAsyncTaskExecutor} and can serve as a fully capable
* replacement for it, e.g. as a single shared instance serving as a
@ -64,13 +76,14 @@ import org.springframework.util.ErrorHandler; @@ -64,13 +76,14 @@ import org.springframework.util.ErrorHandler;
* @author Juergen Hoeller
* @since 6.1
* @see #setVirtualThreads
* @see #setTargetTaskExecutor
* @see #setTaskTerminationTimeout
* @see #setConcurrencyLimit
* @see SimpleAsyncTaskExecutor
* @see ThreadPoolTaskScheduler
*/
@SuppressWarnings("serial")
public class SimpleAsyncTaskScheduler extends SimpleAsyncTaskExecutor implements TaskScheduler,
ApplicationContextAware, SmartLifecycle, ApplicationListener<ContextClosedEvent>, AutoCloseable {
ApplicationContextAware, SmartLifecycle, ApplicationListener<ContextClosedEvent> {
private static final TimeUnit NANO = TimeUnit.NANOSECONDS;
@ -275,6 +288,7 @@ public class SimpleAsyncTaskScheduler extends SimpleAsyncTaskExecutor implements @@ -275,6 +288,7 @@ public class SimpleAsyncTaskScheduler extends SimpleAsyncTaskExecutor implements
future.cancel(true);
}
}
super.close();
}
}

53
spring-context/src/test/java/org/springframework/scheduling/annotation/EnableSchedulingTests.java

@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicInteger; @@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@ -33,6 +34,7 @@ import org.springframework.context.annotation.AnnotationConfigApplicationContext @@ -33,6 +34,7 @@ import org.springframework.context.annotation.AnnotationConfigApplicationContext
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
import org.springframework.core.task.TaskExecutor;
import org.springframework.core.testfixture.EnabledForTestGroups;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.SimpleAsyncTaskScheduler;
@ -66,6 +68,10 @@ public class EnableSchedulingTests { @@ -66,6 +68,10 @@ public class EnableSchedulingTests {
}
/*
* Tests compatibility between default executor in TaskSchedulerRouter
* and explicit ThreadPoolTaskScheduler in configuration subclass.
*/
@ParameterizedTest
@ValueSource(classes = {FixedRateTaskConfig.class, FixedRateTaskConfigSubclass.class})
@EnabledForTestGroups(LONG_RUNNING)
@ -77,8 +83,14 @@ public class EnableSchedulingTests { @@ -77,8 +83,14 @@ public class EnableSchedulingTests {
assertThat(ctx.getBean(AtomicInteger.class).get()).isGreaterThanOrEqualTo(10);
}
/*
* Tests compatibility between SimpleAsyncTaskScheduler in regular configuration
* and explicit ThreadPoolTaskScheduler in configuration subclass. This includes
* pause/resume behavior and a controlled shutdown with a 1s termination timeout.
*/
@ParameterizedTest
@ValueSource(classes = {ExplicitSchedulerConfig.class, ExplicitSchedulerConfigSubclass.class})
@Timeout(2) // should actually complete within 1s
@EnabledForTestGroups(LONG_RUNNING)
public void withExplicitScheduler(Class<?> configClass) throws InterruptedException {
ctx = new AnnotationConfigApplicationContext(configClass);
@ -96,9 +108,35 @@ public class EnableSchedulingTests { @@ -96,9 +108,35 @@ public class EnableSchedulingTests {
int count3 = ctx.getBean(AtomicInteger.class).get();
assertThat(count3).isGreaterThanOrEqualTo(20);
TaskExecutor executor = ctx.getBean(TaskExecutor.class);
AtomicInteger count = new AtomicInteger(0);
for (int i = 0; i < 2; i++) {
executor.execute(() -> {
try {
Thread.sleep(10000); // try to break test timeout
}
catch (InterruptedException ex) {
// expected during executor shutdown
try {
Thread.sleep(500);
// should get here within task termination timeout (1000)
count.incrementAndGet();
}
catch (InterruptedException ex2) {
// not expected
}
}
});
}
assertThat(ctx.getBean(ExplicitSchedulerConfig.class).threadName).startsWith("explicitScheduler-");
assertThat(Arrays.asList(ctx.getDefaultListableBeanFactory().getDependentBeans("myTaskScheduler")).contains(
TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)).isTrue();
assertThat(Arrays.asList(ctx.getDefaultListableBeanFactory().getDependentBeans("myTaskScheduler"))
.contains(TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)).isTrue();
// Include executor shutdown in test timeout (2 seconds),
// expecting interruption of the sleeping thread...
ctx.close();
assertThat(count.intValue()).isEqualTo(2);
}
@Test
@ -226,6 +264,11 @@ public class EnableSchedulingTests { @@ -226,6 +264,11 @@ public class EnableSchedulingTests {
@Configuration
static class FixedRateTaskConfigSubclass extends FixedRateTaskConfig {
@Bean
public TaskScheduler taskScheduler() {
return new ThreadPoolTaskScheduler();
}
}
@ -239,6 +282,7 @@ public class EnableSchedulingTests { @@ -239,6 +282,7 @@ public class EnableSchedulingTests {
public TaskScheduler myTaskScheduler() {
SimpleAsyncTaskScheduler scheduler = new SimpleAsyncTaskScheduler();
scheduler.setThreadNamePrefix("explicitScheduler-");
scheduler.setTaskTerminationTimeout(1000);
return scheduler;
}
@ -263,6 +307,8 @@ public class EnableSchedulingTests { @@ -263,6 +307,8 @@ public class EnableSchedulingTests {
public TaskScheduler myTaskScheduler() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setThreadNamePrefix("explicitScheduler-");
scheduler.setAwaitTerminationMillis(1000);
scheduler.setPoolSize(2);
return scheduler;
}
}
@ -437,6 +483,7 @@ public class EnableSchedulingTests { @@ -437,6 +483,7 @@ public class EnableSchedulingTests {
public TaskScheduler taskScheduler1() {
SimpleAsyncTaskScheduler scheduler = new SimpleAsyncTaskScheduler();
scheduler.setThreadNamePrefix("explicitScheduler1");
scheduler.setConcurrencyLimit(1);
return scheduler;
}
@ -478,6 +525,7 @@ public class EnableSchedulingTests { @@ -478,6 +525,7 @@ public class EnableSchedulingTests {
public TaskScheduler taskScheduler1() {
SimpleAsyncTaskScheduler scheduler = new SimpleAsyncTaskScheduler();
scheduler.setThreadNamePrefix("explicitScheduler1-");
scheduler.setConcurrencyLimit(1);
return scheduler;
}
@ -508,6 +556,7 @@ public class EnableSchedulingTests { @@ -508,6 +556,7 @@ public class EnableSchedulingTests {
public TaskScheduler taskScheduler1() {
SimpleAsyncTaskScheduler scheduler = new SimpleAsyncTaskScheduler();
scheduler.setThreadNamePrefix("explicitScheduler1-");
scheduler.setConcurrencyLimit(1);
return scheduler;
}

139
spring-core/src/main/java/org/springframework/core/task/SimpleAsyncTaskExecutor.java

@ -17,7 +17,10 @@ @@ -17,7 +17,10 @@
package org.springframework.core.task;
import java.io.Serializable;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ThreadFactory;
@ -31,10 +34,12 @@ import org.springframework.util.concurrent.ListenableFutureTask; @@ -31,10 +34,12 @@ import org.springframework.util.concurrent.ListenableFutureTask;
/**
* {@link TaskExecutor} implementation that fires up a new Thread for each task,
* executing it asynchronously. Supports a virtual thread option on JDK 21.
* executing it asynchronously. Provides a virtual thread option on JDK 21.
*
* <p>Supports limiting concurrent threads through the "concurrencyLimit"
* bean property. By default, the number of concurrent threads is unlimited.
* <p>Supports a graceful shutdown through {@link #setTaskTerminationTimeout},
* at the expense of task tracking overhead per execution thread at runtime.
* Supports limiting concurrent threads through {@link #setConcurrencyLimit}.
* By default, the number of concurrent task executions is unlimited.
*
* <p><b>NOTE: This implementation does not reuse threads!</b> Consider a
* thread-pooling TaskExecutor implementation instead, in particular for
@ -44,13 +49,14 @@ import org.springframework.util.concurrent.ListenableFutureTask; @@ -44,13 +49,14 @@ import org.springframework.util.concurrent.ListenableFutureTask;
* @author Juergen Hoeller
* @since 2.0
* @see #setVirtualThreads
* @see #setTaskTerminationTimeout
* @see #setConcurrencyLimit
* @see SyncTaskExecutor
* @see org.springframework.scheduling.concurrent.SimpleAsyncTaskScheduler
* @see org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
*/
@SuppressWarnings({"serial", "deprecation"})
public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator
implements AsyncListenableTaskExecutor, Serializable {
implements AsyncListenableTaskExecutor, Serializable, AutoCloseable {
/**
* Permit any number of concurrent invocations: that is, don't throttle concurrency.
@ -77,6 +83,13 @@ public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator @@ -77,6 +83,13 @@ public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator
@Nullable
private TaskDecorator taskDecorator;
private long taskTerminationTimeout;
@Nullable
private Set<Thread> activeThreads;
private volatile boolean active = true;
/**
* Create a new SimpleAsyncTaskExecutor with default thread name prefix.
@ -147,33 +160,62 @@ public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator @@ -147,33 +160,62 @@ public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator
* have to cast it and call {@code Future#get} to evaluate exceptions.
* @since 4.3
*/
public final void setTaskDecorator(TaskDecorator taskDecorator) {
public void setTaskDecorator(TaskDecorator taskDecorator) {
this.taskDecorator = taskDecorator;
}
/**
* Set the maximum number of parallel accesses allowed.
* -1 indicates no concurrency limit at all.
* <p>In principle, this limit can be changed at runtime,
* although it is generally designed as a config time setting.
* NOTE: Do not switch between -1 and any concrete limit at runtime,
* as this will lead to inconsistent concurrency counts: A limit
* of -1 effectively turns off concurrency counting completely.
* Specify a timeout for task termination when closing this executor.
* The default is 0, not waiting for task termination at all.
* <p>Note that a concrete >0 timeout specified here will lead to the
* wrapping of every submitted task into a task-tracking runnable which
* involves considerable overhead in case of a high number of tasks.
* However, for a modest level of submissions with longer-running
* tasks, this is feasible in order to arrive at a graceful shutdown.
* @param timeout the timeout in milliseconds
* @since 6.1
* @see #close()
* @see org.springframework.scheduling.concurrent.ExecutorConfigurationSupport#setAwaitTerminationMillis
*/
public void setTaskTerminationTimeout(long timeout) {
Assert.isTrue(timeout >= 0, "Timeout value must be >=0");
this.taskTerminationTimeout = timeout;
this.activeThreads = (timeout > 0 ? Collections.newSetFromMap(new ConcurrentHashMap<>()) : null);
}
/**
* Return whether this executor is still active, i.e. not closed yet,
* and therefore accepts further task submissions. Otherwise, it is
* either in the task termination phase or entirely shut down already.
* @since 6.1
* @see #setTaskTerminationTimeout
* @see #close()
*/
public boolean isActive() {
return this.active;
}
/**
* Set the maximum number of parallel task executions allowed.
* The default of -1 indicates no concurrency limit at all.
* <p>This is the equivalent of a maximum pool size in a thread pool,
* preventing temporary overload of the thread management system.
* @see #UNBOUNDED_CONCURRENCY
* @see org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor#setMaxPoolSize
*/
public void setConcurrencyLimit(int concurrencyLimit) {
this.concurrencyThrottle.setConcurrencyLimit(concurrencyLimit);
}
/**
* Return the maximum number of parallel accesses allowed.
* Return the maximum number of parallel task executions allowed.
*/
public final int getConcurrencyLimit() {
return this.concurrencyThrottle.getConcurrencyLimit();
}
/**
* Return whether this throttle is currently active.
* Return whether the concurrency throttle is currently active.
* @return {@code true} if the concurrency limit for this instance is active
* @see #getConcurrencyLimit()
* @see #setConcurrencyLimit
@ -207,10 +249,17 @@ public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator @@ -207,10 +249,17 @@ public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator
@Override
public void execute(Runnable task, long startTimeout) {
Assert.notNull(task, "Runnable must not be null");
if (!isActive()) {
throw new TaskRejectedException(getClass().getSimpleName() + " has been closed already");
}
Runnable taskToUse = (this.taskDecorator != null ? this.taskDecorator.decorate(task) : task);
if (isThrottleActive() && startTimeout > TIMEOUT_IMMEDIATE) {
this.concurrencyThrottle.beforeAccess();
doExecute(new ConcurrencyThrottlingRunnable(taskToUse));
doExecute(new TaskTrackingRunnable(taskToUse));
}
else if (this.activeThreads != null) {
doExecute(new TaskTrackingRunnable(taskToUse));
}
else {
doExecute(taskToUse);
@ -278,6 +327,33 @@ public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator @@ -278,6 +327,33 @@ public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator
}
}
/**
* This close methods tracks the termination of active threads if a concrete
* {@link #setTaskTerminationTimeout task termination timeout} has been set.
* Otherwise, it is not necessary to close this executor.
* @since 6.1
*/
@Override
public void close() {
if (this.active) {
this.active = false;
Set<Thread> threads = this.activeThreads;
if (threads != null) {
threads.forEach(Thread::interrupt);
synchronized (threads) {
try {
if (!threads.isEmpty()) {
threads.wait(this.taskTerminationTimeout);
}
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
}
}
}
/**
* Subclass of the general ConcurrencyThrottleSupport class,
@ -299,23 +375,40 @@ public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator @@ -299,23 +375,40 @@ public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator
/**
* This Runnable calls {@code afterAccess()} after the
* target Runnable has finished its execution.
* Decorates a target task with active thread tracking
* and concurrency throttle management, if necessary.
*/
private class ConcurrencyThrottlingRunnable implements Runnable {
private class TaskTrackingRunnable implements Runnable {
private final Runnable target;
private final Runnable task;
public ConcurrencyThrottlingRunnable(Runnable target) {
this.target = target;
public TaskTrackingRunnable(Runnable task) {
Assert.notNull(task, "Task must not be null");
this.task = task;
}
@Override
public void run() {
Set<Thread> threads = activeThreads;
Thread thread = null;
if (threads != null) {
thread = Thread.currentThread();
threads.add(thread);
}
try {
this.target.run();
this.task.run();
}
finally {
if (threads != null) {
threads.remove(thread);
if (!isActive()) {
synchronized (threads) {
if (threads.isEmpty()) {
threads.notify();
}
}
}
}
concurrencyThrottle.afterAccess();
}
}

Loading…
Cancel
Save