From ce80637891148bb9a23b7f2b6626f99a98564d35 Mon Sep 17 00:00:00 2001 From: Juergen Hoeller Date: Thu, 27 Jul 2023 21:39:58 +0200 Subject: [PATCH] Add option for graceful shutdown (setTaskTerminationTimeout) See gh-30956 --- .../ROOT/pages/integration/scheduling.adoc | 34 +++-- .../concurrent/SimpleAsyncTaskScheduler.java | 20 ++- .../annotation/EnableSchedulingTests.java | 53 ++++++- .../core/task/SimpleAsyncTaskExecutor.java | 139 +++++++++++++++--- 4 files changed, 205 insertions(+), 41 deletions(-) diff --git a/framework-docs/modules/ROOT/pages/integration/scheduling.adoc b/framework-docs/modules/ROOT/pages/integration/scheduling.adoc index 415e043920..c6e7af27a1 100644 --- a/framework-docs/modules/ROOT/pages/integration/scheduling.adoc +++ b/framework-docs/modules/ROOT/pages/integration/scheduling.adoc @@ -62,22 +62,27 @@ The variants that Spring provides are as follows: `ConcurrentTaskExecutor` directly. However, if the `ThreadPoolTaskExecutor` is not flexible enough for your needs, `ConcurrentTaskExecutor` is an alternative. * `ThreadPoolTaskExecutor`: - This implementation is most commonly used. It exposes bean properties for - configuring a `java.util.concurrent.ThreadPoolExecutor` and wraps it in a `TaskExecutor`. - If you need to adapt to a different kind of `java.util.concurrent.Executor`, we - recommend that you use a `ConcurrentTaskExecutor` instead. + This implementation is most commonly used. It exposes bean properties for configuring + a `java.util.concurrent.ThreadPoolExecutor` and wraps it in a `TaskExecutor`. + If you need to adapt to a different kind of `java.util.concurrent.Executor`, + we recommend that you use a `ConcurrentTaskExecutor` instead. * `DefaultManagedTaskExecutor`: This implementation uses a JNDI-obtained `ManagedExecutorService` in a JSR-236 compatible runtime environment (such as a Jakarta EE application server), replacing a CommonJ WorkManager for that purpose. +As of 6.1, `ThreadPoolTaskExecutor` provides a pause/resume capability and graceful +shutdown through Spring's lifecycle management. There is also a new "virtualThreads" +option on `SimpleAsyncTaskExecutor` which is aligned with JDK 21's Virtual Threads, +as well as a graceful shutdown capability for `SimpleAsyncTaskExecutor` as well. + [[scheduling-task-executor-usage]] === Using a `TaskExecutor` -Spring's `TaskExecutor` implementations are used as simple JavaBeans. In the following example, -we define a bean that uses the `ThreadPoolTaskExecutor` to asynchronously print -out a set of messages: +Spring's `TaskExecutor` implementations are commonly used with dependency injection. +In the following example, we define a bean that uses the `ThreadPoolTaskExecutor` +to asynchronously print out a set of messages: [source,java,indent=0,subs="verbatim,quotes"] ---- @@ -227,8 +232,8 @@ fixed delay, those methods should be used directly whenever possible. The value `PeriodicTrigger` implementation is that you can use it within components that rely on the `Trigger` abstraction. For example, it may be convenient to allow periodic triggers, cron-based triggers, and even custom trigger implementations to be used interchangeably. -Such a component could take advantage of dependency injection so that you can configure such `Triggers` -externally and, therefore, easily modify or extend them. +Such a component could take advantage of dependency injection so that you can configure +such `Triggers` externally and, therefore, easily modify or extend them. [[scheduling-task-scheduler-implementations]] @@ -238,10 +243,8 @@ As with Spring's `TaskExecutor` abstraction, the primary benefit of the `TaskSch arrangement is that an application's scheduling needs are decoupled from the deployment environment. This abstraction level is particularly relevant when deploying to an application server environment where threads should not be created directly by the -application itself. For such scenarios, Spring provides a `TimerManagerTaskScheduler` -that delegates to a CommonJ `TimerManager` on WebLogic or WebSphere as well as a more recent -`DefaultManagedTaskScheduler` that delegates to a JSR-236 `ManagedScheduledExecutorService` -in a Jakarta EE environment. Both are typically configured with a JNDI lookup. +application itself. For such scenarios, Spring provides a `DefaultManagedTaskScheduler` +that delegates to a JSR-236 `ManagedScheduledExecutorService` in a Jakarta EE environment. Whenever external thread management is not a requirement, a simpler alternative is a local `ScheduledExecutorService` setup within the application, which can be adapted @@ -251,6 +254,11 @@ to provide common bean-style configuration along the lines of `ThreadPoolTaskExe These variants work perfectly fine for locally embedded thread pool setups in lenient application server environments, as well -- in particular on Tomcat and Jetty. +As of 6.1, `ThreadPoolTaskScheduler` provides a pause/resume capability and graceful +shutdown through Spring's lifecycle management. There is also a new option called +`SimpleAsyncTaskScheduler` which is aligned with JDK 21's Virtual Threads, using a +single scheduler thread but firing up a new thread for every scheduled task execution. + [[scheduling-annotation-support]] diff --git a/spring-context/src/main/java/org/springframework/scheduling/concurrent/SimpleAsyncTaskScheduler.java b/spring-context/src/main/java/org/springframework/scheduling/concurrent/SimpleAsyncTaskScheduler.java index 8a8b7f5310..8fb44864f6 100644 --- a/spring-context/src/main/java/org/springframework/scheduling/concurrent/SimpleAsyncTaskScheduler.java +++ b/spring-context/src/main/java/org/springframework/scheduling/concurrent/SimpleAsyncTaskScheduler.java @@ -44,7 +44,19 @@ import org.springframework.util.ErrorHandler; * A simple implementation of Spring's {@link TaskScheduler} interface, using * a single scheduler thread and executing every scheduled task in an individual * separate thread. This is an attractive choice with virtual threads on JDK 21, - * so it is commonly used with {@link #setVirtualThreads setVirtualThreads(true)}. + * expecting common usage with {@link #setVirtualThreads setVirtualThreads(true)}. + * + *

Supports a graceful shutdown through {@link #setTaskTerminationTimeout}, + * at the expense of task tracking overhead per execution thread at runtime. + * Supports limiting concurrent threads through {@link #setConcurrencyLimit}. + * By default, the number of concurrent task executions is unlimited. + * This allows for dynamic concurrency of scheduled task executions, in contrast + * to {@link ThreadPoolTaskScheduler} which requires a fixed pool size. + * + *

NOTE: This implementation does not reuse threads! Consider a + * thread-pooling TaskScheduler implementation instead, in particular for + * scheduling a large number of short-lived tasks. Alternatively, on JDK 21, + * consider setting {@link #setVirtualThreads} to {@code true}. * *

Extends {@link SimpleAsyncTaskExecutor} and can serve as a fully capable * replacement for it, e.g. as a single shared instance serving as a @@ -64,13 +76,14 @@ import org.springframework.util.ErrorHandler; * @author Juergen Hoeller * @since 6.1 * @see #setVirtualThreads - * @see #setTargetTaskExecutor + * @see #setTaskTerminationTimeout + * @see #setConcurrencyLimit * @see SimpleAsyncTaskExecutor * @see ThreadPoolTaskScheduler */ @SuppressWarnings("serial") public class SimpleAsyncTaskScheduler extends SimpleAsyncTaskExecutor implements TaskScheduler, - ApplicationContextAware, SmartLifecycle, ApplicationListener, AutoCloseable { + ApplicationContextAware, SmartLifecycle, ApplicationListener { private static final TimeUnit NANO = TimeUnit.NANOSECONDS; @@ -275,6 +288,7 @@ public class SimpleAsyncTaskScheduler extends SimpleAsyncTaskExecutor implements future.cancel(true); } } + super.close(); } } diff --git a/spring-context/src/test/java/org/springframework/scheduling/annotation/EnableSchedulingTests.java b/spring-context/src/test/java/org/springframework/scheduling/annotation/EnableSchedulingTests.java index 73e7e8ec87..aa3a1d5f06 100644 --- a/spring-context/src/test/java/org/springframework/scheduling/annotation/EnableSchedulingTests.java +++ b/spring-context/src/test/java/org/springframework/scheduling/annotation/EnableSchedulingTests.java @@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -33,6 +34,7 @@ import org.springframework.context.annotation.AnnotationConfigApplicationContext import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.support.PropertySourcesPlaceholderConfigurer; +import org.springframework.core.task.TaskExecutor; import org.springframework.core.testfixture.EnabledForTestGroups; import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.concurrent.SimpleAsyncTaskScheduler; @@ -66,6 +68,10 @@ public class EnableSchedulingTests { } + /* + * Tests compatibility between default executor in TaskSchedulerRouter + * and explicit ThreadPoolTaskScheduler in configuration subclass. + */ @ParameterizedTest @ValueSource(classes = {FixedRateTaskConfig.class, FixedRateTaskConfigSubclass.class}) @EnabledForTestGroups(LONG_RUNNING) @@ -77,8 +83,14 @@ public class EnableSchedulingTests { assertThat(ctx.getBean(AtomicInteger.class).get()).isGreaterThanOrEqualTo(10); } + /* + * Tests compatibility between SimpleAsyncTaskScheduler in regular configuration + * and explicit ThreadPoolTaskScheduler in configuration subclass. This includes + * pause/resume behavior and a controlled shutdown with a 1s termination timeout. + */ @ParameterizedTest @ValueSource(classes = {ExplicitSchedulerConfig.class, ExplicitSchedulerConfigSubclass.class}) + @Timeout(2) // should actually complete within 1s @EnabledForTestGroups(LONG_RUNNING) public void withExplicitScheduler(Class configClass) throws InterruptedException { ctx = new AnnotationConfigApplicationContext(configClass); @@ -96,9 +108,35 @@ public class EnableSchedulingTests { int count3 = ctx.getBean(AtomicInteger.class).get(); assertThat(count3).isGreaterThanOrEqualTo(20); + TaskExecutor executor = ctx.getBean(TaskExecutor.class); + AtomicInteger count = new AtomicInteger(0); + for (int i = 0; i < 2; i++) { + executor.execute(() -> { + try { + Thread.sleep(10000); // try to break test timeout + } + catch (InterruptedException ex) { + // expected during executor shutdown + try { + Thread.sleep(500); + // should get here within task termination timeout (1000) + count.incrementAndGet(); + } + catch (InterruptedException ex2) { + // not expected + } + } + }); + } + assertThat(ctx.getBean(ExplicitSchedulerConfig.class).threadName).startsWith("explicitScheduler-"); - assertThat(Arrays.asList(ctx.getDefaultListableBeanFactory().getDependentBeans("myTaskScheduler")).contains( - TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)).isTrue(); + assertThat(Arrays.asList(ctx.getDefaultListableBeanFactory().getDependentBeans("myTaskScheduler")) + .contains(TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)).isTrue(); + + // Include executor shutdown in test timeout (2 seconds), + // expecting interruption of the sleeping thread... + ctx.close(); + assertThat(count.intValue()).isEqualTo(2); } @Test @@ -226,6 +264,11 @@ public class EnableSchedulingTests { @Configuration static class FixedRateTaskConfigSubclass extends FixedRateTaskConfig { + + @Bean + public TaskScheduler taskScheduler() { + return new ThreadPoolTaskScheduler(); + } } @@ -239,6 +282,7 @@ public class EnableSchedulingTests { public TaskScheduler myTaskScheduler() { SimpleAsyncTaskScheduler scheduler = new SimpleAsyncTaskScheduler(); scheduler.setThreadNamePrefix("explicitScheduler-"); + scheduler.setTaskTerminationTimeout(1000); return scheduler; } @@ -263,6 +307,8 @@ public class EnableSchedulingTests { public TaskScheduler myTaskScheduler() { ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); scheduler.setThreadNamePrefix("explicitScheduler-"); + scheduler.setAwaitTerminationMillis(1000); + scheduler.setPoolSize(2); return scheduler; } } @@ -437,6 +483,7 @@ public class EnableSchedulingTests { public TaskScheduler taskScheduler1() { SimpleAsyncTaskScheduler scheduler = new SimpleAsyncTaskScheduler(); scheduler.setThreadNamePrefix("explicitScheduler1"); + scheduler.setConcurrencyLimit(1); return scheduler; } @@ -478,6 +525,7 @@ public class EnableSchedulingTests { public TaskScheduler taskScheduler1() { SimpleAsyncTaskScheduler scheduler = new SimpleAsyncTaskScheduler(); scheduler.setThreadNamePrefix("explicitScheduler1-"); + scheduler.setConcurrencyLimit(1); return scheduler; } @@ -508,6 +556,7 @@ public class EnableSchedulingTests { public TaskScheduler taskScheduler1() { SimpleAsyncTaskScheduler scheduler = new SimpleAsyncTaskScheduler(); scheduler.setThreadNamePrefix("explicitScheduler1-"); + scheduler.setConcurrencyLimit(1); return scheduler; } diff --git a/spring-core/src/main/java/org/springframework/core/task/SimpleAsyncTaskExecutor.java b/spring-core/src/main/java/org/springframework/core/task/SimpleAsyncTaskExecutor.java index c4fc4374f9..ce7eee8c12 100644 --- a/spring-core/src/main/java/org/springframework/core/task/SimpleAsyncTaskExecutor.java +++ b/spring-core/src/main/java/org/springframework/core/task/SimpleAsyncTaskExecutor.java @@ -17,7 +17,10 @@ package org.springframework.core.task; import java.io.Serializable; +import java.util.Collections; +import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; import java.util.concurrent.ThreadFactory; @@ -31,10 +34,12 @@ import org.springframework.util.concurrent.ListenableFutureTask; /** * {@link TaskExecutor} implementation that fires up a new Thread for each task, - * executing it asynchronously. Supports a virtual thread option on JDK 21. + * executing it asynchronously. Provides a virtual thread option on JDK 21. * - *

Supports limiting concurrent threads through the "concurrencyLimit" - * bean property. By default, the number of concurrent threads is unlimited. + *

Supports a graceful shutdown through {@link #setTaskTerminationTimeout}, + * at the expense of task tracking overhead per execution thread at runtime. + * Supports limiting concurrent threads through {@link #setConcurrencyLimit}. + * By default, the number of concurrent task executions is unlimited. * *

NOTE: This implementation does not reuse threads! Consider a * thread-pooling TaskExecutor implementation instead, in particular for @@ -44,13 +49,14 @@ import org.springframework.util.concurrent.ListenableFutureTask; * @author Juergen Hoeller * @since 2.0 * @see #setVirtualThreads + * @see #setTaskTerminationTimeout * @see #setConcurrencyLimit - * @see SyncTaskExecutor + * @see org.springframework.scheduling.concurrent.SimpleAsyncTaskScheduler * @see org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor */ @SuppressWarnings({"serial", "deprecation"}) public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator - implements AsyncListenableTaskExecutor, Serializable { + implements AsyncListenableTaskExecutor, Serializable, AutoCloseable { /** * Permit any number of concurrent invocations: that is, don't throttle concurrency. @@ -77,6 +83,13 @@ public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator @Nullable private TaskDecorator taskDecorator; + private long taskTerminationTimeout; + + @Nullable + private Set activeThreads; + + private volatile boolean active = true; + /** * Create a new SimpleAsyncTaskExecutor with default thread name prefix. @@ -147,33 +160,62 @@ public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator * have to cast it and call {@code Future#get} to evaluate exceptions. * @since 4.3 */ - public final void setTaskDecorator(TaskDecorator taskDecorator) { + public void setTaskDecorator(TaskDecorator taskDecorator) { this.taskDecorator = taskDecorator; } /** - * Set the maximum number of parallel accesses allowed. - * -1 indicates no concurrency limit at all. - *

In principle, this limit can be changed at runtime, - * although it is generally designed as a config time setting. - * NOTE: Do not switch between -1 and any concrete limit at runtime, - * as this will lead to inconsistent concurrency counts: A limit - * of -1 effectively turns off concurrency counting completely. + * Specify a timeout for task termination when closing this executor. + * The default is 0, not waiting for task termination at all. + *

Note that a concrete >0 timeout specified here will lead to the + * wrapping of every submitted task into a task-tracking runnable which + * involves considerable overhead in case of a high number of tasks. + * However, for a modest level of submissions with longer-running + * tasks, this is feasible in order to arrive at a graceful shutdown. + * @param timeout the timeout in milliseconds + * @since 6.1 + * @see #close() + * @see org.springframework.scheduling.concurrent.ExecutorConfigurationSupport#setAwaitTerminationMillis + */ + public void setTaskTerminationTimeout(long timeout) { + Assert.isTrue(timeout >= 0, "Timeout value must be >=0"); + this.taskTerminationTimeout = timeout; + this.activeThreads = (timeout > 0 ? Collections.newSetFromMap(new ConcurrentHashMap<>()) : null); + } + + /** + * Return whether this executor is still active, i.e. not closed yet, + * and therefore accepts further task submissions. Otherwise, it is + * either in the task termination phase or entirely shut down already. + * @since 6.1 + * @see #setTaskTerminationTimeout + * @see #close() + */ + public boolean isActive() { + return this.active; + } + + /** + * Set the maximum number of parallel task executions allowed. + * The default of -1 indicates no concurrency limit at all. + *

This is the equivalent of a maximum pool size in a thread pool, + * preventing temporary overload of the thread management system. * @see #UNBOUNDED_CONCURRENCY + * @see org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor#setMaxPoolSize */ public void setConcurrencyLimit(int concurrencyLimit) { this.concurrencyThrottle.setConcurrencyLimit(concurrencyLimit); } /** - * Return the maximum number of parallel accesses allowed. + * Return the maximum number of parallel task executions allowed. */ public final int getConcurrencyLimit() { return this.concurrencyThrottle.getConcurrencyLimit(); } /** - * Return whether this throttle is currently active. + * Return whether the concurrency throttle is currently active. * @return {@code true} if the concurrency limit for this instance is active * @see #getConcurrencyLimit() * @see #setConcurrencyLimit @@ -207,10 +249,17 @@ public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator @Override public void execute(Runnable task, long startTimeout) { Assert.notNull(task, "Runnable must not be null"); + if (!isActive()) { + throw new TaskRejectedException(getClass().getSimpleName() + " has been closed already"); + } + Runnable taskToUse = (this.taskDecorator != null ? this.taskDecorator.decorate(task) : task); if (isThrottleActive() && startTimeout > TIMEOUT_IMMEDIATE) { this.concurrencyThrottle.beforeAccess(); - doExecute(new ConcurrencyThrottlingRunnable(taskToUse)); + doExecute(new TaskTrackingRunnable(taskToUse)); + } + else if (this.activeThreads != null) { + doExecute(new TaskTrackingRunnable(taskToUse)); } else { doExecute(taskToUse); @@ -278,6 +327,33 @@ public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator } } + /** + * This close methods tracks the termination of active threads if a concrete + * {@link #setTaskTerminationTimeout task termination timeout} has been set. + * Otherwise, it is not necessary to close this executor. + * @since 6.1 + */ + @Override + public void close() { + if (this.active) { + this.active = false; + Set threads = this.activeThreads; + if (threads != null) { + threads.forEach(Thread::interrupt); + synchronized (threads) { + try { + if (!threads.isEmpty()) { + threads.wait(this.taskTerminationTimeout); + } + } + catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + } + } + } + } + /** * Subclass of the general ConcurrencyThrottleSupport class, @@ -299,23 +375,40 @@ public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator /** - * This Runnable calls {@code afterAccess()} after the - * target Runnable has finished its execution. + * Decorates a target task with active thread tracking + * and concurrency throttle management, if necessary. */ - private class ConcurrencyThrottlingRunnable implements Runnable { + private class TaskTrackingRunnable implements Runnable { - private final Runnable target; + private final Runnable task; - public ConcurrencyThrottlingRunnable(Runnable target) { - this.target = target; + public TaskTrackingRunnable(Runnable task) { + Assert.notNull(task, "Task must not be null"); + this.task = task; } @Override public void run() { + Set threads = activeThreads; + Thread thread = null; + if (threads != null) { + thread = Thread.currentThread(); + threads.add(thread); + } try { - this.target.run(); + this.task.run(); } finally { + if (threads != null) { + threads.remove(thread); + if (!isActive()) { + synchronized (threads) { + if (threads.isEmpty()) { + threads.notify(); + } + } + } + } concurrencyThrottle.afterAccess(); } }