diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/config/WebSocketNamespaceUtils.java b/spring-websocket/src/main/java/org/springframework/web/socket/config/WebSocketNamespaceUtils.java index 0640c3de9c..d1372cdca9 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/config/WebSocketNamespaceUtils.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/config/WebSocketNamespaceUtils.java @@ -16,6 +16,7 @@ package org.springframework.web.socket.config; +import org.springframework.web.socket.sockjs.transport.SockJsThreadPoolTaskScheduler; import org.w3c.dom.Element; import org.springframework.beans.factory.config.BeanDefinition; @@ -24,18 +25,12 @@ import org.springframework.beans.factory.parsing.BeanComponentDefinition; import org.springframework.beans.factory.support.ManagedList; import org.springframework.beans.factory.support.RootBeanDefinition; import org.springframework.beans.factory.xml.ParserContext; -import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.util.xml.DomUtils; import org.springframework.web.socket.server.support.DefaultHandshakeHandler; import org.springframework.web.socket.sockjs.transport.TransportHandlingSockJsService; import org.springframework.web.socket.sockjs.transport.handler.DefaultSockJsService; import org.springframework.web.socket.sockjs.transport.handler.WebSocketTransportHandler; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.RejectedExecutionHandler; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.ThreadFactory; - /** * Provides utility methods for parsing common WebSocket XML namespace elements. * @@ -166,16 +161,4 @@ class WebSocketNamespaceUtils { return beans; } - - @SuppressWarnings("serial") - private static class SockJsThreadPoolTaskScheduler extends ThreadPoolTaskScheduler { - - @Override - protected ExecutorService initializeExecutor(ThreadFactory factory, RejectedExecutionHandler handler) { - ExecutorService service = super.initializeExecutor(factory, handler); - ((ScheduledThreadPoolExecutor) service).setRemoveOnCancelPolicy(true); - return service; - } - } - } diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketConfigurationSupport.java b/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketConfigurationSupport.java index f8769842b8..bd343628df 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketConfigurationSupport.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketConfigurationSupport.java @@ -20,11 +20,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.web.servlet.HandlerMapping; import org.springframework.web.servlet.handler.AbstractHandlerMapping; - -import java.util.concurrent.ExecutorService; -import java.util.concurrent.RejectedExecutionHandler; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.ThreadFactory; +import org.springframework.web.socket.sockjs.transport.SockJsThreadPoolTaskScheduler; /** * Configuration support for WebSocket request handling. @@ -64,15 +60,7 @@ public class WebSocketConfigurationSupport { */ @Bean public ThreadPoolTaskScheduler defaultSockJsTaskScheduler() { - @SuppressWarnings("serial") - ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler() { - @Override - protected ExecutorService initializeExecutor(ThreadFactory factory, RejectedExecutionHandler handler) { - ExecutorService service = super.initializeExecutor(factory, handler); - ((ScheduledThreadPoolExecutor) service).setRemoveOnCancelPolicy(true); - return service; - } - }; + ThreadPoolTaskScheduler scheduler = new SockJsThreadPoolTaskScheduler(); scheduler.setThreadNamePrefix("SockJS-"); return scheduler; } diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurationSupport.java b/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurationSupport.java index a901da1082..1d170b81f9 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurationSupport.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurationSupport.java @@ -17,20 +17,16 @@ package org.springframework.web.socket.config.annotation; import java.util.Collections; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.RejectedExecutionHandler; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.ThreadFactory; import org.springframework.beans.factory.config.CustomScopeConfigurer; import org.springframework.context.annotation.Bean; import org.springframework.messaging.simp.SimpSessionScope; import org.springframework.messaging.simp.config.AbstractMessageBrokerConfiguration; -import org.springframework.messaging.simp.user.UserSessionRegistry; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.web.servlet.HandlerMapping; import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.messaging.SubProtocolWebSocketHandler; +import org.springframework.web.socket.sockjs.transport.SockJsThreadPoolTaskScheduler; /** * Extends {@link AbstractMessageBrokerConfiguration} and adds configuration for @@ -100,16 +96,7 @@ public abstract class WebSocketMessageBrokerConfigurationSupport extends Abstrac */ @Bean public ThreadPoolTaskScheduler messageBrokerSockJsTaskScheduler() { - @SuppressWarnings("serial") - ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler() { - @Override - protected ExecutorService initializeExecutor(ThreadFactory factory, RejectedExecutionHandler handler) { - ExecutorService service = super.initializeExecutor(factory, handler); - ((ScheduledThreadPoolExecutor) service).setRemoveOnCancelPolicy(true); - return service; - } - }; - scheduler.setPoolSize(Runtime.getRuntime().availableProcessors()); + ThreadPoolTaskScheduler scheduler = new SockJsThreadPoolTaskScheduler(); scheduler.setThreadNamePrefix("MessageBrokerSockJS-"); return scheduler; } diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/SockJsThreadPoolTaskScheduler.java b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/SockJsThreadPoolTaskScheduler.java new file mode 100644 index 0000000000..dd0d2a5731 --- /dev/null +++ b/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/transport/SockJsThreadPoolTaskScheduler.java @@ -0,0 +1,66 @@ +/* + * Copyright 2002-2014 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.socket.sockjs.transport; + +import org.springframework.lang.UsesJava7; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; +import org.springframework.util.ClassUtils; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; + +/** + * An extension of ThreadPoolTaskScheduler optimized for managing a large number + * of task, e.g. setting he pool size to the number of available processors and + * setting the setRemoveOnCancelPolicy property of + * {@link java.util.concurrent.ScheduledThreadPoolExecutor} available in JDK 1.7 + * or higher in order to avoid keeping cancelled tasks around. + * + * @author Rossen Stoyanchev + * @since 4.1 + */ +@SuppressWarnings("serial") +public class SockJsThreadPoolTaskScheduler extends ThreadPoolTaskScheduler { + + // Check for setRemoveOnCancelPolicy method - available on JDK 7 and higher + private static boolean hasRemoveOnCancelPolicyMethod = ClassUtils.hasMethod( + ScheduledThreadPoolExecutor.class, "setRemoveOnCancelPolicy", Boolean.class); + + + public SockJsThreadPoolTaskScheduler() { + setThreadNamePrefix("SockJS-"); + setPoolSize(Runtime.getRuntime().availableProcessors()); + } + + + @Override + protected ExecutorService initializeExecutor(ThreadFactory factory, RejectedExecutionHandler handler) { + ExecutorService service = super.initializeExecutor(factory, handler); + configureRemoveOnCancelPolicy((ScheduledThreadPoolExecutor) service); + return service; + } + + @UsesJava7 // guard setting removeOnCancelPolicy (safe with 1.6 due to hasRemoveOnCancelPolicyMethod check) + private void configureRemoveOnCancelPolicy(ScheduledThreadPoolExecutor service) { + if (hasRemoveOnCancelPolicyMethod) { + service.setRemoveOnCancelPolicy(true); + } + } + +}