Browse Source

Polishing

pull/1895/merge
Juergen Hoeller 6 years ago
parent
commit
02403f6a34
  1. 11
      spring-messaging/src/main/java/org/springframework/messaging/simp/broker/OrderedMessageSender.java
  2. 41
      spring-messaging/src/main/java/org/springframework/messaging/simp/config/MessageBrokerRegistry.java
  3. 8
      spring-messaging/src/main/java/org/springframework/messaging/support/ChannelInterceptor.java
  4. 7
      spring-messaging/src/main/java/org/springframework/messaging/support/ExecutorChannelInterceptor.java
  5. 1
      spring-messaging/src/test/java/org/springframework/messaging/simp/broker/OrderedMessageSenderTests.java
  6. 14
      spring-messaging/src/test/java/org/springframework/messaging/simp/broker/SimpleBrokerMessageHandlerTests.java
  7. 3
      spring-messaging/src/test/java/org/springframework/messaging/simp/config/MessageBrokerConfigurationTests.java
  8. 13
      spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java
  9. 23
      spring-websocket/src/test/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParserTests.java
  10. 15
      spring-websocket/src/test/java/org/springframework/web/socket/messaging/StompWebSocketIntegrationTests.java

11
spring-messaging/src/main/java/org/springframework/messaging/simp/broker/OrderedMessageSender.java

