Browse Source

Update thread pool settings in STOMP/WebSocket config

The clientInboundChannel and clientOutboundChannel now use twice
the number of available processors by default to accomodate for some
degree of blocking in task execution on average.

In practice these settings still need to be configured explicitly in
applications but these should serve as better default values than
the default values in ThreadPoolTaskExecutor.

Issue: SPR-11450
pull/492/head
Rossen Stoyanchev 11 years ago
parent
commit
10af128e96
  1. 55
      spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractMessageBrokerConfiguration.java
  2. 24
      spring-messaging/src/main/java/org/springframework/messaging/simp/config/ChannelRegistration.java
  3. 66
      spring-messaging/src/main/java/org/springframework/messaging/simp/config/TaskExecutorRegistration.java
  4. 79
      spring-messaging/src/test/java/org/springframework/messaging/simp/config/MessageBrokerConfigurationTests.java
  5. 3
      spring-websocket/src/main/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParser.java
  6. 29
      spring-websocket/src/main/resources/org/springframework/web/socket/config/spring-websocket-4.0.xsd
  7. 8
      spring-websocket/src/test/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParserTests.java

55
spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractMessageBrokerConfiguration.java

@ -17,7 +17,13 @@ @@ -17,7 +17,13 @@
package org.springframework.messaging.simp.config;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.BeansException;
@ -93,17 +99,15 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC @@ -93,17 +99,15 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC
@Bean
public AbstractSubscribableChannel clientInboundChannel() {
ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel(clientInboundChannelExecutor());
ChannelRegistration r = getClientInboundChannelRegistration();
if (r.hasInterceptors()) {
channel.setInterceptors(r.getInterceptors());
}
ChannelRegistration reg = getClientInboundChannelRegistration();
channel.setInterceptors(reg.getInterceptors());
return channel;
}
@Bean
public ThreadPoolTaskExecutor clientInboundChannelExecutor() {
TaskExecutorRegistration r = getClientInboundChannelRegistration().getTaskExecutorRegistration();
ThreadPoolTaskExecutor executor = (r != null) ? r.getTaskExecutor() : new ThreadPoolTaskExecutor();
TaskExecutorRegistration reg = getClientInboundChannelRegistration().getOrCreateTaskExecRegistration();
ThreadPoolTaskExecutor executor = reg.getTaskExecutor();
executor.setThreadNamePrefix("clientInboundChannel-");
return executor;
}
@ -129,17 +133,15 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC @@ -129,17 +133,15 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC
@Bean
public AbstractSubscribableChannel clientOutboundChannel() {
ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel(clientOutboundChannelExecutor());
ChannelRegistration r = getClientOutboundChannelRegistration();
if (r.hasInterceptors()) {
channel.setInterceptors(r.getInterceptors());
}
ChannelRegistration reg = getClientOutboundChannelRegistration();
channel.setInterceptors(reg.getInterceptors());
return channel;
}
@Bean
public ThreadPoolTaskExecutor clientOutboundChannelExecutor() {
TaskExecutorRegistration r = getClientOutboundChannelRegistration().getTaskExecutorRegistration();
ThreadPoolTaskExecutor executor = (r != null) ? r.getTaskExecutor() : new ThreadPoolTaskExecutor();
TaskExecutorRegistration reg = getClientOutboundChannelRegistration().getOrCreateTaskExecRegistration();
ThreadPoolTaskExecutor executor = reg.getTaskExecutor();
executor.setThreadNamePrefix("clientOutboundChannel-");
return executor;
}
@ -162,24 +164,27 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC @@ -162,24 +164,27 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC
@Bean
public AbstractSubscribableChannel brokerChannel() {
ChannelRegistration r = getBrokerRegistry().getBrokerChannelRegistration();
ExecutorSubscribableChannel channel;
if (r.hasTaskExecutor()) {
channel = new ExecutorSubscribableChannel(); // synchronous by default
}
else {
channel = new ExecutorSubscribableChannel(brokerChannelExecutor());
}
if (r.hasInterceptors()) {
channel.setInterceptors(r.getInterceptors());
}
ChannelRegistration reg = getBrokerRegistry().getBrokerChannelRegistration();
ExecutorSubscribableChannel channel = reg.hasTaskExecutor() ?
new ExecutorSubscribableChannel(brokerChannelExecutor()) : new ExecutorSubscribableChannel();
channel.setInterceptors(reg.getInterceptors());
return channel;
}
@Bean
public ThreadPoolTaskExecutor brokerChannelExecutor() {
TaskExecutorRegistration r = getBrokerRegistry().getBrokerChannelRegistration().getTaskExecutorRegistration();
ThreadPoolTaskExecutor executor = (r != null) ? r.getTaskExecutor() : new ThreadPoolTaskExecutor();
ChannelRegistration reg = getBrokerRegistry().getBrokerChannelRegistration();
ThreadPoolTaskExecutor executor;
if (reg.hasTaskExecutor()) {
executor = reg.taskExecutor().getTaskExecutor();
}
else {
// Should never be used
executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(0);
executor.setMaxPoolSize(1);
executor.setQueueCapacity(0);
}
executor.setThreadNamePrefix("brokerChannel-");
return executor;
}

