diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/OrderedMessageSender.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/OrderedMessageSender.java index 8baa4b5610..54633f2dac 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/OrderedMessageSender.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/broker/OrderedMessageSender.java @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.springframework.messaging.simp.broker; import java.util.Queue; @@ -21,6 +22,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; +import org.springframework.lang.Nullable; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; @@ -31,8 +33,8 @@ import org.springframework.messaging.support.MessageHeaderAccessor; import org.springframework.util.Assert; /** - * Submit messages to an ExecutorSubscribableChannel, one at a time. The channel - * must have been configured with {@link #configureOutboundChannel}. + * Submit messages to an {@link ExecutorSubscribableChannel}, one at a time. + * The channel must have been configured with {@link #configureOutboundChannel}. * * @author Rossen Stoyanchev * @since 5.1 @@ -69,7 +71,6 @@ class OrderedMessageSender implements MessageChannel { } private void trySend() { - // Take sendInProgress flag only if queue is not empty if (this.messages.isEmpty()) { return; @@ -141,7 +142,9 @@ class OrderedMessageSender implements MessageChannel { private static class CallbackInterceptor implements ExecutorChannelInterceptor { @Override - public void afterMessageHandled(Message msg, MessageChannel ch, MessageHandler handler, Exception ex) { + public void afterMessageHandled( + Message msg, MessageChannel ch, MessageHandler handler, @Nullable Exception ex) { + Runnable task = (Runnable) msg.getHeaders().get(OrderedMessageSender.COMPLETION_TASK_HEADER); if (task != null) { task.run(); diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/MessageBrokerRegistry.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/MessageBrokerRegistry.java index 2eeb7cfc5f..b998ee29d8 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/config/MessageBrokerRegistry.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/config/MessageBrokerRegistry.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2017 the original author or authors. + * Copyright 2002-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -162,30 +162,6 @@ public class MessageBrokerRegistry { return this.userDestinationPrefix; } - /** - * Whether the client must receive messages in the order of publication. - *

By default messages sent to the {@code "clientOutboundChannel"} may - * not be processed in the same order because the channel is backed by a - * ThreadPoolExecutor that in turn does not guarantee processing in order. - *

When this flag is set to {@code true} messages within the same session - * will be sent to the {@code "clientOutboundChannel"} one at a time in - * order to preserve the order of publication. Enable this only if needed - * since there is some performance overhead to keep messages in order. - * @param preservePublishOrder whether to publish in order - * @since 5.1 - */ - public void setPreservePublishOrder(boolean preservePublishOrder) { - this.preservePublishOrder = preservePublishOrder; - } - - /** - * Whether to ensure messages are received in the order of publication. - * @since 5.1 - */ - protected boolean isPreservePublishOrder() { - return this.preservePublishOrder; - } - /** * Configure the PathMatcher to use to match the destinations of incoming * messages to {@code @MessageMapping} and {@code @SubscribeMapping} methods. @@ -225,6 +201,21 @@ public class MessageBrokerRegistry { return this; } + /** + * Whether the client must receive messages in the order of publication. + *

By default messages sent to the {@code "clientOutboundChannel"} may + * not be processed in the same order because the channel is backed by a + * ThreadPoolExecutor that in turn does not guarantee processing in order. + *

When this flag is set to {@code true} messages within the same session + * will be sent to the {@code "clientOutboundChannel"} one at a time in + * order to preserve the order of publication. Enable this only if needed + * since there is some performance overhead to keep messages in order. + * @since 5.1 + */ + public MessageBrokerRegistry setPreservePublishOrder(boolean preservePublishOrder) { + this.preservePublishOrder = preservePublishOrder; + return this; + } @Nullable protected SimpleBrokerMessageHandler getSimpleBroker(SubscribableChannel brokerChannel) { diff --git a/spring-messaging/src/main/java/org/springframework/messaging/support/ChannelInterceptor.java b/spring-messaging/src/main/java/org/springframework/messaging/support/ChannelInterceptor.java index 965872d264..b3fe57bc98 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/support/ChannelInterceptor.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/support/ChannelInterceptor.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2017 the original author or authors. + * Copyright 2002-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,6 +28,8 @@ import org.springframework.messaging.MessageChannel; * @author Mark Fisher * @author Rossen Stoyanchev * @since 4.0 + * @see Message + * @see MessageChannel */ public interface ChannelInterceptor { @@ -56,8 +58,8 @@ public interface ChannelInterceptor { * completed and returned a Message, i.e. it did not return {@code null}. * @since 4.1 */ - default void afterSendCompletion(Message message, MessageChannel channel, boolean sent, - @Nullable Exception ex) { + default void afterSendCompletion( + Message message, MessageChannel channel, boolean sent, @Nullable Exception ex) { } /** diff --git a/spring-messaging/src/main/java/org/springframework/messaging/support/ExecutorChannelInterceptor.java b/spring-messaging/src/main/java/org/springframework/messaging/support/ExecutorChannelInterceptor.java index 67b4f5d584..c0e41ecff8 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/support/ExecutorChannelInterceptor.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/support/ExecutorChannelInterceptor.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2017 the original author or authors. + * Copyright 2002-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,10 +26,13 @@ import org.springframework.messaging.MessageHandler; * asynchronous sending of a {@link org.springframework.messaging.Message} to * a specific subscriber through an {@link java.util.concurrent.Executor}. * Supported on {@link org.springframework.messaging.MessageChannel} - * implementations that can be configured with an Executor. + * implementations that can be configured with an {@code Executor}. * * @author Rossen Stoyanchev * @since 4.1 + * @see Message + * @see MessageChannel + * @see MessageHandler */ public interface ExecutorChannelInterceptor extends ChannelInterceptor { diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/broker/OrderedMessageSenderTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/broker/OrderedMessageSenderTests.java index c75c167d92..7b49121922 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/broker/OrderedMessageSenderTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/broker/OrderedMessageSenderTests.java @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.springframework.messaging.simp.broker; import java.util.concurrent.CountDownLatch; diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/broker/SimpleBrokerMessageHandlerTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/broker/SimpleBrokerMessageHandlerTests.java index 5f008688dd..884f9f5034 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/broker/SimpleBrokerMessageHandlerTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/broker/SimpleBrokerMessageHandlerTests.java @@ -77,8 +77,7 @@ public class SimpleBrokerMessageHandlerTests { @Test - public void subcribePublish() { - + public void subscribePublish() { startSession("sess1"); startSession("sess2"); @@ -103,8 +102,7 @@ public class SimpleBrokerMessageHandlerTests { } @Test - public void subcribeDisconnectPublish() { - + public void subscribeDisconnectPublish() { String sess1 = "sess1"; String sess2 = "sess2"; @@ -308,9 +306,9 @@ public class SimpleBrokerMessageHandlerTests { return connectMessage; } - private Message createSubscriptionMessage(String sessionId, String subcriptionId, String destination) { + private Message createSubscriptionMessage(String sessionId, String subscriptionId, String destination) { SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.create(SimpMessageType.SUBSCRIBE); - headers.setSubscriptionId(subcriptionId); + headers.setSubscriptionId(subscriptionId); headers.setDestination(destination); headers.setSessionId(sessionId); return MessageBuilder.createMessage("", headers.getMessageHeaders()); @@ -330,11 +328,11 @@ public class SimpleBrokerMessageHandlerTests { return MessageBuilder.createMessage(payload, headers.getMessageHeaders()); } - private boolean messageCaptured(String sessionId, String subcriptionId, String destination) { + private boolean messageCaptured(String sessionId, String subscriptionId, String destination) { for (Message message : this.messageCaptor.getAllValues()) { SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.wrap(message); if (sessionId.equals(headers.getSessionId())) { - if (subcriptionId.equals(headers.getSubscriptionId())) { + if (subscriptionId.equals(headers.getSubscriptionId())) { if (destination.equals(headers.getDestination())) { return true; } diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/config/MessageBrokerConfigurationTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/config/MessageBrokerConfigurationTests.java index 15d98b4dba..ef6254c49d 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/config/MessageBrokerConfigurationTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/config/MessageBrokerConfigurationTests.java @@ -117,12 +117,10 @@ public class MessageBrokerConfigurationTests { AbstractSubscribableChannel channel = context.getBean( "clientInboundChannel", AbstractSubscribableChannel.class); - assertEquals(3, channel.getInterceptors().size()); CustomThreadPoolTaskExecutor taskExecutor = context.getBean( "clientInboundChannelExecutor", CustomThreadPoolTaskExecutor.class); - assertEquals(11, taskExecutor.getCorePoolSize()); assertEquals(12, taskExecutor.getMaxPoolSize()); assertEquals(13, taskExecutor.getKeepAliveSeconds()); @@ -512,7 +510,6 @@ public class MessageBrokerConfigurationTests { assertEquals("123", new String((byte[]) outputMessage.getPayload())); outChannel.messages.clear(); - // 3. Send message via broker channel SimpMessagingTemplate template = new SimpMessagingTemplate(brokerChannel); diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java index da5e538aa1..17fe6f9ddd 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java @@ -49,9 +49,7 @@ import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.Assert; import org.springframework.util.SocketUtils; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; /** * Integration tests for {@link StompBrokerRelayMessageHandler} running against ActiveMQ. @@ -79,7 +77,7 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { @Before - public void setUp() throws Exception { + public void setup() throws Exception { logger.debug("Setting up before '" + this.testName.getMethodName() + "'"); this.port = SocketUtils.findAvailableTcpPort(61613); this.responseChannel = new ExecutorSubscribableChannel(); @@ -116,7 +114,7 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { } @After - public void tearDown() throws Exception { + public void stop() throws Exception { try { logger.debug("STOMP broker relay stats: " + this.relay.getStatsInfo()); this.relay.stop(); @@ -170,7 +168,6 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { @Test(expected = MessageDeliveryException.class) public void messageDeliveryExceptionIfSystemSessionForwardFails() throws Exception { - logger.debug("Starting test messageDeliveryExceptionIfSystemSessionForwardFails()"); stopActiveMqBrokerAndAwait(); @@ -181,8 +178,8 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { } @Test - public void brokerBecomingUnvailableTriggersErrorFrame() throws Exception { - logger.debug("Starting test brokerBecomingUnvailableTriggersErrorFrame()"); + public void brokerBecomingUnavailableTriggersErrorFrame() throws Exception { + logger.debug("Starting test brokerBecomingUnavailableTriggersErrorFrame()"); String sess1 = "sess1"; MessageExchange connect = MessageExchangeBuilder.connect(sess1).build(); diff --git a/spring-websocket/src/test/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParserTests.java b/spring-websocket/src/test/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParserTests.java index 0d16f7c140..ff91441b93 100644 --- a/spring-websocket/src/test/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParserTests.java +++ b/spring-websocket/src/test/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParserTests.java @@ -210,8 +210,7 @@ public class MessageBrokerBeanDefinitionParserTests { assertArrayEquals(new long[] {15000, 15000}, brokerMessageHandler.getHeartbeatValue()); assertTrue(brokerMessageHandler.isPreservePublishOrder()); - List> subscriberTypes = - Arrays.asList(SimpAnnotationMethodMessageHandler.class, + List> subscriberTypes = Arrays.asList(SimpAnnotationMethodMessageHandler.class, UserDestinationMessageHandler.class, SimpleBrokerMessageHandler.class); testChannel("clientInboundChannel", subscriberTypes, 2); testExecutor("clientInboundChannel", Runtime.getRuntime().availableProcessors() * 2, Integer.MAX_VALUE, 60); @@ -220,8 +219,7 @@ public class MessageBrokerBeanDefinitionParserTests { testChannel("clientOutboundChannel", subscriberTypes, 2); testExecutor("clientOutboundChannel", Runtime.getRuntime().availableProcessors() * 2, Integer.MAX_VALUE, 60); - subscriberTypes = Arrays.asList( - SimpleBrokerMessageHandler.class, UserDestinationMessageHandler.class); + subscriberTypes = Arrays.asList(SimpleBrokerMessageHandler.class, UserDestinationMessageHandler.class); testChannel("brokerChannel", subscriberTypes, 1); try { this.appContext.getBean("brokerChannelExecutor", ThreadPoolTaskExecutor.class); @@ -281,9 +279,8 @@ public class MessageBrokerBeanDefinitionParserTests { assertThat(messageBroker.getDestinationPrefixes(), Matchers.containsInAnyOrder("/topic","/queue")); assertTrue(messageBroker.isPreservePublishOrder()); - List> subscriberTypes = Arrays.asList( - SimpAnnotationMethodMessageHandler.class, UserDestinationMessageHandler.class, - StompBrokerRelayMessageHandler.class); + List> subscriberTypes = Arrays.asList(SimpAnnotationMethodMessageHandler.class, + UserDestinationMessageHandler.class, StompBrokerRelayMessageHandler.class); testChannel("clientInboundChannel", subscriberTypes, 2); testExecutor("clientInboundChannel", Runtime.getRuntime().availableProcessors() * 2, Integer.MAX_VALUE, 60); @@ -382,9 +379,8 @@ public class MessageBrokerBeanDefinitionParserTests { assertSame(this.appContext.getBean("myValidator"), validator); assertThat(validator, Matchers.instanceOf(TestValidator.class)); - List> subscriberTypes = - Arrays.>asList(SimpAnnotationMethodMessageHandler.class, - UserDestinationMessageHandler.class, SimpleBrokerMessageHandler.class); + List> subscriberTypes = Arrays.asList(SimpAnnotationMethodMessageHandler.class, + UserDestinationMessageHandler.class, SimpleBrokerMessageHandler.class); testChannel("clientInboundChannel", subscriberTypes, 3); testExecutor("clientInboundChannel", 100, 200, 600); @@ -394,16 +390,13 @@ public class MessageBrokerBeanDefinitionParserTests { testChannel("clientOutboundChannel", subscriberTypes, 3); testExecutor("clientOutboundChannel", 101, 201, 601); - subscriberTypes = Arrays.>asList(SimpleBrokerMessageHandler.class, - UserDestinationMessageHandler.class); + subscriberTypes = Arrays.asList(SimpleBrokerMessageHandler.class, UserDestinationMessageHandler.class); testChannel("brokerChannel", subscriberTypes, 1); testExecutor("brokerChannel", 102, 202, 602); } - // SPR-11623 - - @Test + @Test // SPR-11623 public void customChannelsWithDefaultExecutor() { loadBeanDefinitions("websocket-config-broker-customchannels-default-executor.xml"); diff --git a/spring-websocket/src/test/java/org/springframework/web/socket/messaging/StompWebSocketIntegrationTests.java b/spring-websocket/src/test/java/org/springframework/web/socket/messaging/StompWebSocketIntegrationTests.java index 162618e0e0..fbf6218266 100644 --- a/spring-websocket/src/test/java/org/springframework/web/socket/messaging/StompWebSocketIntegrationTests.java +++ b/spring-websocket/src/test/java/org/springframework/web/socket/messaging/StompWebSocketIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2017 the original author or authors. + * Copyright 2002-2018 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -57,8 +57,8 @@ import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerCo import org.springframework.web.socket.handler.TextWebSocketHandler; import org.springframework.web.socket.server.HandshakeHandler; -import static org.junit.Assert.assertTrue; -import static org.springframework.web.socket.messaging.StompTextMessageBuilder.create; +import static org.junit.Assert.*; +import static org.springframework.web.socket.messaging.StompTextMessageBuilder.*; /** * Integration tests with annotated message-handling methods. @@ -70,6 +70,7 @@ public class StompWebSocketIntegrationTests extends AbstractWebSocketIntegration private static final long TIMEOUT = 10; + @Parameters(name = "server [{0}], client [{1}]") public static Object[][] arguments() { return new Object[][] { @@ -266,7 +267,7 @@ public class StompWebSocketIntegrationTests extends AbstractWebSocketIntegration } - static interface ScopedBean { + interface ScopedBean { String getValue(); } @@ -320,9 +321,9 @@ public class StompWebSocketIntegrationTests extends AbstractWebSocketIntegration @Configuration @ComponentScan( - basePackageClasses=StompWebSocketIntegrationTests.class, - useDefaultFilters=false, - includeFilters=@ComponentScan.Filter(IntegrationTestController.class)) + basePackageClasses = StompWebSocketIntegrationTests.class, + useDefaultFilters = false, + includeFilters = @ComponentScan.Filter(IntegrationTestController.class)) static class TestMessageBrokerConfigurer implements WebSocketMessageBrokerConfigurer { @Autowired