Browse Source

Support user destinations with multiple app servers

This change adds support for broadcasting messages with unresolved
user destinations so that other servers can try to resolve it.
That enables sending messages to users who may be connected to a
different server.

Issue: SPR-11620
pull/780/head
Rossen Stoyanchev 10 years ago
parent
commit
c29eae3307
  1. 15
      spring-messaging/src/main/java/org/springframework/messaging/simp/SimpMessageHeaderAccessor.java
  2. 24
      spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractMessageBrokerConfiguration.java
  3. 20
      spring-messaging/src/main/java/org/springframework/messaging/simp/config/MessageBrokerRegistry.java
  4. 101
      spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java
  5. 134
      spring-messaging/src/main/java/org/springframework/messaging/simp/user/UserDestinationMessageHandler.java
  6. 14
      spring-messaging/src/main/java/org/springframework/messaging/support/NativeMessageHeaderAccessor.java
  7. 18
      spring-messaging/src/test/java/org/springframework/messaging/simp/config/MessageBrokerConfigurationTests.java
  8. 230
      spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerTests.java
  9. 72
      spring-messaging/src/test/java/org/springframework/messaging/simp/user/UserDestinationMessageHandlerTests.java
  10. 44
      spring-websocket/src/main/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParser.java
  11. 11
      spring-websocket/src/main/resources/org/springframework/web/socket/config/spring-websocket-4.2.xsd
  12. 6
      spring-websocket/src/test/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParserTests.java
  13. 2
      spring-websocket/src/test/resources/org/springframework/web/socket/config/websocket-config-broker-relay.xml
  14. 7
      src/asciidoc/web-websocket.adoc

