Browse Source

Prevent unbounded retention of cancelled SockJS tasks

This change sets the removeOnCancelPolicy on the SockJS
ScheduledThreadPoolExecutor to true. This ensures that cancelled
tasks are removed immediately to avoid the "unbounded retention
of cancelled tasks" that is mentioned in the Javadoc of
ScheduledThreadPoolExecutor:

"By default, such a cancelled task is not automatically removed from
the work queue until its delay elapses. While this enables further
inspection and monitoring, it may also cause unbounded retention of
cancelled tasks. To avoid this, set setRemoveOnCancelPolicy to true,
which causes tasks to be immediately removed from the work queue at
time of cancellation."

Issue: SPR-11918
pull/568/head
Rossen Stoyanchev 11 years ago
parent
commit
5d2e6f6d4c
  1. 19
      spring-websocket/src/main/java/org/springframework/web/socket/config/WebSocketNamespaceUtils.java
  2. 15
      spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketConfigurationSupport.java
  3. 14
      spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurationSupport.java
  4. 1
      spring-websocket/src/test/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParserTests.java
  5. 6
      spring-websocket/src/test/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurationSupportTests.java

19
spring-websocket/src/main/java/org/springframework/web/socket/config/WebSocketNamespaceUtils.java

@ -31,6 +31,11 @@ import org.springframework.web.socket.sockjs.transport.TransportHandlingSockJsSe @@ -31,6 +31,11 @@ import org.springframework.web.socket.sockjs.transport.TransportHandlingSockJsSe
import org.springframework.web.socket.sockjs.transport.handler.DefaultSockJsService;
import org.springframework.web.socket.sockjs.transport.handler.WebSocketTransportHandler;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
/**
* Provides utility methods for parsing common WebSocket XML namespace elements.
*
@ -135,7 +140,7 @@ class WebSocketNamespaceUtils { @@ -135,7 +140,7 @@ class WebSocketNamespaceUtils {
ParserContext parserContext, Object source) {
if (!parserContext.getRegistry().containsBeanDefinition(schedulerName)) {
RootBeanDefinition taskSchedulerDef = new RootBeanDefinition(ThreadPoolTaskScheduler.class);
RootBeanDefinition taskSchedulerDef = new RootBeanDefinition(SockJsThreadPoolTaskScheduler.class);
taskSchedulerDef.setSource(source);
taskSchedulerDef.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);
taskSchedulerDef.getPropertyValues().add("poolSize", Runtime.getRuntime().availableProcessors());
@ -161,4 +166,16 @@ class WebSocketNamespaceUtils { @@ -161,4 +166,16 @@ class WebSocketNamespaceUtils {
return beans;
}
@SuppressWarnings("serial")
private static class SockJsThreadPoolTaskScheduler extends ThreadPoolTaskScheduler {
@Override
protected ExecutorService initializeExecutor(ThreadFactory factory, RejectedExecutionHandler handler) {
ExecutorService service = super.initializeExecutor(factory, handler);
((ScheduledThreadPoolExecutor) service).setRemoveOnCancelPolicy(true);
return service;
}
}
}

15
spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketConfigurationSupport.java

@ -21,6 +21,11 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; @@ -21,6 +21,11 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.web.servlet.HandlerMapping;
import org.springframework.web.servlet.handler.AbstractHandlerMapping;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
/**
* Configuration support for WebSocket request handling.
*
@ -59,7 +64,15 @@ public class WebSocketConfigurationSupport { @@ -59,7 +64,15 @@ public class WebSocketConfigurationSupport {
*/
@Bean
public ThreadPoolTaskScheduler defaultSockJsTaskScheduler() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
@SuppressWarnings("serial")
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler() {
@Override
protected ExecutorService initializeExecutor(ThreadFactory factory, RejectedExecutionHandler handler) {
ExecutorService service = super.initializeExecutor(factory, handler);
((ScheduledThreadPoolExecutor) service).setRemoveOnCancelPolicy(true);
return service;
}
};
scheduler.setThreadNamePrefix("SockJS-");
return scheduler;
}

14
spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurationSupport.java

@ -17,6 +17,10 @@ @@ -17,6 +17,10 @@
package org.springframework.web.socket.config.annotation;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import org.springframework.beans.factory.config.CustomScopeConfigurer;
import org.springframework.context.annotation.Bean;
@ -96,7 +100,15 @@ public abstract class WebSocketMessageBrokerConfigurationSupport extends Abstrac @@ -96,7 +100,15 @@ public abstract class WebSocketMessageBrokerConfigurationSupport extends Abstrac
*/
@Bean
public ThreadPoolTaskScheduler messageBrokerSockJsTaskScheduler() {
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
@SuppressWarnings("serial")
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler() {
@Override
protected ExecutorService initializeExecutor(ThreadFactory factory, RejectedExecutionHandler handler) {
ExecutorService service = super.initializeExecutor(factory, handler);
((ScheduledThreadPoolExecutor) service).setRemoveOnCancelPolicy(true);
return service;
}
};
scheduler.setPoolSize(Runtime.getRuntime().availableProcessors());
scheduler.setThreadNamePrefix("MessageBrokerSockJS-");
return scheduler;

1
spring-websocket/src/test/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParserTests.java

@ -140,6 +140,7 @@ public class MessageBrokerBeanDefinitionParserTests { @@ -140,6 +140,7 @@ public class MessageBrokerBeanDefinitionParserTests {
ThreadPoolTaskScheduler scheduler = (ThreadPoolTaskScheduler) defaultSockJsService.getTaskScheduler();
assertEquals(Runtime.getRuntime().availableProcessors(), scheduler.getScheduledThreadPoolExecutor().getCorePoolSize());
assertTrue(scheduler.getScheduledThreadPoolExecutor().getRemoveOnCancelPolicy());
UserSessionRegistry userSessionRegistry = this.appContext.getBean(UserSessionRegistry.class);
assertNotNull(userSessionRegistry);

6
spring-websocket/src/test/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurationSupportTests.java

@ -20,6 +20,7 @@ import java.util.ArrayList; @@ -20,6 +20,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.junit.Before;
import org.junit.Test;
@ -128,8 +129,9 @@ public class WebSocketMessageBrokerConfigurationSupportTests { @@ -128,8 +129,9 @@ public class WebSocketMessageBrokerConfigurationSupportTests {
ThreadPoolTaskScheduler taskScheduler =
this.config.getBean("messageBrokerSockJsTaskScheduler", ThreadPoolTaskScheduler.class);
assertEquals(Runtime.getRuntime().availableProcessors(),
taskScheduler.getScheduledThreadPoolExecutor().getCorePoolSize());
ScheduledThreadPoolExecutor executor = taskScheduler.getScheduledThreadPoolExecutor();
assertEquals(Runtime.getRuntime().availableProcessors(), executor.getCorePoolSize());
assertTrue(executor.getRemoveOnCancelPolicy());
}

Loading…
Cancel
Save