@ -13,6 +13,7 @@ @@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.simp.broker;
import java.util.Queue;
@ -21,6 +22,7 @@ import java.util.concurrent.atomic.AtomicBoolean; @@ -21,6 +22,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
@ -31,8 +33,8 @@ import org.springframework.messaging.support.MessageHeaderAccessor; @@ -31,8 +33,8 @@ import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.util.Assert;
/**
* Submit messages to an ExecutorSubscribableChannel, one at a time. The channel
* must have been configured with {@link #configureOutboundChannel}.
* Submit messages to an {@link ExecutorSubscribableChannel}, one at a time.
* The channel must have been configured with {@link #configureOutboundChannel}.
*
* @author Rossen Stoyanchev
* @since 5.1
@ -69,7 +71,6 @@ class OrderedMessageSender implements MessageChannel { @@ -69,7 +71,6 @@ class OrderedMessageSender implements MessageChannel {
}
private void trySend() {
// Take sendInProgress flag only if queue is not empty
if (this.messages.isEmpty()) {
return;
@ -141,7 +142,9 @@ class OrderedMessageSender implements MessageChannel { @@ -141,7 +142,9 @@ class OrderedMessageSender implements MessageChannel {
private static class CallbackInterceptor implements ExecutorChannelInterceptor {
@Override
public void afterMessageHandled(Message<?> msg, MessageChannel ch, MessageHandler handler, Exception ex) {
public void afterMessageHandled(
Message<?> msg, MessageChannel ch, MessageHandler handler, @Nullable Exception ex) {
Runnable task = (Runnable) msg.getHeaders().get(OrderedMessageSender.COMPLETION_TASK_HEADER);
if (task != null) {
task.run();

41
spring-messaging/src/main/java/org/springframework/messaging/simp/config/MessageBrokerRegistry.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2017 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -162,30 +162,6 @@ public class MessageBrokerRegistry { @@ -162,30 +162,6 @@ public class MessageBrokerRegistry {
return this.userDestinationPrefix;
}
/**
* Whether the client must receive messages in the order of publication.
* <p>By default messages sent to the {@code "clientOutboundChannel"} may
* not be processed in the same order because the channel is backed by a
* ThreadPoolExecutor that in turn does not guarantee processing in order.
* <p>When this flag is set to {@code true} messages within the same session
* will be sent to the {@code "clientOutboundChannel"} one at a time in
* order to preserve the order of publication. Enable this only if needed
* since there is some performance overhead to keep messages in order.
* @param preservePublishOrder whether to publish in order
* @since 5.1
*/
public void setPreservePublishOrder(boolean preservePublishOrder) {
this.preservePublishOrder = preservePublishOrder;
}
/**
* Whether to ensure messages are received in the order of publication.
* @since 5.1
*/
protected boolean isPreservePublishOrder() {
return this.preservePublishOrder;
}
/**
* Configure the PathMatcher to use to match the destinations of incoming
* messages to {@code @MessageMapping} and {@code @SubscribeMapping} methods.
@ -225,6 +201,21 @@ public class MessageBrokerRegistry { @@ -225,6 +201,21 @@ public class MessageBrokerRegistry {
return this;
}
/**
* Whether the client must receive messages in the order of publication.
* <p>By default messages sent to the {@code "clientOutboundChannel"} may
* not be processed in the same order because the channel is backed by a
* ThreadPoolExecutor that in turn does not guarantee processing in order.
* <p>When this flag is set to {@code true} messages within the same session
* will be sent to the {@code "clientOutboundChannel"} one at a time in
* order to preserve the order of publication. Enable this only if needed
* since there is some performance overhead to keep messages in order.
* @since 5.1
*/
public MessageBrokerRegistry setPreservePublishOrder(boolean preservePublishOrder) {
this.preservePublishOrder = preservePublishOrder;
return this;
}
@Nullable
protected SimpleBrokerMessageHandler getSimpleBroker(SubscribableChannel brokerChannel) {

8
spring-messaging/src/main/java/org/springframework/messaging/support/ChannelInterceptor.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2017 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -28,6 +28,8 @@ import org.springframework.messaging.MessageChannel; @@ -28,6 +28,8 @@ import org.springframework.messaging.MessageChannel;
* @author Mark Fisher
* @author Rossen Stoyanchev
* @since 4.0
* @see Message
* @see MessageChannel
*/
public interface ChannelInterceptor {
@ -56,8 +58,8 @@ public interface ChannelInterceptor { @@ -56,8 +58,8 @@ public interface ChannelInterceptor {
* completed and returned a Message, i.e. it did not return {@code null}.
* @since 4.1
*/
default void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent,
@Nullable Exception ex) {
default void afterSendCompletion(
Message<?> message, MessageChannel channel, boolean sent, @Nullable Exception ex) {
}
/**

7
spring-messaging/src/main/java/org/springframework/messaging/support/ExecutorChannelInterceptor.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2017 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -26,10 +26,13 @@ import org.springframework.messaging.MessageHandler; @@ -26,10 +26,13 @@ import org.springframework.messaging.MessageHandler;
* asynchronous sending of a {@link org.springframework.messaging.Message} to
* a specific subscriber through an {@link java.util.concurrent.Executor}.
* Supported on {@link org.springframework.messaging.MessageChannel}
* implementations that can be configured with an Executor.
* implementations that can be configured with an {@code Executor}.
*
* @author Rossen Stoyanchev
* @since 4.1
* @see Message
* @see MessageChannel
* @see MessageHandler
*/
public interface ExecutorChannelInterceptor extends ChannelInterceptor {

1
spring-messaging/src/test/java/org/springframework/messaging/simp/broker/OrderedMessageSenderTests.java

@ -13,6 +13,7 @@ @@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.simp.broker;
import java.util.concurrent.CountDownLatch;

14
spring-messaging/src/test/java/org/springframework/messaging/simp/broker/SimpleBrokerMessageHandlerTests.java

@ -77,8 +77,7 @@ public class SimpleBrokerMessageHandlerTests { @@ -77,8 +77,7 @@ public class SimpleBrokerMessageHandlerTests {
@Test
public void subcribePublish() {
public void subscribePublish() {
startSession("sess1");
startSession("sess2");
@ -103,8 +102,7 @@ public class SimpleBrokerMessageHandlerTests { @@ -103,8 +102,7 @@ public class SimpleBrokerMessageHandlerTests {
}
@Test
public void subcribeDisconnectPublish() {
public void subscribeDisconnectPublish() {
String sess1 = "sess1";
String sess2 = "sess2";
@ -308,9 +306,9 @@ public class SimpleBrokerMessageHandlerTests { @@ -308,9 +306,9 @@ public class SimpleBrokerMessageHandlerTests {
return connectMessage;
}
private Message<String> createSubscriptionMessage(String sessionId, String subcriptionId, String destination) {
private Message<String> createSubscriptionMessage(String sessionId, String subscriptionId, String destination) {
SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.create(SimpMessageType.SUBSCRIBE);
headers.setSubscriptionId(subcriptionId);
headers.setSubscriptionId(subscriptionId);
headers.setDestination(destination);
headers.setSessionId(sessionId);
return MessageBuilder.createMessage("", headers.getMessageHeaders());
@ -330,11 +328,11 @@ public class SimpleBrokerMessageHandlerTests { @@ -330,11 +328,11 @@ public class SimpleBrokerMessageHandlerTests {
return MessageBuilder.createMessage(payload, headers.getMessageHeaders());
}
private boolean messageCaptured(String sessionId, String subcriptionId, String destination) {
private boolean messageCaptured(String sessionId, String subscriptionId, String destination) {
for (Message<?> message : this.messageCaptor.getAllValues()) {
SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.wrap(message);
if (sessionId.equals(headers.getSessionId())) {
if (subcriptionId.equals(headers.getSubscriptionId())) {
if (subscriptionId.equals(headers.getSubscriptionId())) {
if (destination.equals(headers.getDestination())) {
return true;
}

3
spring-messaging/src/test/java/org/springframework/messaging/simp/config/MessageBrokerConfigurationTests.java

@ -117,12 +117,10 @@ public class MessageBrokerConfigurationTests { @@ -117,12 +117,10 @@ public class MessageBrokerConfigurationTests {
AbstractSubscribableChannel channel = context.getBean(
"clientInboundChannel", AbstractSubscribableChannel.class);
assertEquals(3, channel.getInterceptors().size());
CustomThreadPoolTaskExecutor taskExecutor = context.getBean(
"clientInboundChannelExecutor", CustomThreadPoolTaskExecutor.class);
assertEquals(11, taskExecutor.getCorePoolSize());
assertEquals(12, taskExecutor.getMaxPoolSize());
assertEquals(13, taskExecutor.getKeepAliveSeconds());
@ -512,7 +510,6 @@ public class MessageBrokerConfigurationTests { @@ -512,7 +510,6 @@ public class MessageBrokerConfigurationTests {
assertEquals("123", new String((byte[]) outputMessage.getPayload()));
outChannel.messages.clear();
// 3. Send message via broker channel
SimpMessagingTemplate template = new SimpMessagingTemplate(brokerChannel);

13
spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java

@ -49,9 +49,7 @@ import org.springframework.messaging.support.MessageBuilder; @@ -49,9 +49,7 @@ import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import org.springframework.util.SocketUtils;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.*;
/**
* Integration tests for {@link StompBrokerRelayMessageHandler} running against ActiveMQ.
@ -79,7 +77,7 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { @@ -79,7 +77,7 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
@Before
public void setUp() throws Exception {
public void setup() throws Exception {
logger.debug("Setting up before '" + this.testName.getMethodName() + "'");
this.port = SocketUtils.findAvailableTcpPort(61613);
this.responseChannel = new ExecutorSubscribableChannel();
@ -116,7 +114,7 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { @@ -116,7 +114,7 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
}
@After
public void tearDown() throws Exception {
public void stop() throws Exception {
try {
logger.debug("STOMP broker relay stats: " + this.relay.getStatsInfo());
this.relay.stop();
@ -170,7 +168,6 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { @@ -170,7 +168,6 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
@Test(expected = MessageDeliveryException.class)
public void messageDeliveryExceptionIfSystemSessionForwardFails() throws Exception {
logger.debug("Starting test messageDeliveryExceptionIfSystemSessionForwardFails()");
stopActiveMqBrokerAndAwait();
@ -181,8 +178,8 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { @@ -181,8 +178,8 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
}
@Test
public void brokerBecomingUnvailableTriggersErrorFrame() throws Exception {
logger.debug("Starting test brokerBecomingUnvailableTriggersErrorFrame()");
public void brokerBecomingUnavailableTriggersErrorFrame() throws Exception {
logger.debug("Starting test brokerBecomingUnavailableTriggersErrorFrame()");
String sess1 = "sess1";
MessageExchange connect = MessageExchangeBuilder.connect(sess1).build();

23
spring-websocket/src/test/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParserTests.java

@ -210,8 +210,7 @@ public class MessageBrokerBeanDefinitionParserTests { @@ -210,8 +210,7 @@ public class MessageBrokerBeanDefinitionParserTests {
assertArrayEquals(new long[] {15000, 15000}, brokerMessageHandler.getHeartbeatValue());
assertTrue(brokerMessageHandler.isPreservePublishOrder());
List<Class<? extends MessageHandler>> subscriberTypes =
Arrays.asList(SimpAnnotationMethodMessageHandler.class,
List<Class<? extends MessageHandler>> subscriberTypes = Arrays.asList(SimpAnnotationMethodMessageHandler.class,
UserDestinationMessageHandler.class, SimpleBrokerMessageHandler.class);
testChannel("clientInboundChannel", subscriberTypes, 2);
testExecutor("clientInboundChannel", Runtime.getRuntime().availableProcessors() * 2, Integer.MAX_VALUE, 60);
@ -220,8 +219,7 @@ public class MessageBrokerBeanDefinitionParserTests { @@ -220,8 +219,7 @@ public class MessageBrokerBeanDefinitionParserTests {
testChannel("clientOutboundChannel", subscriberTypes, 2);
testExecutor("clientOutboundChannel", Runtime.getRuntime().availableProcessors() * 2, Integer.MAX_VALUE, 60);
subscriberTypes = Arrays.asList(
SimpleBrokerMessageHandler.class, UserDestinationMessageHandler.class);
subscriberTypes = Arrays.asList(SimpleBrokerMessageHandler.class, UserDestinationMessageHandler.class);
testChannel("brokerChannel", subscriberTypes, 1);
try {
this.appContext.getBean("brokerChannelExecutor", ThreadPoolTaskExecutor.class);
@ -281,9 +279,8 @@ public class MessageBrokerBeanDefinitionParserTests { @@ -281,9 +279,8 @@ public class MessageBrokerBeanDefinitionParserTests {
assertThat(messageBroker.getDestinationPrefixes(), Matchers.containsInAnyOrder("/topic","/queue"));
assertTrue(messageBroker.isPreservePublishOrder());
List<Class<? extends MessageHandler>> subscriberTypes = Arrays.asList(
SimpAnnotationMethodMessageHandler.class, UserDestinationMessageHandler.class,
StompBrokerRelayMessageHandler.class);
List<Class<? extends MessageHandler>> subscriberTypes = Arrays.asList(SimpAnnotationMethodMessageHandler.class,
UserDestinationMessageHandler.class, StompBrokerRelayMessageHandler.class);
testChannel("clientInboundChannel", subscriberTypes, 2);
testExecutor("clientInboundChannel", Runtime.getRuntime().availableProcessors() * 2, Integer.MAX_VALUE, 60);
@ -382,9 +379,8 @@ public class MessageBrokerBeanDefinitionParserTests { @@ -382,9 +379,8 @@ public class MessageBrokerBeanDefinitionParserTests {
assertSame(this.appContext.getBean("myValidator"), validator);
assertThat(validator, Matchers.instanceOf(TestValidator.class));
List<Class<? extends MessageHandler>> subscriberTypes =
Arrays.<Class<? extends MessageHandler>>asList(SimpAnnotationMethodMessageHandler.class,
UserDestinationMessageHandler.class, SimpleBrokerMessageHandler.class);
List<Class<? extends MessageHandler>> subscriberTypes = Arrays.asList(SimpAnnotationMethodMessageHandler.class,
UserDestinationMessageHandler.class, SimpleBrokerMessageHandler.class);
testChannel("clientInboundChannel", subscriberTypes, 3);
testExecutor("clientInboundChannel", 100, 200, 600);
@ -394,16 +390,13 @@ public class MessageBrokerBeanDefinitionParserTests { @@ -394,16 +390,13 @@ public class MessageBrokerBeanDefinitionParserTests {
testChannel("clientOutboundChannel", subscriberTypes, 3);
testExecutor("clientOutboundChannel", 101, 201, 601);
subscriberTypes = Arrays.<Class<? extends MessageHandler>>asList(SimpleBrokerMessageHandler.class,
UserDestinationMessageHandler.class);
subscriberTypes = Arrays.asList(SimpleBrokerMessageHandler.class, UserDestinationMessageHandler.class);
testChannel("brokerChannel", subscriberTypes, 1);
testExecutor("brokerChannel", 102, 202, 602);
}
// SPR-11623
@Test
@Test // SPR-11623
public void customChannelsWithDefaultExecutor() {
loadBeanDefinitions("websocket-config-broker-customchannels-default-executor.xml");

15
spring-websocket/src/test/java/org/springframework/web/socket/messaging/StompWebSocketIntegrationTests.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2017 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -57,8 +57,8 @@ import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerCo @@ -57,8 +57,8 @@ import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerCo
import org.springframework.web.socket.handler.TextWebSocketHandler;
import org.springframework.web.socket.server.HandshakeHandler;
import static org.junit.Assert.assertTrue;
import static org.springframework.web.socket.messaging.StompTextMessageBuilder.create;
import static org.junit.Assert.*;
import static org.springframework.web.socket.messaging.StompTextMessageBuilder.*;
/**
* Integration tests with annotated message-handling methods.
@ -70,6 +70,7 @@ public class StompWebSocketIntegrationTests extends AbstractWebSocketIntegration @@ -70,6 +70,7 @@ public class StompWebSocketIntegrationTests extends AbstractWebSocketIntegration
private static final long TIMEOUT = 10;
@Parameters(name = "server [{0}], client [{1}]")
public static Object[][] arguments() {
return new Object[][] {
@ -266,7 +267,7 @@ public class StompWebSocketIntegrationTests extends AbstractWebSocketIntegration @@ -266,7 +267,7 @@ public class StompWebSocketIntegrationTests extends AbstractWebSocketIntegration
}
static interface ScopedBean {
interface ScopedBean {
String getValue();
}
@ -320,9 +321,9 @@ public class StompWebSocketIntegrationTests extends AbstractWebSocketIntegration @@ -320,9 +321,9 @@ public class StompWebSocketIntegrationTests extends AbstractWebSocketIntegration
@Configuration
@ComponentScan(
basePackageClasses=StompWebSocketIntegrationTests.class,
useDefaultFilters=false,
includeFilters=@ComponentScan.Filter(IntegrationTestController.class))
basePackageClasses = StompWebSocketIntegrationTests.class,
useDefaultFilters = false,
includeFilters = @ComponentScan.Filter(IntegrationTestController.class))
static class TestMessageBrokerConfigurer implements WebSocketMessageBrokerConfigurer {
@Autowired

Loading…
Cancel
Save