15
spring-messaging/src/main/java/org/springframework/messaging/simp/SimpMessageHeaderAccessor.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2014 the original author or authors.
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -69,14 +69,17 @@ public class SimpMessageHeaderAccessor extends NativeMessageHeaderAccessor { @@ -69,14 +69,17 @@ public class SimpMessageHeaderAccessor extends NativeMessageHeaderAccessor {
/**
* For internal use.
* <p>The original destination used by a client when subscribing. Such a
* destination may have been modified (e.g. user destinations) on the server
* side. This header provides a hint so messages sent to clients may have
* a destination matching to their original subscription.
* A header for internal use with "user" destinations where we need to
* restore the destination prior to sending messages to clients.
*/
public static final String ORIGINAL_DESTINATION = "simpOrigDestination";
/**
* A header that indicates to the broker that the sender will ignore errors.
* The header is simply checked for presence or absence.
*/
public static final String IGNORE_ERROR = "simpIgnoreError";
/**
* A constructor for creating new message headers.

24
spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractMessageBrokerConfiguration.java

@ -17,7 +17,10 @@ @@ -17,7 +17,10 @@
package org.springframework.messaging.simp.config;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.BeanInitializationException;
@ -25,6 +28,7 @@ import org.springframework.context.ApplicationContext; @@ -25,6 +28,7 @@ import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.converter.ByteArrayMessageConverter;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.DefaultContentTypeResolver;
@ -37,6 +41,7 @@ import org.springframework.messaging.simp.SimpMessagingTemplate; @@ -37,6 +41,7 @@ import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.messaging.simp.annotation.support.SimpAnnotationMethodMessageHandler;
import org.springframework.messaging.simp.broker.AbstractBrokerMessageHandler;
import org.springframework.messaging.simp.broker.SimpleBrokerMessageHandler;
import org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler;
import org.springframework.messaging.simp.user.DefaultUserDestinationResolver;
import org.springframework.messaging.simp.user.DefaultUserSessionRegistry;
import org.springframework.messaging.simp.user.UserDestinationMessageHandler;
@ -278,13 +283,26 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC @@ -278,13 +283,26 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC
@Bean
public AbstractBrokerMessageHandler stompBrokerRelayMessageHandler() {
AbstractBrokerMessageHandler handler = getBrokerRegistry().getStompBrokerRelay(brokerChannel());
return (handler != null ? handler : new NoOpBrokerMessageHandler());
StompBrokerRelayMessageHandler handler = getBrokerRegistry().getStompBrokerRelay(brokerChannel());
if (handler == null) {
return new NoOpBrokerMessageHandler();
}
String destination = getBrokerRegistry().getUserDestinationBroadcast();
if (destination != null) {
Map<String, MessageHandler> map = new HashMap<String, MessageHandler>(1);
map.put(destination, userDestinationMessageHandler());
handler.setSystemSubscriptions(map);
}
return handler;
}
@Bean
public UserDestinationMessageHandler userDestinationMessageHandler() {
return new UserDestinationMessageHandler(clientInboundChannel(), brokerChannel(), userDestinationResolver());
UserDestinationMessageHandler handler = new UserDestinationMessageHandler(clientInboundChannel(),
brokerChannel(), userDestinationResolver());
String destination = getBrokerRegistry().getUserDestinationBroadcast();
handler.setUserDestinationBroadcast(destination);
return handler;
}
@Bean

20
spring-messaging/src/main/java/org/springframework/messaging/simp/config/MessageBrokerRegistry.java

@ -49,6 +49,8 @@ public class MessageBrokerRegistry { @@ -49,6 +49,8 @@ public class MessageBrokerRegistry {
private String userDestinationPrefix;
private String userDestinationBroadcast;
private PathMatcher pathMatcher;
@ -137,6 +139,24 @@ public class MessageBrokerRegistry { @@ -137,6 +139,24 @@ public class MessageBrokerRegistry {
return this.userDestinationPrefix;
}
/**
* Set a destination to broadcast messages to that remain unresolved because
* the user is not connected. In a multi-application server scenario this
* gives other application servers a chance to try.
* <p><strong>Note:</strong> this option applies only when the
* {@link #enableStompBrokerRelay "broker relay"} is enabled.
* <p>By default this is not set.
* @param destination the destination to forward unresolved
* messages to, e.g. "/topic/unresolved-user-destination".
*/
public void setUserDestinationBroadcast(String destination) {
this.userDestinationBroadcast = destination;
}
protected String getUserDestinationBroadcast() {
return this.userDestinationBroadcast;
}
/**
* Configure the PathMatcher to use to match the destinations of incoming
* messages to {@code @MessageMapping} and {@code @SubscribeMapping} methods.

101
spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java

@ -17,6 +17,7 @@ @@ -17,6 +17,7 @@
package org.springframework.messaging.simp.stomp;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
@ -26,6 +27,7 @@ import java.util.concurrent.atomic.AtomicInteger; @@ -26,6 +27,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageDeliveryException;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
@ -112,6 +114,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @@ -112,6 +114,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
private String virtualHost;
private final Map<String, MessageHandler> systemSubscriptions = new HashMap<String, MessageHandler>(4);
private TcpOperations<byte[]> tcpClient;
private MessageHeaderInitializer headerInitializer;
@ -281,6 +285,27 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @@ -281,6 +285,27 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
return this.systemPasscode;
}
/**
* Configure one more destinations to subscribe to on the shared "system"
* connection along with MessageHandler's to handle received messages.
* <p>This is for internal use in a multi-application server scenario where
* servers forward messages to each other (e.g. unresolved user destinations).
* @param subscriptions the destinations to subscribe to.
*/
public void setSystemSubscriptions(Map<String, MessageHandler> subscriptions) {
this.systemSubscriptions.clear();
if (subscriptions != null) {
this.systemSubscriptions.putAll(subscriptions);
}
}
/**
* Return the configured map with subscriptions on the "system" connection.
*/
public Map<String, MessageHandler> getSystemSubscriptions() {
return this.systemSubscriptions;
}
/**
* Set the value of the "host" header to use in STOMP CONNECT frames. When this
* property is configured, a "host" header will be added to every STOMP frame sent to
@ -532,6 +557,10 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @@ -532,6 +557,10 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
return this.sessionId;
}
protected TcpConnection<byte[]> getTcpConnection() {
return this.tcpConnection;
}
@Override
public void afterConnected(TcpConnection<byte[]> connection) {
if (logger.isDebugEnabled()) {
@ -579,13 +608,14 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @@ -579,13 +608,14 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
headerAccessor.setUser(this.connectHeaders.getUser());
headerAccessor.setMessage(errorText);
Message<?> errorMessage = MessageBuilder.createMessage(EMPTY_PAYLOAD, headerAccessor.getMessageHeaders());
headerAccessor.setImmutable();
sendMessageToClient(errorMessage);
handleInboundMessage(errorMessage);
}
}
protected void sendMessageToClient(Message<?> message) {
protected void handleInboundMessage(Message<?> message) {
if (this.isRemoteClientSession) {
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
accessor.setImmutable();
StompBrokerRelayMessageHandler.this.getClientOutboundChannel().send(message);
}
}
@ -610,8 +640,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @@ -610,8 +640,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
logger.trace("Received " + accessor.getDetailedLogMessage(message.getPayload()));
}
accessor.setImmutable();
sendMessageToClient(message);
handleInboundMessage(message);
}
/**
@ -825,7 +854,6 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @@ -825,7 +854,6 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
}
private class SystemStompConnectionHandler extends StompConnectionHandler {
public SystemStompConnectionHandler(StompHeaderAccessor connectHeaders) {
@ -839,6 +867,63 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @@ -839,6 +867,63 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}
super.afterStompConnected(connectedHeaders);
publishBrokerAvailableEvent();
sendSystemSubscriptions();
}
private void sendSystemSubscriptions() {
int i = 0;
for (String destination : getSystemSubscriptions().keySet()) {
StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.SUBSCRIBE);
accessor.setSubscriptionId(String.valueOf(i++));
accessor.setDestination(destination);
if (logger.isDebugEnabled()) {
logger.debug("Subscribing to " + destination + " on \"system\" connection.");
}
TcpConnection<byte[]> conn = getTcpConnection();
if (conn != null) {
MessageHeaders headers = accessor.getMessageHeaders();
conn.send(MessageBuilder.createMessage(EMPTY_PAYLOAD, headers)).addCallback(
new ListenableFutureCallback<Void>() {
public void onSuccess(Void result) {
}
public void onFailure(Throwable ex) {
String error = "Failed to subscribe in \"system\" session.";
handleTcpConnectionFailure(error, ex);
}
});
}
}
}
@Override
protected void handleInboundMessage(Message<?> message) {
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
if (StompCommand.MESSAGE.equals(accessor.getCommand())) {
String destination = accessor.getDestination();
if (destination == null) {
if (logger.isDebugEnabled()) {
logger.debug("Got message on \"system\" connection, with no destination: " +
accessor.getDetailedLogMessage(message.getPayload()));
}
return;
}
if (!getSystemSubscriptions().containsKey(destination)) {
if (logger.isDebugEnabled()) {
logger.debug("Got message on \"system\" connection with no handler: " +
accessor.getDetailedLogMessage(message.getPayload()));
}
return;
}
try {
MessageHandler handler = getSystemSubscriptions().get(destination);
handler.handleMessage(message);
}
catch (Throwable ex) {
if (logger.isDebugEnabled()) {
logger.debug("Error while handling message on \"system\" connection.", ex);
}
}
}
}
@Override
@ -857,7 +942,9 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @@ -857,7 +942,9 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
public ListenableFuture<Void> forward(Message<?> message, StompHeaderAccessor accessor) {
try {
ListenableFuture<Void> future = super.forward(message, accessor);
future.get();
if (message.getHeaders().get(SimpMessageHeaderAccessor.IGNORE_ERROR) == null) {
future.get();
}
return future;
}
catch (Throwable ex) {

134
spring-messaging/src/main/java/org/springframework/messaging/simp/user/UserDestinationMessageHandler.java

@ -16,7 +16,11 @@ @@ -16,7 +16,11 @@
package org.springframework.messaging.simp.user;
import java.util.Set;
import static org.springframework.messaging.simp.SimpMessageHeaderAccessor.*;
import static org.springframework.messaging.support.MessageHeaderAccessor.getAccessor;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -24,6 +28,7 @@ import org.apache.commons.logging.LogFactory; @@ -24,6 +28,7 @@ import org.apache.commons.logging.LogFactory;
import org.springframework.context.SmartLifecycle;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.core.MessageSendingOperations;
@ -33,6 +38,7 @@ import org.springframework.messaging.simp.SimpMessagingTemplate; @@ -33,6 +38,7 @@ import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.messaging.support.MessageHeaderInitializer;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
/**
* {@code MessageHandler} with support for "user" destinations.
@ -53,9 +59,11 @@ public class UserDestinationMessageHandler implements MessageHandler, SmartLifec @@ -53,9 +59,11 @@ public class UserDestinationMessageHandler implements MessageHandler, SmartLifec
private final SubscribableChannel brokerChannel;
private final UserDestinationResolver destinationResolver;
private final MessageSendingOperations<String> messagingTemplate;
private final UserDestinationResolver destinationResolver;
private BroadcastHandler broadcastHandler;
private MessageHeaderInitializer headerInitializer;
@ -93,6 +101,25 @@ public class UserDestinationMessageHandler implements MessageHandler, SmartLifec @@ -93,6 +101,25 @@ public class UserDestinationMessageHandler implements MessageHandler, SmartLifec
return this.destinationResolver;
}
/**
* Set a destination to broadcast messages to that remain unresolved because
* the user is not connected. In a multi-application server scenario this
* gives other application servers a chance to try.
* <p>By default this is not set.
* @param destination the target destination.
*/
public void setUserDestinationBroadcast(String destination) {
this.broadcastHandler = (StringUtils.hasText(destination) ?
new BroadcastHandler(this.messagingTemplate, destination) : null);
}
/**
* Return the configured destination for unresolved messages.
*/
public String getUserDestinationBroadcast() {
return (this.broadcastHandler != null ? this.broadcastHandler.getBroadcastDestination() : null);
}
/**
* Return the messaging template used to send resolved messages to the
* broker channel.
@ -164,29 +191,35 @@ public class UserDestinationMessageHandler implements MessageHandler, SmartLifec @@ -164,29 +191,35 @@ public class UserDestinationMessageHandler implements MessageHandler, SmartLifec
@Override
public void handleMessage(Message<?> message) throws MessagingException {
if (this.broadcastHandler != null) {
message = this.broadcastHandler.preHandle(message);
if (message == null) {
return;
}
}
UserDestinationResult result = this.destinationResolver.resolveDestination(message);
if (result == null) {
return;
}
Set<String> destinations = result.getTargetDestinations();
if (destinations.isEmpty()) {
if (result.getTargetDestinations().isEmpty()) {
if (logger.isTraceEnabled()) {
logger.trace("No user destinations found for " + result.getSourceDestination());
logger.trace("No active sessions for user destination: " + result.getSourceDestination());
}
if (this.broadcastHandler != null) {
this.broadcastHandler.handleUnresolved(message);
}
return;
}
if (SimpMessageType.MESSAGE.equals(SimpMessageHeaderAccessor.getMessageType(message.getHeaders()))) {
SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.wrap(message);
initHeaders(accessor);
String header = SimpMessageHeaderAccessor.ORIGINAL_DESTINATION;
accessor.setNativeHeader(header, result.getSubscribeDestination());
message = MessageBuilder.createMessage(message.getPayload(), accessor.getMessageHeaders());
}
if (logger.isDebugEnabled()) {
logger.debug("Translated " + result.getSourceDestination() + " -> " + destinations);
SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.wrap(message);
initHeaders(accessor);
accessor.setNativeHeader(ORIGINAL_DESTINATION, result.getSubscribeDestination());
accessor.setLeaveMutable(true);
message = MessageBuilder.createMessage(message.getPayload(), accessor.getMessageHeaders());
if (logger.isTraceEnabled()) {
logger.trace("Translated " + result.getSourceDestination() + " -> " + result.getTargetDestinations());
}
for (String destination : destinations) {
this.messagingTemplate.send(destination, message);
for (String target : result.getTargetDestinations()) {
this.messagingTemplate.send(target, message);
}
}
@ -201,4 +234,73 @@ public class UserDestinationMessageHandler implements MessageHandler, SmartLifec @@ -201,4 +234,73 @@ public class UserDestinationMessageHandler implements MessageHandler, SmartLifec
return "UserDestinationMessageHandler[" + this.destinationResolver + "]";
}
/**
* A handler that broadcasts locally unresolved messages to the broker and
* also handles similar broadcasts received from the broker.
*/
private static class BroadcastHandler {
private static final List<String> NO_COPY_LIST = Arrays.asList("subscription", "message-id");
private final MessageSendingOperations<String> messagingTemplate;
private final String broadcastDestination;
public BroadcastHandler(MessageSendingOperations<String> template, String destination) {
this.messagingTemplate = template;
this.broadcastDestination = destination;
}
public String getBroadcastDestination() {
return this.broadcastDestination;
}
public Message<?> preHandle(Message<?> message) throws MessagingException {
String destination = SimpMessageHeaderAccessor.getDestination(message.getHeaders());
if (!getBroadcastDestination().equals(destination)) {
return message;
}
SimpMessageHeaderAccessor accessor = getAccessor(message, SimpMessageHeaderAccessor.class);
if (accessor.getSessionId() == null) {
// Our own broadcast
return null;
}
destination = accessor.getFirstNativeHeader(ORIGINAL_DESTINATION);
if (logger.isTraceEnabled()) {
logger.trace("Checking unresolved user destination: " + destination);
}
SimpMessageHeaderAccessor newAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
for (String name : accessor.toNativeHeaderMap().keySet()) {
if (NO_COPY_LIST.contains(name)) {
continue;
}
newAccessor.setNativeHeader(name, accessor.getFirstNativeHeader(name));
}
newAccessor.setDestination(destination);
newAccessor.setHeader(SimpMessageHeaderAccessor.IGNORE_ERROR, true); // ensure send doesn't block
return MessageBuilder.createMessage(message.getPayload(), newAccessor.getMessageHeaders());
}
public void handleUnresolved(Message<?> message) {
MessageHeaders headers = message.getHeaders();
if (SimpMessageHeaderAccessor.getFirstNativeHeader(ORIGINAL_DESTINATION, headers) != null) {
// Re-broadcast
return;
}
SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.wrap(message);
String destination = accessor.getDestination();
accessor.setNativeHeader(ORIGINAL_DESTINATION, destination);
accessor.setLeaveMutable(true);
message = MessageBuilder.createMessage(message.getPayload(), accessor.getMessageHeaders());
if (logger.isTraceEnabled()) {
logger.trace("Translated " + destination + " -> " + getBroadcastDestination());
}
this.messagingTemplate.send(getBroadcastDestination(), message);
}
}
}

14
spring-messaging/src/main/java/org/springframework/messaging/support/NativeMessageHeaderAccessor.java

@ -115,7 +115,7 @@ public class NativeMessageHeaderAccessor extends MessageHeaderAccessor { @@ -115,7 +115,7 @@ public class NativeMessageHeaderAccessor extends MessageHeaderAccessor {
*/
public boolean containsNativeHeader(String headerName) {
Map<String, List<String>> map = getNativeHeaders();
return (map != null ? map.containsKey(headerName) : false);
return (map != null && map.containsKey(headerName));
}
/**
@ -207,4 +207,16 @@ public class NativeMessageHeaderAccessor extends MessageHeaderAccessor { @@ -207,4 +207,16 @@ public class NativeMessageHeaderAccessor extends MessageHeaderAccessor {
return nativeHeaders.remove(name);
}
@SuppressWarnings("unchecked")
public static String getFirstNativeHeader(String headerName, Map<String, Object> headers) {
Map<String, List<String>> map = (Map<String, List<String>>) headers.get(NATIVE_HEADERS);
if (map != null) {
List<String> values = map.get(headerName);
if (values != null) {
return values.get(0);
}
}
return null;
}
}

18
spring-messaging/src/test/java/org/springframework/messaging/simp/config/MessageBrokerConfigurationTests.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2014 the original author or authors.
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -79,8 +79,11 @@ import static org.mockito.Mockito.*; @@ -79,8 +79,11 @@ import static org.mockito.Mockito.*;
public class MessageBrokerConfigurationTests {
private ApplicationContext defaultContext = new AnnotationConfigApplicationContext(DefaultConfig.class);
private ApplicationContext simpleBrokerContext = new AnnotationConfigApplicationContext(SimpleBrokerConfig.class);
private ApplicationContext brokerRelayContext = new AnnotationConfigApplicationContext(BrokerRelayConfig.class);
private ApplicationContext customContext = new AnnotationConfigApplicationContext(CustomConfig.class);
@ -401,7 +404,17 @@ public class MessageBrokerConfigurationTests { @@ -401,7 +404,17 @@ public class MessageBrokerConfigurationTests {
assertEquals("a.a", handler.getPathMatcher().combine("a", "a"));
}
@Test
public void userDestinationBroadcast() throws Exception {
StompBrokerRelayMessageHandler relay = this.brokerRelayContext.getBean(StompBrokerRelayMessageHandler.class);
UserDestinationMessageHandler userHandler = this.brokerRelayContext.getBean(UserDestinationMessageHandler.class);
assertEquals("/topic/unresolved", userHandler.getUserDestinationBroadcast());
assertNotNull(relay.getSystemSubscriptions());
assertSame(userHandler, relay.getSystemSubscriptions().get("/topic/unresolved"));
}
@SuppressWarnings("unused")
@Controller
static class TestController {
@ -417,7 +430,7 @@ public class MessageBrokerConfigurationTests { @@ -417,7 +430,7 @@ public class MessageBrokerConfigurationTests {
}
}
@SuppressWarnings("unused")
@Configuration
static class SimpleBrokerConfig extends AbstractMessageBrokerConfiguration {
@ -451,6 +464,7 @@ public class MessageBrokerConfigurationTests { @@ -451,6 +464,7 @@ public class MessageBrokerConfigurationTests {
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableStompBrokerRelay("/topic", "/queue").setAutoStartup(true);
registry.setUserDestinationBroadcast("/topic/unresolved");
}
}

230
spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerTests.java

@ -15,15 +15,22 @@ @@ -15,15 +15,22 @@
*/
package org.springframework.messaging.simp.stomp;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.StubMessageChannel;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.SimpMessageType;
@ -37,8 +44,6 @@ import org.springframework.messaging.tcp.TcpOperations; @@ -37,8 +44,6 @@ import org.springframework.messaging.tcp.TcpOperations;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureTask;
import static org.junit.Assert.*;
/**
* Unit tests for StompBrokerRelayMessageHandler.
*
@ -74,62 +79,52 @@ public class StompBrokerRelayMessageHandlerTests { @@ -74,62 +79,52 @@ public class StompBrokerRelayMessageHandlerTests {
@Test
public void testVirtualHostHeader() throws Exception {
public void virtualHost() throws Exception {
String virtualHost = "ABC";
this.brokerRelay.setVirtualHost(virtualHost);
this.brokerRelay.start();
this.brokerRelay.setVirtualHost("ABC");
String sessionId = "sess1";
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT);
headers.setSessionId(sessionId);
this.brokerRelay.handleMessage(MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders()));
this.brokerRelay.start();
this.brokerRelay.handleMessage(connectMessage("sess1", "joe"));
List<Message<byte[]>> sent = this.tcpClient.connection.messages;
assertEquals(2, sent.size());
assertEquals(2, this.tcpClient.getSentMessages().size());
StompHeaderAccessor headers1 = StompHeaderAccessor.wrap(sent.get(0));
assertEquals(virtualHost, headers1.getHost());
assertNotNull("The prepared message does not have an accessor",
MessageHeaderAccessor.getAccessor(sent.get(0), MessageHeaderAccessor.class));
StompHeaderAccessor headers1 = this.tcpClient.getSentHeaders(0);
assertEquals(StompCommand.CONNECT, headers1.getCommand());
assertEquals(StompBrokerRelayMessageHandler.SYSTEM_SESSION_ID, headers1.getSessionId());
assertEquals("ABC", headers1.getHost());
StompHeaderAccessor headers2 = StompHeaderAccessor.wrap(sent.get(1));
assertEquals(sessionId, headers2.getSessionId());
assertEquals(virtualHost, headers2.getHost());
assertNotNull("The prepared message does not have an accessor",
MessageHeaderAccessor.getAccessor(sent.get(1), MessageHeaderAccessor.class));
StompHeaderAccessor headers2 = this.tcpClient.getSentHeaders(1);
assertEquals(StompCommand.CONNECT, headers2.getCommand());
assertEquals("sess1", headers2.getSessionId());
assertEquals("ABC", headers2.getHost());
}
@Test
public void testLoginPasscode() throws Exception {
this.brokerRelay.setClientLogin("clientlogin");
this.brokerRelay.setClientPasscode("clientpasscode");
public void loginAndPasscode() throws Exception {
this.brokerRelay.setSystemLogin("syslogin");
this.brokerRelay.setSystemPasscode("syspasscode");
this.brokerRelay.setClientLogin("clientlogin");
this.brokerRelay.setClientPasscode("clientpasscode");
this.brokerRelay.start();
this.brokerRelay.handleMessage(connectMessage("sess1", "joe"));
String sessionId = "sess1";
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT);
headers.setSessionId(sessionId);
this.brokerRelay.handleMessage(MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders()));
List<Message<byte[]>> sent = this.tcpClient.connection.messages;
assertEquals(2, sent.size());
assertEquals(2, this.tcpClient.getSentMessages().size());
StompHeaderAccessor headers1 = StompHeaderAccessor.wrap(sent.get(0));
StompHeaderAccessor headers1 = this.tcpClient.getSentHeaders(0);
assertEquals(StompCommand.CONNECT, headers1.getCommand());
assertEquals("syslogin", headers1.getLogin());
assertEquals("syspasscode", headers1.getPasscode());
StompHeaderAccessor headers2 = StompHeaderAccessor.wrap(sent.get(1));
StompHeaderAccessor headers2 = this.tcpClient.getSentHeaders(1);
assertEquals(StompCommand.CONNECT, headers2.getCommand());
assertEquals("clientlogin", headers2.getLogin());
assertEquals("clientpasscode", headers2.getPasscode());
}
@Test
public void testDestinationExcluded() throws Exception {
public void destinationExcluded() throws Exception {
this.brokerRelay.start();
@ -138,89 +133,113 @@ public class StompBrokerRelayMessageHandlerTests { @@ -138,89 +133,113 @@ public class StompBrokerRelayMessageHandlerTests {
headers.setDestination("/user/daisy/foo");
this.brokerRelay.handleMessage(MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders()));
List<Message<byte[]>> sent = this.tcpClient.connection.messages;
assertEquals(1, sent.size());
assertEquals(StompCommand.CONNECT, StompHeaderAccessor.wrap(sent.get(0)).getCommand());
assertNotNull("The prepared message does not have an accessor",
MessageHeaderAccessor.getAccessor(sent.get(0), MessageHeaderAccessor.class));
assertEquals(1, this.tcpClient.getSentMessages().size());
StompHeaderAccessor headers1 = this.tcpClient.getSentHeaders(0);
assertEquals(StompCommand.CONNECT, headers1.getCommand());
assertEquals(StompBrokerRelayMessageHandler.SYSTEM_SESSION_ID, headers1.getSessionId());
}
@Test
public void testOutboundMessageIsEnriched() throws Exception {
public void messageFromBrokerIsEnriched() throws Exception {
this.brokerRelay.start();
this.brokerRelay.handleMessage(connectMessage("sess1", "joe"));
String sessionId = "sess1";
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT);
headers.setSessionId(sessionId);
headers.setUser(new TestPrincipal("joe"));
this.brokerRelay.handleMessage(MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders()));
List<Message<byte[]>> sent = this.tcpClient.connection.messages;
assertEquals(2, sent.size());
assertEquals(2, this.tcpClient.getSentMessages().size());
assertEquals(StompCommand.CONNECT, this.tcpClient.getSentHeaders(0).getCommand());
assertEquals(StompCommand.CONNECT, this.tcpClient.getSentHeaders(1).getCommand());
StompHeaderAccessor responseHeaders = StompHeaderAccessor.create(StompCommand.MESSAGE);
responseHeaders.setLeaveMutable(true);
Message<byte[]> response = MessageBuilder.createMessage(new byte[0], responseHeaders.getMessageHeaders());
this.tcpClient.connectionHandler.handleMessage(response);
this.tcpClient.handleMessage(message(StompCommand.MESSAGE, null, null, null));
Message<byte[]> actual = this.outboundChannel.getMessages().get(0);
StompHeaderAccessor actualHeaders = StompHeaderAccessor.getAccessor(actual, StompHeaderAccessor.class);
assertEquals(sessionId, actualHeaders.getSessionId());
assertEquals("joe", actualHeaders.getUser().getName());
Message<byte[]> message = this.outboundChannel.getMessages().get(0);
StompHeaderAccessor accessor = StompHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
assertEquals("sess1", accessor.getSessionId());
assertEquals("joe", accessor.getUser().getName());
}
// SPR-12820
@Test
public void testConnectWhenBrokerNotAvailable() throws Exception {
public void connectWhenBrokerNotAvailable() throws Exception {
this.brokerRelay.start();
this.brokerRelay.stopInternal();
this.brokerRelay.handleMessage(connectMessage("sess1", "joe"));
Message<byte[]> message = this.outboundChannel.getMessages().get(0);
StompHeaderAccessor accessor = StompHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
assertEquals(StompCommand.ERROR, accessor.getCommand());
assertEquals("sess1", accessor.getSessionId());
assertEquals("joe", accessor.getUser().getName());
assertEquals("Broker not available.", accessor.getMessage());
}
String sessionId = "sess1";
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT);
headers.setSessionId(sessionId);
headers.setUser(new TestPrincipal("joe"));
this.brokerRelay.handleMessage(MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders()));
@Test
public void sendAfterBrokerUnavailable() throws Exception {
this.brokerRelay.start();
assertEquals(1, this.brokerRelay.getConnectionCount());
Message<byte[]> actual = this.outboundChannel.getMessages().get(0);
StompHeaderAccessor actualHeaders = StompHeaderAccessor.getAccessor(actual, StompHeaderAccessor.class);
assertEquals(StompCommand.ERROR, actualHeaders.getCommand());
assertEquals(sessionId, actualHeaders.getSessionId());
assertEquals("joe", actualHeaders.getUser().getName());
assertEquals("Broker not available.", actualHeaders.getMessage());
this.brokerRelay.handleMessage(connectMessage("sess1", "joe"));
assertEquals(2, this.brokerRelay.getConnectionCount());
this.brokerRelay.stopInternal();
this.brokerRelay.handleMessage(message(StompCommand.SEND, "sess1", "joe", "/foo"));
assertEquals(1, this.brokerRelay.getConnectionCount());
Message<byte[]> message = this.outboundChannel.getMessages().get(0);
StompHeaderAccessor accessor = StompHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
assertEquals(StompCommand.ERROR, accessor.getCommand());
assertEquals("sess1", accessor.getSessionId());
assertEquals("joe", accessor.getUser().getName());
assertEquals("Broker not available.", accessor.getMessage());
}
@Test
public void testSendAfterBrokerUnavailable() throws Exception {
public void systemSubscription() throws Exception {
MessageHandler handler = mock(MessageHandler.class);
this.brokerRelay.setSystemSubscriptions(Collections.singletonMap("/topic/foo", handler));
this.brokerRelay.start();
String sessionId = "sess1";
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT);
headers.setSessionId(sessionId);
headers.setUser(new TestPrincipal("joe"));
this.brokerRelay.handleMessage(MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders()));
StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.CONNECTED);
accessor.setLeaveMutable(true);
MessageHeaders headers = accessor.getMessageHeaders();
this.tcpClient.handleMessage(MessageBuilder.createMessage(new byte[0], headers));
assertEquals(2, this.brokerRelay.getConnectionCount());
assertEquals(2, this.tcpClient.getSentMessages().size());
assertEquals(StompCommand.CONNECT, this.tcpClient.getSentHeaders(0).getCommand());
assertEquals(StompCommand.SUBSCRIBE, this.tcpClient.getSentHeaders(1).getCommand());
assertEquals("/topic/foo", this.tcpClient.getSentHeaders(1).getDestination());
this.brokerRelay.stopInternal();
Message<byte[]> message = message(StompCommand.MESSAGE, null, null, "/topic/foo");
this.tcpClient.handleMessage(message);
headers = StompHeaderAccessor.create(StompCommand.SEND);
headers.setSessionId(sessionId);
headers.setUser(new TestPrincipal("joe"));
headers.setDestination("/foo");
this.brokerRelay.handleMessage(MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders()));
ArgumentCaptor<Message> captor = ArgumentCaptor.forClass(Message.class);
verify(handler).handleMessage(captor.capture());
assertSame(message, captor.getValue());
}
assertEquals(1, this.brokerRelay.getConnectionCount());
private Message<byte[]> connectMessage(String sessionId, String user) {
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT);
headers.setSessionId(sessionId);
headers.setUser(new TestPrincipal(user));
return MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders());
}
Message<byte[]> actual = this.outboundChannel.getMessages().get(0);
StompHeaderAccessor actualHeaders = StompHeaderAccessor.getAccessor(actual, StompHeaderAccessor.class);
assertEquals(StompCommand.ERROR, actualHeaders.getCommand());
assertEquals(sessionId, actualHeaders.getSessionId());
assertEquals("joe", actualHeaders.getUser().getName());
assertEquals("Broker not available.", actualHeaders.getMessage());
private Message<byte[]> message(StompCommand command, String sessionId, String user, String destination) {
StompHeaderAccessor accessor = StompHeaderAccessor.create(command);
if (sessionId != null) {
accessor.setSessionId(sessionId);
}
if (user != null) {
accessor.setUser(new TestPrincipal(user));
}
if (destination != null) {
accessor.setDestination(destination);
}
accessor.setLeaveMutable(true);
return MessageBuilder.createMessage(new byte[0], accessor.getMessageHeaders());
}
@ -254,17 +273,29 @@ public class StompBrokerRelayMessageHandlerTests { @@ -254,17 +273,29 @@ public class StompBrokerRelayMessageHandlerTests {
private TcpConnectionHandler<byte[]> connectionHandler;
public List<Message<byte[]>> getSentMessages() {
return this.connection.getMessages();
}
public StompHeaderAccessor getSentHeaders(int index) {
assertTrue("Size: " + getSentMessages().size(), getSentMessages().size() > index);
Message<byte[]> message = getSentMessages().get(index);
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
assertNotNull(accessor);
return accessor;
}
@Override
public ListenableFuture<Void> connect(TcpConnectionHandler<byte[]> connectionHandler) {
this.connectionHandler = connectionHandler;
connectionHandler.afterConnected(this.connection);
public ListenableFuture<Void> connect(TcpConnectionHandler<byte[]> handler) {
this.connectionHandler = handler;
handler.afterConnected(this.connection);
return getVoidFuture();
}
@Override
public ListenableFuture<Void> connect(TcpConnectionHandler<byte[]> connectionHandler, ReconnectStrategy reconnectStrategy) {
this.connectionHandler = connectionHandler;
connectionHandler.afterConnected(this.connection);
public ListenableFuture<Void> connect(TcpConnectionHandler<byte[]> handler, ReconnectStrategy strategy) {
this.connectionHandler = handler;
handler.afterConnected(this.connection);
return getVoidFuture();
}
@ -272,6 +303,11 @@ public class StompBrokerRelayMessageHandlerTests { @@ -272,6 +303,11 @@ public class StompBrokerRelayMessageHandlerTests {
public ListenableFuture<Boolean> shutdown() {
return getBooleanFuture();
}
public void handleMessage(Message<byte[]> message) {
this.connectionHandler.handleMessage(message);
}
}
@ -280,6 +316,10 @@ public class StompBrokerRelayMessageHandlerTests { @@ -280,6 +316,10 @@ public class StompBrokerRelayMessageHandlerTests {
private final List<Message<byte[]>> messages = new ArrayList<>();
public List<Message<byte[]>> getMessages() {
return this.messages;
}
@Override
public ListenableFuture<Void> send(Message<byte[]> message) {
this.messages.add(message);

72
spring-messaging/src/test/java/org/springframework/messaging/simp/user/UserDestinationMessageHandlerTests.java

@ -18,7 +18,9 @@ package org.springframework.messaging.simp.user; @@ -18,7 +18,9 @@ package org.springframework.messaging.simp.user;
import static org.junit.Assert.*;
import static org.mockito.BDDMockito.*;
import static org.springframework.messaging.simp.SimpMessageHeaderAccessor.ORIGINAL_DESTINATION;
import static org.springframework.messaging.simp.SimpMessageHeaderAccessor.*;
import java.nio.charset.Charset;
import org.junit.Before;
import org.junit.Test;
@ -33,6 +35,8 @@ import org.springframework.messaging.SubscribableChannel; @@ -33,6 +35,8 @@ import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.messaging.simp.TestPrincipal;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.MessageBuilder;
/**
@ -62,7 +66,6 @@ public class UserDestinationMessageHandlerTests { @@ -62,7 +66,6 @@ public class UserDestinationMessageHandlerTests {
@Test
@SuppressWarnings("rawtypes")
public void handleSubscribe() {
given(this.brokerChannel.send(Mockito.any(Message.class))).willReturn(true);
this.handler.handleMessage(createWith(SimpMessageType.SUBSCRIBE, "joe", SESSION_ID, "/user/queue/foo"));
@ -75,7 +78,6 @@ public class UserDestinationMessageHandlerTests { @@ -75,7 +78,6 @@ public class UserDestinationMessageHandlerTests {
}
@Test
@SuppressWarnings("rawtypes")
public void handleUnsubscribe() {
given(this.brokerChannel.send(Mockito.any(Message.class))).willReturn(true);
this.handler.handleMessage(createWith(SimpMessageType.UNSUBSCRIBE, "joe", "123", "/user/queue/foo"));
@ -88,7 +90,6 @@ public class UserDestinationMessageHandlerTests { @@ -88,7 +90,6 @@ public class UserDestinationMessageHandlerTests {
}
@Test
@SuppressWarnings("rawtypes")
public void handleMessage() {
this.registry.registerSessionId("joe", "123");
given(this.brokerChannel.send(Mockito.any(Message.class))).willReturn(true);
@ -102,6 +103,69 @@ public class UserDestinationMessageHandlerTests { @@ -102,6 +103,69 @@ public class UserDestinationMessageHandlerTests {
assertEquals("/user/queue/foo", accessor.getFirstNativeHeader(ORIGINAL_DESTINATION));
}
@Test
public void handleMessageWithoutActiveSession() {
this.handler.setUserDestinationBroadcast("/topic/unresolved");
given(this.brokerChannel.send(Mockito.any(Message.class))).willReturn(true);
this.handler.handleMessage(createWith(SimpMessageType.MESSAGE, "joe", "123", "/user/joe/queue/foo"));
ArgumentCaptor<Message> captor = ArgumentCaptor.forClass(Message.class);
Mockito.verify(this.brokerChannel).send(captor.capture());
Message message = captor.getValue();
SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.wrap(message);
assertEquals("/topic/unresolved", accessor.getDestination());
assertEquals("/user/joe/queue/foo", accessor.getFirstNativeHeader(ORIGINAL_DESTINATION));
// Should ignore our own broadcast to brokerChannel
this.handler.handleMessage(message);
Mockito.verifyNoMoreInteractions(this.brokerChannel);
}
@Test
public void handleMessageFromBrokerWithActiveSession() {
this.registry.registerSessionId("joe", "123");
this.handler.setUserDestinationBroadcast("/topic/unresolved");
given(this.brokerChannel.send(Mockito.any(Message.class))).willReturn(true);
StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.MESSAGE);
accessor.setSessionId("system123");
accessor.setDestination("/topic/unresolved");
accessor.setNativeHeader(ORIGINAL_DESTINATION, "/user/joe/queue/foo");
accessor.setNativeHeader("customHeader", "customHeaderValue");
accessor.setLeaveMutable(true);
byte[] payload = "payload".getBytes(Charset.forName("UTF-8"));
this.handler.handleMessage(MessageBuilder.createMessage(payload, accessor.getMessageHeaders()));
ArgumentCaptor<Message> captor = ArgumentCaptor.forClass(Message.class);
Mockito.verify(this.brokerChannel).send(captor.capture());
assertNotNull(captor.getValue());
SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.wrap(captor.getValue());
assertEquals("/queue/foo-user123", headers.getDestination());
assertEquals("/user/queue/foo", headers.getFirstNativeHeader(ORIGINAL_DESTINATION));
assertEquals("customHeaderValue", headers.getFirstNativeHeader("customHeader"));
assertArrayEquals(payload, (byte[]) captor.getValue().getPayload());
}
@Test
public void handleMessageFromBrokerWithoutActiveSession() {
this.handler.setUserDestinationBroadcast("/topic/unresolved");
given(this.brokerChannel.send(Mockito.any(Message.class))).willReturn(true);
StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.MESSAGE);
accessor.setSessionId("system123");
accessor.setDestination("/topic/unresolved");
accessor.setNativeHeader(ORIGINAL_DESTINATION, "/user/joe/queue/foo");
accessor.setLeaveMutable(true);
byte[] payload = "payload".getBytes(Charset.forName("UTF-8"));
this.handler.handleMessage(MessageBuilder.createMessage(payload, accessor.getMessageHeaders()));
// No re-broadcast
verifyNoMoreInteractions(this.brokerChannel);
}
@Test
public void ignoreMessage() {

44
spring-websocket/src/main/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParser.java

@ -159,15 +159,18 @@ class MessageBrokerBeanDefinitionParser implements BeanDefinitionParser { @@ -159,15 +159,18 @@ class MessageBrokerBeanDefinitionParser implements BeanDefinitionParser {
channelElem = DomUtils.getChildElementByTagName(element, "broker-channel");
RuntimeBeanReference brokerChannel = getMessageChannel("brokerChannel", channelElem, context, source);
RootBeanDefinition broker = registerMessageBroker(element, inChannel, outChannel, brokerChannel, context, source);
RuntimeBeanReference resolver = registerUserDestResolver(element, sessionRegistry, context, source);
RuntimeBeanReference userDestHandler = registerUserDestHandler(element, inChannel,
brokerChannel, resolver, context, source);
RootBeanDefinition broker = registerMessageBroker(element, userDestHandler, inChannel,
outChannel, brokerChannel, context, source);
RuntimeBeanReference converter = registerMessageConverter(element, context, source);
RuntimeBeanReference template = registerMessagingTemplate(element, brokerChannel, converter, context, source);
registerAnnotationMethodMessageHandler(element, inChannel, outChannel,converter, template, context, source);
RuntimeBeanReference resolver = registerUserDestinationResolver(element, sessionRegistry, context, source);
registerUserDestinationMessageHandler(inChannel, brokerChannel, resolver, context, source);
Map<String, Object> scopeMap = Collections.<String, Object>singletonMap("websocket", new SimpSessionScope());
RootBeanDefinition scopeConfigurer = new RootBeanDefinition(CustomScopeConfigurer.class);
scopeConfigurer.getPropertyValues().add("scopes", scopeMap);
@ -308,11 +311,13 @@ class MessageBrokerBeanDefinitionParser implements BeanDefinitionParser { @@ -308,11 +311,13 @@ class MessageBrokerBeanDefinitionParser implements BeanDefinitionParser {
return new RuntimeBeanReference(registerBeanDef(beanDef, context, source));
}
private RootBeanDefinition registerMessageBroker(Element messageBrokerElement, RuntimeBeanReference inChannel,
RuntimeBeanReference outChannel, RuntimeBeanReference brokerChannel, ParserContext context, Object source) {
private RootBeanDefinition registerMessageBroker(Element brokerElement,
RuntimeBeanReference userDestHandler, RuntimeBeanReference inChannel,
RuntimeBeanReference outChannel, RuntimeBeanReference brokerChannel,
ParserContext context, Object source) {
Element simpleBrokerElem = DomUtils.getChildElementByTagName(messageBrokerElement, "simple-broker");
Element brokerRelayElem = DomUtils.getChildElementByTagName(messageBrokerElement, "stomp-broker-relay");
Element simpleBrokerElem = DomUtils.getChildElementByTagName(brokerElement, "simple-broker");
Element brokerRelayElem = DomUtils.getChildElementByTagName(brokerElement, "stomp-broker-relay");
ConstructorArgumentValues cavs = new ConstructorArgumentValues();
cavs.addIndexedArgumentValue(0, inChannel);
@ -324,8 +329,8 @@ class MessageBrokerBeanDefinitionParser implements BeanDefinitionParser { @@ -324,8 +329,8 @@ class MessageBrokerBeanDefinitionParser implements BeanDefinitionParser {
String prefix = simpleBrokerElem.getAttribute("prefix");
cavs.addIndexedArgumentValue(3, Arrays.asList(StringUtils.tokenizeToStringArray(prefix, ",")));
brokerDef = new RootBeanDefinition(SimpleBrokerMessageHandler.class, cavs, null);
if (messageBrokerElement.hasAttribute("path-matcher")) {
String pathMatcherRef = messageBrokerElement.getAttribute("path-matcher");
if (brokerElement.hasAttribute("path-matcher")) {
String pathMatcherRef = brokerElement.getAttribute("path-matcher");
brokerDef.getPropertyValues().add("pathMatcher", new RuntimeBeanReference(pathMatcherRef));
}
if (simpleBrokerElem.hasAttribute("scheduler")) {
@ -369,6 +374,13 @@ class MessageBrokerBeanDefinitionParser implements BeanDefinitionParser { @@ -369,6 +374,13 @@ class MessageBrokerBeanDefinitionParser implements BeanDefinitionParser {
if (brokerRelayElem.hasAttribute("virtual-host")) {
values.add("virtualHost", brokerRelayElem.getAttribute("virtual-host"));
}
if (brokerElement.hasAttribute("user-destination-broadcast")) {
String destination = brokerElement.getAttribute("user-destination-broadcast");
ManagedMap<String, Object> map = new ManagedMap<String, Object>();
map.setSource(source);
map.put(destination, userDestHandler);
values.add("systemSubscriptions", map);
}
Class<?> handlerType = StompBrokerRelayMessageHandler.class;
brokerDef = new RootBeanDefinition(handlerType, cavs, values);
}
@ -471,7 +483,7 @@ class MessageBrokerBeanDefinitionParser implements BeanDefinitionParser { @@ -471,7 +483,7 @@ class MessageBrokerBeanDefinitionParser implements BeanDefinitionParser {
return list;
}
private RuntimeBeanReference registerUserDestinationResolver(Element brokerElem,
private RuntimeBeanReference registerUserDestResolver(Element brokerElem,
RuntimeBeanReference userSessionRegistry, ParserContext context, Object source) {
ConstructorArgumentValues cavs = new ConstructorArgumentValues();
@ -483,15 +495,19 @@ class MessageBrokerBeanDefinitionParser implements BeanDefinitionParser { @@ -483,15 +495,19 @@ class MessageBrokerBeanDefinitionParser implements BeanDefinitionParser {
return new RuntimeBeanReference(registerBeanDef(beanDef, context, source));
}
private RuntimeBeanReference registerUserDestinationMessageHandler(RuntimeBeanReference inChannel,
RuntimeBeanReference brokerChannel, RuntimeBeanReference userDestinationResolver,
ParserContext context, Object source) {
private RuntimeBeanReference registerUserDestHandler(Element brokerElem,
RuntimeBeanReference inChannel, RuntimeBeanReference brokerChannel,
RuntimeBeanReference userDestinationResolver, ParserContext context, Object source) {
ConstructorArgumentValues cavs = new ConstructorArgumentValues();
cavs.addIndexedArgumentValue(0, inChannel);
cavs.addIndexedArgumentValue(1, brokerChannel);
cavs.addIndexedArgumentValue(2, userDestinationResolver);
RootBeanDefinition beanDef = new RootBeanDefinition(UserDestinationMessageHandler.class, cavs, null);
if (brokerElem.hasAttribute("user-destination-broadcast")) {
String destination = brokerElem.getAttribute("user-destination-broadcast");
beanDef.getPropertyValues().add("userDestinationBroadcast", destination);
}
return new RuntimeBeanReference(registerBeanDef(beanDef, context, source));
}

11
spring-websocket/src/main/resources/org/springframework/web/socket/config/spring-websocket-4.2.xsd

@ -853,6 +853,17 @@ @@ -853,6 +853,17 @@
The prefix used to identify user destinations.
Any destinations that do not start with the given prefix are not be resolved.
The default value is "/user/".
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="user-destination-broadcast" type="xsd:string">
<xsd:annotation>
<xsd:documentation><![CDATA[
Set a destination to broadcast messages to that remain unresolved because
the user is not connected. In a multi-application server scenario this
gives other application servers a chance to try.
Note: this option applies only when the stomp-broker-relay is enabled.
By default this is not set.
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>

6
spring-websocket/src/test/java/org/springframework/web/socket/config/MessageBrokerBeanDefinitionParserTests.java

@ -275,6 +275,12 @@ public class MessageBrokerBeanDefinitionParserTests { @@ -275,6 +275,12 @@ public class MessageBrokerBeanDefinitionParserTests {
// expected
}
UserDestinationMessageHandler userDestHandler = this.appContext.getBean(UserDestinationMessageHandler.class);
assertEquals("/topic/unresolved", userDestHandler.getUserDestinationBroadcast());
assertNotNull(messageBroker.getSystemSubscriptions());
assertSame(userDestHandler, messageBroker.getSystemSubscriptions().get("/topic/unresolved"));
String name = "webSocketMessageBrokerStats";
WebSocketMessageBrokerStats stats = this.appContext.getBean(name, WebSocketMessageBrokerStats.class);
String actual = stats.toString();

2
spring-websocket/src/test/resources/org/springframework/web/socket/config/websocket-config-broker-relay.xml

@ -4,7 +4,7 @@ @@ -4,7 +4,7 @@
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/websocket http://www.springframework.org/schema/websocket/spring-websocket.xsd">
<websocket:message-broker order="2">
<websocket:message-broker order="2" user-destination-broadcast="/topic/unresolved">
<websocket:stomp-endpoint path="/foo">
<websocket:sockjs/>
</websocket:stomp-endpoint>

7
src/asciidoc/web-websocket.adoc

@ -1741,6 +1741,13 @@ http://activemq.apache.org/delete-inactive-destinations.html[configuration optio @@ -1741,6 +1741,13 @@ http://activemq.apache.org/delete-inactive-destinations.html[configuration optio
for purging inactive destinations.
====
In a multi-application server scenario a user destination may remain unresolved because
the user is connected to a different server. In such cases you can configure a
destination to broadcast unresolved messages to so that other servers have a chance to try.
This can be done through the `userDestinationBroadcast` property of the
`MessageBrokerRegistry` in Java config and the `user-destination-broadcast` attribute
of the `message-broker` element in XML.

Loading…
Cancel
Save