24
spring-messaging/src/main/java/org/springframework/messaging/simp/config/ChannelRegistration.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -31,17 +31,19 @@ import org.springframework.messaging.support.ChannelInterceptor; @@ -31,17 +31,19 @@ import org.springframework.messaging.support.ChannelInterceptor;
*/
public class ChannelRegistration {
private TaskExecutorRegistration taskExecutorRegistration;
private TaskExecutorRegistration registration;
private List<ChannelInterceptor> interceptors = new ArrayList<ChannelInterceptor>();
private final List<ChannelInterceptor> interceptors = new ArrayList<ChannelInterceptor>();
/**
* Configure properties of the ThreadPoolTaskExecutor backing the message channel.
* Configure the thread pool backing this message channel.
*/
public TaskExecutorRegistration taskExecutor() {
this.taskExecutorRegistration = new TaskExecutorRegistration();
return this.taskExecutorRegistration;
if (this.registration == null) {
this.registration = new TaskExecutorRegistration();
}
return this.registration;
}
/**
@ -56,11 +58,15 @@ public class ChannelRegistration { @@ -56,11 +58,15 @@ public class ChannelRegistration {
protected boolean hasTaskExecutor() {
return (this.taskExecutorRegistration != null);
return (this.registration != null);
}
protected TaskExecutorRegistration getTaskExecRegistration() {
return this.registration;
}
protected TaskExecutorRegistration getTaskExecutorRegistration() {
return this.taskExecutorRegistration;
protected TaskExecutorRegistration getOrCreateTaskExecRegistration() {
return taskExecutor();
}
protected boolean hasInterceptors() {

66
spring-messaging/src/main/java/org/springframework/messaging/simp/config/TaskExecutorRegistration.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -18,6 +18,7 @@ package org.springframework.messaging.simp.config; @@ -18,6 +18,7 @@ package org.springframework.messaging.simp.config;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
/**
* A registration class for customizing the properties of {@link ThreadPoolTaskExecutor}.
*
@ -26,18 +27,28 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; @@ -26,18 +27,28 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
*/
public class TaskExecutorRegistration {
private int corePoolSize = 1;
private int corePoolSize = Runtime.getRuntime().availableProcessors() * 2;
private int maxPoolSize = Integer.MAX_VALUE;
private int keepAliveSeconds = 60;
private int queueCapacity = Integer.MAX_VALUE;
private int keepAliveSeconds = 60;
/**
* Set the ThreadPoolExecutor's core pool size.
* Default is 1.
* Set the core pool size of the ThreadPoolExecutor.
*
* <p><strong>NOTE:</strong> the core pool size is effectively the max pool size
* when an unbounded {@link #queueCapacity(int) queueCapacity} is configured
* (the default). This is essentially the "Unbounded queues" strategy as explained
* in {@link java.util.concurrent.ThreadPoolExecutor ThreadPoolExecutor}. When
* this strategy is used, the {@link #maxPoolSize(int) maxPoolSize} is ignored.
*
* <p>By default this is set to twice the value of
* {@link Runtime#availableProcessors()}. In an an application where tasks do not
* block frequently, the number should be closer to or equal to the number of
* available CPUs/cores.
*/
public TaskExecutorRegistration corePoolSize(int corePoolSize) {
this.corePoolSize = corePoolSize;
@ -45,8 +56,15 @@ public class TaskExecutorRegistration { @@ -45,8 +56,15 @@ public class TaskExecutorRegistration {
}
/**
* Set the ThreadPoolExecutor's maximum pool size.
* Default is {@code Integer.MAX_VALUE}.
* Set the max pool size of the ThreadPoolExecutor.
*
* <p><strong>NOTE:</strong> when an unbounded
* {@link #queueCapacity(int) queueCapacity} is configured (the default), the
* max pool size is effectively ignored. See the "Unbounded queues" strategy
* in {@link java.util.concurrent.ThreadPoolExecutor ThreadPoolExecutor} for
* more details.
*
* <p>By default this is set to {@code Integer.MAX_VALUE}.
*/
public TaskExecutorRegistration maxPoolSize(int maxPoolSize) {
this.maxPoolSize = maxPoolSize;
@ -54,24 +72,32 @@ public class TaskExecutorRegistration { @@ -54,24 +72,32 @@ public class TaskExecutorRegistration {
}
/**
* Set the ThreadPoolExecutor's keep-alive seconds.
* Default is 60.
* Set the queue capacity for the ThreadPoolExecutor.
*
* <p><strong>NOTE:</strong> when an unbounded
* {@link #queueCapacity(int) queueCapacity} is configured (the default) the
* core pool size is effectively the max pool size. This is essentially the
* "Unbounded queues" strategy as explained in
* {@link java.util.concurrent.ThreadPoolExecutor ThreadPoolExecutor}. When
* this strategy is used, the {@link #maxPoolSize(int) maxPoolSize} is ignored.
*
* <p>By default this is set to {@code Integer.MAX_VALUE}.
*/
public TaskExecutorRegistration keepAliveSeconds(int keepAliveSeconds) {
this.keepAliveSeconds = keepAliveSeconds;
public TaskExecutorRegistration queueCapacity(int queueCapacity) {
this.queueCapacity = queueCapacity;
return this;
}
/**
* Set the capacity for the ThreadPoolExecutor's BlockingQueue.
* Default is {@code Integer.MAX_VALUE}.
* <p>Any positive value will lead to a LinkedBlockingQueue instance;
* any other value will lead to a SynchronousQueue instance.
* @see java.util.concurrent.LinkedBlockingQueue
* @see java.util.concurrent.SynchronousQueue
* Set the time limit for which threads may remain idle before being terminated.
* If there are more than the core number of threads currently in the pool,
* after waiting this amount of time without processing a task, excess threads
* will be terminated. This overrides any value set in the constructor.
*
* <p>By default this is set to 60.
*/
public TaskExecutorRegistration queueCapacity(int queueCapacity) {
this.queueCapacity = queueCapacity;
public TaskExecutorRegistration keepAliveSeconds(int keepAliveSeconds) {
this.keepAliveSeconds = keepAliveSeconds;
return this;
}

79
spring-messaging/src/test/java/org/springframework/messaging/simp/config/MessageBrokerConfigurationTests.java

@ -31,6 +31,7 @@ import org.springframework.context.annotation.Bean; @@ -31,6 +31,7 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.support.StaticApplicationContext;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.converter.*;
import org.springframework.messaging.handler.annotation.MessageMapping;
@ -66,24 +67,30 @@ import static org.junit.Assert.*; @@ -66,24 +67,30 @@ import static org.junit.Assert.*;
*/
public class MessageBrokerConfigurationTests {
private AnnotationConfigApplicationContext simpleContext;
private AnnotationConfigApplicationContext simpleBrokerContext;
private AnnotationConfigApplicationContext brokerRelayContext;
private AnnotationConfigApplicationContext defaultContext;
private AnnotationConfigApplicationContext customChannelContext;
@Before
public void setupOnce() {
this.simpleContext = new AnnotationConfigApplicationContext();
this.simpleContext.register(SimpleConfig.class);
this.simpleContext.refresh();
this.simpleBrokerContext = new AnnotationConfigApplicationContext();
this.simpleBrokerContext.register(SimpleBrokerConfig.class);
this.simpleBrokerContext.refresh();
this.brokerRelayContext = new AnnotationConfigApplicationContext();
this.brokerRelayContext.register(BrokerRelayConfig.class);
this.brokerRelayContext.refresh();
this.defaultContext = new AnnotationConfigApplicationContext();
this.defaultContext.register(DefaultConfig.class);
this.defaultContext.refresh();
this.customChannelContext = new AnnotationConfigApplicationContext();
this.customChannelContext.register(CustomChannelConfig.class);
this.customChannelContext.refresh();
@ -93,13 +100,13 @@ public class MessageBrokerConfigurationTests { @@ -93,13 +100,13 @@ public class MessageBrokerConfigurationTests {
@Test
public void clientInboundChannel() {
TestChannel channel = this.simpleContext.getBean("clientInboundChannel", TestChannel.class);
TestChannel channel = this.simpleBrokerContext.getBean("clientInboundChannel", TestChannel.class);
Set<MessageHandler> handlers = channel.getSubscribers();
assertEquals(3, handlers.size());
assertTrue(handlers.contains(simpleContext.getBean(SimpAnnotationMethodMessageHandler.class)));
assertTrue(handlers.contains(simpleContext.getBean(UserDestinationMessageHandler.class)));
assertTrue(handlers.contains(simpleContext.getBean(SimpleBrokerMessageHandler.class)));
assertTrue(handlers.contains(simpleBrokerContext.getBean(SimpAnnotationMethodMessageHandler.class)));
assertTrue(handlers.contains(simpleBrokerContext.getBean(UserDestinationMessageHandler.class)));
assertTrue(handlers.contains(simpleBrokerContext.getBean(SimpleBrokerMessageHandler.class)));
}
@Test
@ -130,8 +137,8 @@ public class MessageBrokerConfigurationTests { @@ -130,8 +137,8 @@ public class MessageBrokerConfigurationTests {
@Test
public void clientOutboundChannelUsedByAnnotatedMethod() {
TestChannel channel = this.simpleContext.getBean("clientOutboundChannel", TestChannel.class);
SimpAnnotationMethodMessageHandler messageHandler = this.simpleContext.getBean(SimpAnnotationMethodMessageHandler.class);
TestChannel channel = this.simpleBrokerContext.getBean("clientOutboundChannel", TestChannel.class);
SimpAnnotationMethodMessageHandler messageHandler = this.simpleBrokerContext.getBean(SimpAnnotationMethodMessageHandler.class);
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SUBSCRIBE);
headers.setSessionId("sess1");
@ -151,8 +158,8 @@ public class MessageBrokerConfigurationTests { @@ -151,8 +158,8 @@ public class MessageBrokerConfigurationTests {
@Test
public void clientOutboundChannelUsedBySimpleBroker() {
TestChannel channel = this.simpleContext.getBean("clientOutboundChannel", TestChannel.class);
SimpleBrokerMessageHandler broker = this.simpleContext.getBean(SimpleBrokerMessageHandler.class);
TestChannel channel = this.simpleBrokerContext.getBean("clientOutboundChannel", TestChannel.class);
SimpleBrokerMessageHandler broker = this.simpleBrokerContext.getBean(SimpleBrokerMessageHandler.class);
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SUBSCRIBE);
headers.setSessionId("sess1");
@ -197,12 +204,14 @@ public class MessageBrokerConfigurationTests { @@ -197,12 +204,14 @@ public class MessageBrokerConfigurationTests {
@Test
public void brokerChannel() {
TestChannel channel = this.simpleContext.getBean("brokerChannel", TestChannel.class);
TestChannel channel = this.simpleBrokerContext.getBean("brokerChannel", TestChannel.class);
Set<MessageHandler> handlers = channel.getSubscribers();
assertEquals(2, handlers.size());
assertTrue(handlers.contains(simpleContext.getBean(UserDestinationMessageHandler.class)));
assertTrue(handlers.contains(simpleContext.getBean(SimpleBrokerMessageHandler.class)));
assertTrue(handlers.contains(simpleBrokerContext.getBean(UserDestinationMessageHandler.class)));
assertTrue(handlers.contains(simpleBrokerContext.getBean(SimpleBrokerMessageHandler.class)));
assertNull(channel.getExecutor());
}
@Test
@ -217,8 +226,9 @@ public class MessageBrokerConfigurationTests { @@ -217,8 +226,9 @@ public class MessageBrokerConfigurationTests {
@Test
public void brokerChannelUsedByAnnotatedMethod() {
TestChannel channel = this.simpleContext.getBean("brokerChannel", TestChannel.class);
SimpAnnotationMethodMessageHandler messageHandler = this.simpleContext.getBean(SimpAnnotationMethodMessageHandler.class);
TestChannel channel = this.simpleBrokerContext.getBean("brokerChannel", TestChannel.class);
SimpAnnotationMethodMessageHandler messageHandler =
this.simpleBrokerContext.getBean(SimpAnnotationMethodMessageHandler.class);
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND);
headers.setDestination("/foo");
@ -236,10 +246,10 @@ public class MessageBrokerConfigurationTests { @@ -236,10 +246,10 @@ public class MessageBrokerConfigurationTests {
@Test
public void brokerChannelUsedByUserDestinationMessageHandler() {
TestChannel channel = this.simpleContext.getBean("brokerChannel", TestChannel.class);
UserDestinationMessageHandler messageHandler = this.simpleContext.getBean(UserDestinationMessageHandler.class);
TestChannel channel = this.simpleBrokerContext.getBean("brokerChannel", TestChannel.class);
UserDestinationMessageHandler messageHandler = this.simpleBrokerContext.getBean(UserDestinationMessageHandler.class);
this.simpleContext.getBean(UserSessionRegistry.class).registerSessionId("joe", "s1");
this.simpleBrokerContext.getBean(UserSessionRegistry.class).registerSessionId("joe", "s1");
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND);
headers.setDestination("/user/joe/foo");
@ -285,6 +295,24 @@ public class MessageBrokerConfigurationTests { @@ -285,6 +295,24 @@ public class MessageBrokerConfigurationTests {
assertEquals(MimeTypeUtils.APPLICATION_JSON, ((DefaultContentTypeResolver) resolver).getDefaultMimeType());
}
@Test
public void threadPoolSizeDefault() {
String name = "clientInboundChannelExecutor";
ThreadPoolTaskExecutor executor = this.defaultContext.getBean(name, ThreadPoolTaskExecutor.class);
assertEquals(Runtime.getRuntime().availableProcessors() * 2, executor.getCorePoolSize());
// No way to verify queue capacity
name = "clientOutboundChannelExecutor";
executor = this.defaultContext.getBean(name, ThreadPoolTaskExecutor.class);
assertEquals(Runtime.getRuntime().availableProcessors() * 2, executor.getCorePoolSize());
name = "brokerChannelExecutor";
executor = this.defaultContext.getBean(name, ThreadPoolTaskExecutor.class);
assertEquals(0, executor.getCorePoolSize());
assertEquals(1, executor.getMaxPoolSize());
}
@Test
public void configureMessageConvertersCustom() {
final MessageConverter testConverter = Mockito.mock(MessageConverter.class);
@ -360,7 +388,7 @@ public class MessageBrokerConfigurationTests { @@ -360,7 +388,7 @@ public class MessageBrokerConfigurationTests {
@Test
public void simpValidatorInjected() {
SimpAnnotationMethodMessageHandler messageHandler =
this.simpleContext.getBean(SimpAnnotationMethodMessageHandler.class);
this.simpleBrokerContext.getBean(SimpAnnotationMethodMessageHandler.class);
assertThat(messageHandler.getValidator(), Matchers.notNullValue(Validator.class));
}
@ -381,8 +409,9 @@ public class MessageBrokerConfigurationTests { @@ -381,8 +409,9 @@ public class MessageBrokerConfigurationTests {
}
}
@Configuration
static class SimpleConfig extends AbstractMessageBrokerConfiguration {
static class SimpleBrokerConfig extends AbstractMessageBrokerConfiguration {
@Bean
public TestController subscriptionController() {
@ -409,7 +438,7 @@ public class MessageBrokerConfigurationTests { @@ -409,7 +438,7 @@ public class MessageBrokerConfigurationTests {
}
@Configuration
static class BrokerRelayConfig extends SimpleConfig {
static class BrokerRelayConfig extends SimpleBrokerConfig {
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
@ -417,6 +446,10 @@ public class MessageBrokerConfigurationTests { @@ -417,6 +446,10 @@ public class MessageBrokerConfigurationTests {
}
}
@Configuration
static class DefaultConfig extends AbstractMessageBrokerConfiguration {
}
@Configuration
static class CustomChannelConfig extends AbstractMessageBrokerConfiguration {

3
spring-websocket/src/main/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParser.java

@ -201,6 +201,9 @@ class MessageBrokerBeanDefinitionParser implements BeanDefinitionParser { @@ -201,6 +201,9 @@ class MessageBrokerBeanDefinitionParser implements BeanDefinitionParser {
}
else if (!channelName.equals("brokerChannel")) {
executorDef = new RootBeanDefinition(ThreadPoolTaskExecutor.class);
executorDef.getPropertyValues().add("corePoolSize", Runtime.getRuntime().availableProcessors() * 2);
executorDef.getPropertyValues().add("maxPoolSize", Integer.MAX_VALUE);
executorDef.getPropertyValues().add("queueCapacity", Integer.MAX_VALUE);
}
ConstructorArgumentValues values = new ConstructorArgumentValues();

29
spring-websocket/src/main/resources/org/springframework/web/socket/config/spring-websocket-4.0.xsd

@ -152,7 +152,7 @@ @@ -152,7 +152,7 @@
<xsd:annotation>
<xsd:documentation source="java:org.springframework.web.socket.sockjs.support.AbstractSockJsService"><![CDATA[
Minimum number of bytes that can be send over a single HTTP streaming request before it will be closed.
Defaults to 128K (i.e. 128 * 1024).
Defaults to 128K (i.e. 128 1024).
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
@ -346,31 +346,44 @@ @@ -346,31 +346,44 @@
<xsd:attribute name="core-pool-size" type="xsd:int" use="optional">
<xsd:annotation>
<xsd:documentation source="java:org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"><![CDATA[
The core pool size. Default value is 1.
Set the core pool size of the ThreadPoolExecutor.
NOTE: the core pool size is effectively the max pool size when an unbounded queue-capacity is configured (the default).
This is essentially the "Unbounded queues" strategy as explained in java.util.concurrent.ThreadPoolExecutor.
When this strategy is used, the max pool size is effectively ignored.
By default this is set to twice the value of Runtime.availableProcessors().
In an an application where tasks do not block frequently,
the number should be closer to or equal to the number of available CPUs/cores.
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="max-pool-size" type="xsd:int" use="optional">
<xsd:annotation>
<xsd:documentation source="java:org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"><![CDATA[
The maximum pool size. Default value is Integer.MAX_VALUE.
Set the max pool size of the ThreadPoolExecutor.
NOTE: when an unbounded queue-capacity is configured (the default), the max pool size is effectively ignored.
See the "Unbounded queues" strategy in java.util.concurrent.ThreadPoolExecutor for more details.
By default this is set to Integer.MAX_VALUE.
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="keep-alive-seconds" type="xsd:int" use="optional">
<xsd:annotation>
<xsd:documentation source="java:org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"><![CDATA[
The keep-alive value in seconds. Default value 60.
Set the time limit for which threads may remain idle before being terminated.
If there are more than the core number of threads currently in the pool, after waiting this amount of time without
processing a task, excess threads will be terminated. This overrides any value set in the constructor.
By default this is set to 60.
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="queue-capacity" type="xsd:int" use="optional">
<xsd:annotation>
<xsd:documentation source="java:org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"><![CDATA[
The capacity for the executor's BlockingQueue.
Default is Integer.MAX_VALUE.
Any positive value will lead to a LinkedBlockingQueue instance;
any other value will lead to a SynchronousQueue instance.
Set the queue capacity for the ThreadPoolExecutor.
NOTE: when an unbounded queue-capacity is configured (the default) the core pool size is effectively the max pool size.
This is essentially the "Unbounded queues" strategy as explained in java.util.concurrent.ThreadPoolExecutor.
When this strategy is used, the max pool size is effectively ignored.
By default this is set to Integer.MAX_VALUE.
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>

8
spring-websocket/src/test/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParserTests.java

@ -136,11 +136,11 @@ public class MessageBrokerBeanDefinitionParserTests { @@ -136,11 +136,11 @@ public class MessageBrokerBeanDefinitionParserTests {
Arrays.<Class<? extends MessageHandler>>asList(SimpAnnotationMethodMessageHandler.class,
UserDestinationMessageHandler.class, SimpleBrokerMessageHandler.class);
testChannel("clientInboundChannel", subscriberTypes, 0);
testExecutor("clientInboundChannel", 1, Integer.MAX_VALUE, 60);
testExecutor("clientInboundChannel", Runtime.getRuntime().availableProcessors() * 2, Integer.MAX_VALUE, 60);
subscriberTypes = Arrays.<Class<? extends MessageHandler>>asList(SubProtocolWebSocketHandler.class);
testChannel("clientOutboundChannel", subscriberTypes, 0);
testExecutor("clientOutboundChannel", 1, Integer.MAX_VALUE, 60);
testExecutor("clientOutboundChannel", Runtime.getRuntime().availableProcessors() * 2, Integer.MAX_VALUE, 60);
subscriberTypes = Arrays.<Class<? extends MessageHandler>>asList(
SimpleBrokerMessageHandler.class, UserDestinationMessageHandler.class);
@ -199,11 +199,11 @@ public class MessageBrokerBeanDefinitionParserTests { @@ -199,11 +199,11 @@ public class MessageBrokerBeanDefinitionParserTests {
Arrays.<Class<? extends MessageHandler>>asList(SimpAnnotationMethodMessageHandler.class,
UserDestinationMessageHandler.class, StompBrokerRelayMessageHandler.class);
testChannel("clientInboundChannel", subscriberTypes, 0);
testExecutor("clientInboundChannel", 1, Integer.MAX_VALUE, 60);
testExecutor("clientInboundChannel", Runtime.getRuntime().availableProcessors() * 2, Integer.MAX_VALUE, 60);
subscriberTypes = Arrays.<Class<? extends MessageHandler>>asList(SubProtocolWebSocketHandler.class);
testChannel("clientOutboundChannel", subscriberTypes, 0);
testExecutor("clientOutboundChannel", 1, Integer.MAX_VALUE, 60);
testExecutor("clientOutboundChannel", Runtime.getRuntime().availableProcessors() * 2, Integer.MAX_VALUE, 60);
subscriberTypes = Arrays.<Class<? extends MessageHandler>>asList(
StompBrokerRelayMessageHandler.class, UserDestinationMessageHandler.class);

Loading…
Cancel
Save