diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractMessageBrokerConfiguration.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractMessageBrokerConfiguration.java index 7c5cfed550..f490c0cf66 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractMessageBrokerConfiguration.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractMessageBrokerConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2021 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,6 +24,7 @@ import java.util.Map; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.BeanInitializationException; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.context.annotation.Bean; @@ -149,8 +150,10 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC @Bean - public AbstractSubscribableChannel clientInboundChannel(TaskExecutor clientInboundChannelExecutor) { - ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel(clientInboundChannelExecutor); + public AbstractSubscribableChannel clientInboundChannel( + @Qualifier("clientInboundChannelExecutor") TaskExecutor executor) { + + ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel(executor); channel.setLogger(SimpLogging.forLog(channel.getLogger())); ChannelRegistration reg = getClientInboundChannelRegistration(); if (reg.hasInterceptors()) { @@ -185,8 +188,10 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC } @Bean - public AbstractSubscribableChannel clientOutboundChannel(TaskExecutor clientOutboundChannelExecutor) { - ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel(clientOutboundChannelExecutor); + public AbstractSubscribableChannel clientOutboundChannel( + @Qualifier("clientOutboundChannelExecutor") TaskExecutor executor) { + + ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel(executor); channel.setLogger(SimpLogging.forLog(channel.getLogger())); ChannelRegistration reg = getClientOutboundChannelRegistration(); if (reg.hasInterceptors()) { @@ -221,13 +226,14 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC } @Bean - public AbstractSubscribableChannel brokerChannel(AbstractSubscribableChannel clientInboundChannel, - AbstractSubscribableChannel clientOutboundChannel, TaskExecutor brokerChannelExecutor) { + public AbstractSubscribableChannel brokerChannel( + AbstractSubscribableChannel clientInboundChannel, AbstractSubscribableChannel clientOutboundChannel, + @Qualifier("brokerChannelExecutor") TaskExecutor executor) { MessageBrokerRegistry registry = getBrokerRegistry(clientInboundChannel, clientOutboundChannel); ChannelRegistration registration = registry.getBrokerChannelRegistration(); ExecutorSubscribableChannel channel = (registration.hasTaskExecutor() ? - new ExecutorSubscribableChannel(brokerChannelExecutor) : new ExecutorSubscribableChannel()); + new ExecutorSubscribableChannel(executor) : new ExecutorSubscribableChannel()); registration.interceptors(new ImmutableMessageChannelInterceptor()); channel.setLogger(SimpLogging.forLog(channel.getLogger())); channel.setInterceptors(registration.getInterceptors()); @@ -366,10 +372,10 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC @Bean @Nullable - public AbstractBrokerMessageHandler stompBrokerRelayMessageHandler(AbstractSubscribableChannel clientInboundChannel, - AbstractSubscribableChannel clientOutboundChannel, AbstractSubscribableChannel brokerChannel, - UserDestinationMessageHandler userDestinationMessageHandler, @Nullable MessageHandler userRegistryMessageHandler, - UserDestinationResolver userDestinationResolver) { + public AbstractBrokerMessageHandler stompBrokerRelayMessageHandler( + AbstractSubscribableChannel clientInboundChannel, AbstractSubscribableChannel clientOutboundChannel, + AbstractSubscribableChannel brokerChannel, UserDestinationMessageHandler userDestinationMessageHandler, + @Nullable MessageHandler userRegistryMessageHandler, UserDestinationResolver userDestinationResolver) { MessageBrokerRegistry registry = getBrokerRegistry(clientInboundChannel, clientOutboundChannel); StompBrokerRelayMessageHandler handler = registry.getStompBrokerRelay(brokerChannel); @@ -411,7 +417,7 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC public MessageHandler userRegistryMessageHandler( AbstractSubscribableChannel clientInboundChannel, AbstractSubscribableChannel clientOutboundChannel, SimpUserRegistry userRegistry, SimpMessagingTemplate brokerMessagingTemplate, - TaskScheduler messageBrokerTaskScheduler) { + @Qualifier("messageBrokerTaskScheduler") TaskScheduler scheduler) { MessageBrokerRegistry brokerRegistry = getBrokerRegistry(clientInboundChannel, clientOutboundChannel); if (brokerRegistry.getUserRegistryBroadcast() == null) { @@ -420,7 +426,7 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC Assert.isInstanceOf(MultiServerUserRegistry.class, userRegistry, "MultiServerUserRegistry required"); return new UserRegistryMessageHandler((MultiServerUserRegistry) userRegistry, brokerMessagingTemplate, brokerRegistry.getUserRegistryBroadcast(), - messageBrokerTaskScheduler); + scheduler); } // Expose alias for 4.1 compatibility diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketConfigurationSupport.java b/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketConfigurationSupport.java index 4478ee9a56..5fdcf95e7e 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketConfigurationSupport.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketConfigurationSupport.java @@ -18,6 +18,7 @@ package org.springframework.web.socket.config.annotation; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.lang.Nullable; import org.springframework.scheduling.TaskScheduler; @@ -39,7 +40,9 @@ public class WebSocketConfigurationSupport { @Bean - public HandlerMapping webSocketHandlerMapping(DefaultSockJsSchedulerContainer schedulerContainer) { + public HandlerMapping webSocketHandlerMapping( + @Qualifier("defaultSockJsSchedulerContainer") DefaultSockJsSchedulerContainer schedulerContainer) { + ServletWebSocketHandlerRegistry registry = initHandlerRegistry(); if (registry.requiresTaskScheduler()) { TaskScheduler scheduler = schedulerContainer.getScheduler(); diff --git a/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurationSupport.java b/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurationSupport.java index e42195502b..162b3ab52f 100644 --- a/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurationSupport.java +++ b/spring-websocket/src/main/java/org/springframework/web/socket/config/annotation/WebSocketMessageBrokerConfigurationSupport.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2021 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,7 @@ package org.springframework.web.socket.config.annotation; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.config.CustomScopeConfigurer; import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.Bean; @@ -129,17 +130,19 @@ public abstract class WebSocketMessageBrokerConfigurationSupport extends Abstrac @Bean public WebSocketMessageBrokerStats webSocketMessageBrokerStats( @Nullable AbstractBrokerMessageHandler stompBrokerRelayMessageHandler, - WebSocketHandler subProtocolWebSocketHandler, TaskExecutor clientInboundChannelExecutor, - TaskExecutor clientOutboundChannelExecutor, TaskScheduler messageBrokerTaskScheduler) { + WebSocketHandler subProtocolWebSocketHandler, + @Qualifier("clientInboundChannelExecutor") TaskExecutor inboundExecutor, + @Qualifier("clientOutboundChannelExecutor") TaskExecutor outboundExecutor, + @Qualifier("messageBrokerTaskScheduler") TaskScheduler scheduler) { WebSocketMessageBrokerStats stats = new WebSocketMessageBrokerStats(); stats.setSubProtocolWebSocketHandler((SubProtocolWebSocketHandler) subProtocolWebSocketHandler); if (stompBrokerRelayMessageHandler instanceof StompBrokerRelayMessageHandler) { stats.setStompBrokerRelay((StompBrokerRelayMessageHandler) stompBrokerRelayMessageHandler); } - stats.setInboundChannelExecutor(clientInboundChannelExecutor); - stats.setOutboundChannelExecutor(clientOutboundChannelExecutor); - stats.setSockJsTaskScheduler(messageBrokerTaskScheduler); + stats.setInboundChannelExecutor(inboundExecutor); + stats.setOutboundChannelExecutor(outboundExecutor); + stats.setSockJsTaskScheduler(scheduler); return stats; }