diff --git a/spring-messaging/src/main/java/org/springframework/messaging/Message.java b/spring-messaging/src/main/java/org/springframework/messaging/Message.java index 2728f284f2..42f3aa9374 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/Message.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/Message.java @@ -22,17 +22,18 @@ package org.springframework.messaging; * @author Mark Fisher * @author Arjen Poutsma * @since 4.0 + * * @see org.springframework.messaging.support.MessageBuilder */ public interface Message { /** - * Returns message headers for the message (never {@code null}). + * Return message headers for the message, never {@code null}. */ MessageHeaders getHeaders(); /** - * Returns the message payload. + * Return the message payload. */ T getPayload(); diff --git a/spring-messaging/src/main/java/org/springframework/messaging/MessageChannel.java b/spring-messaging/src/main/java/org/springframework/messaging/MessageChannel.java index 5321a3beba..ffbe6699f9 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/MessageChannel.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/MessageChannel.java @@ -17,7 +17,7 @@ package org.springframework.messaging; /** - * Base channel interface defining common behavior for sending messages. + * Defines methods for sending messages. * * @author Mark Fisher * @since 4.0 @@ -31,24 +31,27 @@ public interface MessageChannel { /** - * Send a {@link Message} to this channel. May throw a RuntimeException for - * non-recoverable errors. Otherwise, if the Message cannot be sent for a non-fatal - * reason this method will return 'false', and if the Message is sent successfully, it - * will return 'true'. - *

Depending on the implementation, this method may block indefinitely. To provide a - * maximum wait time, use {@link #send(Message, long)}. - * @param message the {@link Message} to send - * @return whether or not the Message has been sent successfully + * Send a {@link Message} to this channel. If the message is sent successfully, + * the method returns {@code true}. If the message cannot be sent due to a + * non-fatal reason, the method returns {@code false}. The method may also + * throw a RuntimeException in case of non-recoverable errors. + *

+ * This method may block indefinitely, depending on the implementation. + * To provide a maximum wait time, use {@link #send(Message, long)}. + * + * @param message the message to send + * @return whether or not the message was sent */ boolean send(Message message); /** - * Send a message, blocking until either the message is accepted or the specified - * timeout period elapses. - * @param message the {@link Message} to send - * @param timeout the timeout in milliseconds or #INDEFINITE_TIMEOUT - * @return {@code true} if the message is sent successfully, {@code false} if the - * specified timeout period elapses or the send is interrupted + * Send a message, blocking until either the message is accepted or the + * specified timeout period elapses. + * + * @param message the message to send + * @param timeout the timeout in milliseconds or {@link #INDEFINITE_TIMEOUT} + * @return {@code true} if the message is sent, {@code false} if not including + * a timeout of an interrupt of the send */ boolean send(Message message, long timeout); diff --git a/spring-messaging/src/main/java/org/springframework/messaging/MessageDeliveryException.java b/spring-messaging/src/main/java/org/springframework/messaging/MessageDeliveryException.java index e208fabe6e..716ce1aaee 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/MessageDeliveryException.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/MessageDeliveryException.java @@ -25,6 +25,7 @@ package org.springframework.messaging; @SuppressWarnings("serial") public class MessageDeliveryException extends MessagingException { + public MessageDeliveryException(String description) { super(description); } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/MessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/MessageHandler.java index 39b88b860f..23aa0ba177 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/MessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/MessageHandler.java @@ -17,7 +17,7 @@ package org.springframework.messaging; /** - * Base interface for any component that handles Messages. + * Contract for handling a {@link Message}. * * @author Mark Fisher * @author Iwein Fuld @@ -26,17 +26,9 @@ package org.springframework.messaging; public interface MessageHandler { /** - * Handles the message if possible. If the handler cannot deal with the - * message this will result in a {@code MessageRejectedException} e.g. - * in case of a Selective Consumer. When a consumer tries to handle a - * message, but fails to do so, a {@code MessageHandlingException} is - * thrown. In the last case it is recommended to treat the message as tainted - * and go into an error scenario. - *

When the handling results in a failure of another message being sent - * (e.g. a "reply" message), that failure will trigger a - * {@code MessageDeliveryException}. + * Handle the given message. + * * @param message the message to be handled - * reply related to the handling of the message */ void handleMessage(Message message) throws MessagingException; diff --git a/spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/support/MessageHandlingException.java b/spring-messaging/src/main/java/org/springframework/messaging/MessageHandlingException.java similarity index 86% rename from spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/support/MessageHandlingException.java rename to spring-messaging/src/main/java/org/springframework/messaging/MessageHandlingException.java index 8f41a79fc8..48529b1392 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/support/MessageHandlingException.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/MessageHandlingException.java @@ -14,15 +14,15 @@ * limitations under the License. */ -package org.springframework.messaging.handler.annotation.support; +package org.springframework.messaging; import org.springframework.messaging.Message; import org.springframework.messaging.MessagingException; /** - * Thrown when the handling of a message results in an unrecoverable exception. + * Exception that indicates an error occurred during message handling. * - * @author Rossen Stoyanchev + * @author Mark Fisher * @since 4.0 */ public class MessageHandlingException extends MessagingException { @@ -30,12 +30,12 @@ public class MessageHandlingException extends MessagingException { private static final long serialVersionUID = 690969923668400297L; - public MessageHandlingException(Message message, String description, Throwable cause) { - super(message, description, cause); - } - public MessageHandlingException(Message message, String description) { super(message, description); } + public MessageHandlingException(Message message, String description, Throwable cause) { + super(message, description, cause); + } + } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/MessageHeaders.java b/spring-messaging/src/main/java/org/springframework/messaging/MessageHeaders.java index c5f3502f68..e27dafc3f4 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/MessageHeaders.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/MessageHeaders.java @@ -38,29 +38,34 @@ import org.springframework.util.IdGenerator; /** * The headers for a {@link Message} - * - *

IMPORTANT: This class is immutable. Any mutating operation - * (e.g., put(..), putAll(..) etc.) will throw {@link UnsupportedOperationException}. - * - *

To create MessageHeaders instance use fluent - * {@link org.springframework.messaging.support.MessageBuilder MessageBuilder} API + *

+ * IMPORTANT: This class is immutable. Any mutating operation such as + * {@code put(..)}, {@code putAll(..)} and others will throw + * {@link UnsupportedOperationException}. + *

+ * One way to create message headers is to use the + * {@link org.springframework.messaging.support.MessageBuilder MessageBuilder}: *

  * MessageBuilder.withPayload("foo").setHeader("key1", "value1").setHeader("key2", "value2");
  * 
- * or create an instance of GenericMessage passing payload as {@link Object} and headers as a regular {@link Map} + * A second option is to create {@link org.springframework.messaging.support.GenericMessage} + * passing a payload as {@link Object} and headers as a {@link Map java.util.Map}: *
  * Map headers = new HashMap();
  * headers.put("key1", "value1");
  * headers.put("key2", "value2");
  * new GenericMessage("foo", headers);
  * 
+ * A third option is to use {@link org.springframework.messaging.support.MessageHeaderAccessor} + * or one of its sub-classes to create specific categories of headers. * * @author Arjen Poutsma * @author Mark Fisher * @author Gary Russell - * @author Rossen Stoyanchev * @since 4.0 + * * @see org.springframework.messaging.support.MessageBuilder + * @see org.springframework.messaging.support.MessageHeaderAccessor */ public final class MessageHeaders implements Map, Serializable { @@ -124,8 +129,8 @@ public final class MessageHeaders implements Map, Serializable { return null; } if (!type.isAssignableFrom(value.getClass())) { - throw new IllegalArgumentException("Incorrect type specified for header '" + key + "'. Expected [" + type - + "] but actual type is [" + value.getClass() + "]"); + throw new IllegalArgumentException("Incorrect type specified for header '" + + key + "'. Expected [" + type + "] but actual type is [" + value.getClass() + "]"); } return (T) value; } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/PollableChannel.java b/spring-messaging/src/main/java/org/springframework/messaging/PollableChannel.java index 5254fb4ff6..c589382624 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/PollableChannel.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/PollableChannel.java @@ -17,8 +17,7 @@ package org.springframework.messaging; /** - * Interface for Message Channels from which Messages may be actively received through - * polling. + * A {@link MessageChannel} from which messages may be actively received through polling. * * @author Mark Fisher * @since 4.0 @@ -27,6 +26,7 @@ public interface PollableChannel extends MessageChannel { /** * Receive a message from this channel, blocking indefinitely if necessary. + * * @return the next available {@link Message} or {@code null} if interrupted */ Message receive(); @@ -34,10 +34,10 @@ public interface PollableChannel extends MessageChannel { /** * Receive a message from this channel, blocking until either a message is available * or the specified timeout period elapses. - * @param timeout the timeout in milliseconds or - * {@link MessageChannel#INDEFINITE_TIMEOUT}. + * + * @param timeout the timeout in milliseconds or {@link MessageChannel#INDEFINITE_TIMEOUT}. * @return the next available {@link Message} or {@code null} if the specified timeout - * period elapses or the message reception is interrupted + * period elapses or the message reception is interrupted */ Message receive(long timeout); diff --git a/spring-messaging/src/main/java/org/springframework/messaging/SubscribableChannel.java b/spring-messaging/src/main/java/org/springframework/messaging/SubscribableChannel.java index 9cb0b8b04d..f4d00413d5 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/SubscribableChannel.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/SubscribableChannel.java @@ -17,26 +17,28 @@ package org.springframework.messaging; /** - * Interface for any MessageChannel implementation that accepts subscribers. - * The subscribers must implement the {@link MessageHandler} interface and - * will be invoked when a Message is available. + * A {@link MessageChannel} that maintains a registry of subscribers and invokes + * them to handle messages sent through this channel. * * @author Mark Fisher * @since 4.0 */ public interface SubscribableChannel extends MessageChannel { + /** - * Register a {@link MessageHandler} as a subscriber to this channel. - * @return {@code true} if the channel was not already subscribed to the specified - * handler + * Register a message handler. + * + * @return {@code true} if the handler was subscribed or {@code false} if it + * was already subscribed. */ boolean subscribe(MessageHandler handler); /** - * Remove a {@link MessageHandler} from the subscribers of this channel. - * @return {@code true} if the channel was previously subscribed to the specified - * handler + * Un-register a message handler. + * + * @return {@code true} if the handler was un-registered, or {@code false} + * if was not registered. */ boolean unsubscribe(MessageHandler handler); diff --git a/spring-messaging/src/main/java/org/springframework/messaging/core/AbstractDestinationResolvingMessagingTemplate.java b/spring-messaging/src/main/java/org/springframework/messaging/core/AbstractDestinationResolvingMessagingTemplate.java index ca77be8d98..d8d0c86ec2 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/core/AbstractDestinationResolvingMessagingTemplate.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/core/AbstractDestinationResolvingMessagingTemplate.java @@ -21,14 +21,20 @@ import org.springframework.messaging.Message; import org.springframework.util.Assert; /** - * Base class for a messaging template that can resolve String-based destinations. + * An extension of {@link AbstractMessagingTemplate} that adds operations for sending + * messages to a resolvable destination name as defined by the following interfaces: + *
    + *
  • {@link DestinationResolvingMessageSendingOperations}
  • + *
  • {@link DestinationResolvingMessageReceivingOperations}
  • + *
  • {@link DestinationResolvingMessageRequestReplyOperations}
  • + *
* * @author Mark Fisher * @author Rossen Stoyanchev * @since 4.0 */ -public abstract class AbstractDestinationResolvingMessagingTemplate extends - AbstractMessagingTemplate implements +public abstract class AbstractDestinationResolvingMessagingTemplate extends AbstractMessagingTemplate + implements DestinationResolvingMessageSendingOperations, DestinationResolvingMessageReceivingOperations, DestinationResolvingMessageRequestReplyOperations { @@ -36,19 +42,36 @@ public abstract class AbstractDestinationResolvingMessagingTemplate extends private volatile DestinationResolver destinationResolver; + /** + * Configure the {@link DestinationResolver} to use to resolve String destination + * names into actual destinations of type {@code }. + *

+ * This field does not have a default setting. If not configured, methods that + * require resolving a destination name will raise an {@link IllegalArgumentException}. + * + * @param destinationResolver the destination resolver to use + */ public void setDestinationResolver(DestinationResolver destinationResolver) { + Assert.notNull(destinationResolver, "'destinationResolver' is required"); this.destinationResolver = destinationResolver; } + /** + * Return the configured destination resolver. + */ + public DestinationResolver getDestinationResolver() { + return this.destinationResolver; + } + @Override - public

void send(String destinationName, Message

message) { + public void send(String destinationName, Message message) { D destination = resolveDestination(destinationName); this.doSend(destination, message); } protected final D resolveDestination(String destinationName) { - Assert.notNull(destinationResolver, "destinationResolver is required when passing a name only"); + Assert.state(this.destinationResolver != null, "destinationResolver is required to resolve destination names"); return this.destinationResolver.resolveDestination(destinationName); } @@ -79,7 +102,7 @@ public abstract class AbstractDestinationResolvingMessagingTemplate extends } @Override - public

Message

receive(String destinationName) { + public Message receive(String destinationName) { D destination = resolveDestination(destinationName); return super.receive(destination); } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/core/AbstractMessageSendingTemplate.java b/spring-messaging/src/main/java/org/springframework/messaging/core/AbstractMessageSendingTemplate.java index 6595609c72..15b9fb1504 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/core/AbstractMessageSendingTemplate.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/core/AbstractMessageSendingTemplate.java @@ -27,7 +27,7 @@ import org.springframework.messaging.support.converter.SimpleMessageConverter; import org.springframework.util.Assert; /** - * Base class for templates that support sending messages. + * Abstract base class for implementations of {@link MessageSendingOperations}. * * @author Mark Fisher * @author Rossen Stoyanchev @@ -42,18 +42,30 @@ public abstract class AbstractMessageSendingTemplate implements MessageSendin private volatile MessageConverter converter = new SimpleMessageConverter(); + /** + * Configure the default destination to use in send methods that don't have + * a destination argument. If a default destination is not configured, send methods + * without a destination argument will raise an exception if invoked. + * + * @param defaultDestination the default destination + */ public void setDefaultDestination(D defaultDestination) { this.defaultDestination = defaultDestination; } + /** + * Return the configured default destination. + */ public D getDefaultDestination() { return this.defaultDestination; } /** - * Set the {@link MessageConverter} that is to be used to convert - * between Messages and objects for this template. - *

The default is {@link SimpleMessageConverter}. + * Set the {@link MessageConverter} to use in {@code convertAndSend} methods. + *

+ * By default {@link SimpleMessageConverter} is used. + * + * @param messageConverter the message converter to use */ public void setMessageConverter(MessageConverter messageConverter) { Assert.notNull(messageConverter, "'messageConverter' must not be null"); @@ -61,7 +73,7 @@ public abstract class AbstractMessageSendingTemplate implements MessageSendin } /** - * @return the configured {@link MessageConverter} + * Return the configured {@link MessageConverter}. */ public MessageConverter getMessageConverter() { return this.converter; @@ -74,9 +86,7 @@ public abstract class AbstractMessageSendingTemplate implements MessageSendin } protected final D getRequiredDefaultDestination() { - Assert.state(this.defaultDestination != null, - "No 'defaultDestination' specified for MessagingTemplate. " - + "Unable to invoke method without an explicit destination argument."); + Assert.state(this.defaultDestination != null, "No 'defaultDestination' configured."); return this.defaultDestination; } @@ -89,8 +99,8 @@ public abstract class AbstractMessageSendingTemplate implements MessageSendin @Override - public void convertAndSend(Object message) throws MessagingException { - this.convertAndSend(getRequiredDefaultDestination(), message); + public void convertAndSend(Object payload) throws MessagingException { + this.convertAndSend(getRequiredDefaultDestination(), payload); } @Override diff --git a/spring-messaging/src/main/java/org/springframework/messaging/core/AbstractMessagingTemplate.java b/spring-messaging/src/main/java/org/springframework/messaging/core/AbstractMessagingTemplate.java index ab9c7b518d..76b6df2a5f 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/core/AbstractMessagingTemplate.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/core/AbstractMessagingTemplate.java @@ -21,7 +21,9 @@ import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; /** - * Base class for a messaging template that send and receive messages. + * An extension of {@link AbstractMessageSendingTemplate} that adds support for + * receive as well as request-reply style operations as defined by + * {@link MessageReceivingOperations} and {@link MessageRequestReplyOperations}. * * @author Mark Fisher * @author Rossen Stoyanchev @@ -32,16 +34,16 @@ public abstract class AbstractMessagingTemplate extends AbstractMessageSendin @Override - public

Message

receive() { + public Message receive() { return this.receive(getRequiredDefaultDestination()); } @Override - public

Message

receive(D destination) { + public Message receive(D destination) { return this.doReceive(destination); } - protected abstract

Message

doReceive(D destination); + protected abstract Message doReceive(D destination); @Override @@ -71,7 +73,7 @@ public abstract class AbstractMessagingTemplate extends AbstractMessageSendin return this.doSendAndReceive(destination, requestMessage); } - protected abstract Message doSendAndReceive(D destination, Message requestMessage); + protected abstract Message doSendAndReceive(D destination, Message requestMessage); @Override @@ -117,7 +119,7 @@ public abstract class AbstractMessagingTemplate extends AbstractMessageSendin requestMessage = postProcessor.postProcessMessage(requestMessage); } Message replyMessage = this.sendAndReceive(destination, requestMessage); - return (T) getMessageConverter().fromMessage(replyMessage, targetClass); + return (replyMessage != null) ? (T) getMessageConverter().fromMessage(replyMessage, targetClass) : null; } } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/core/BeanFactoryMessageChannelDestinationResolver.java b/spring-messaging/src/main/java/org/springframework/messaging/core/BeanFactoryMessageChannelDestinationResolver.java index 36bc979d9a..03bce04003 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/core/BeanFactoryMessageChannelDestinationResolver.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/core/BeanFactoryMessageChannelDestinationResolver.java @@ -22,37 +22,38 @@ import org.springframework.messaging.MessageChannel; import org.springframework.util.Assert; /** - * {@link DestinationResolver} that resolves against named beans contained in a - * {@link BeanFactory}. + * An implementation of {@link DestinationResolver} that interprets a destination + * name as the bean name of a {@link MessageChannel} and looks up the bean in + * the configured {@link BeanFactory}. * * @author Mark Fisher * @since 4.0 */ -public class BeanFactoryMessageChannelDestinationResolver implements DestinationResolver, BeanFactoryAware { +public class BeanFactoryMessageChannelDestinationResolver + implements DestinationResolver, BeanFactoryAware { private volatile BeanFactory beanFactory; + /** - * Create a new instance of the {@link - * BeanFactoryMessageChannelDestinationResolver} class. - *

The BeanFactory to access must be set via setBeanFactory. - * This will happen automatically if this resolver is defined within an - * ApplicationContext thereby receiving the callback upon initialization. - * @see #setBeanFactory + * A default constructor that can be used when the resolver itself is configured + * as a Spring bean and will have the {@code BeanFactory} injected as a result + * of ing having implemented {@link BeanFactoryAware}. */ public BeanFactoryMessageChannelDestinationResolver() { + } + + @Override + public void setBeanFactory(BeanFactory beanFactory) throws BeansException { + this.beanFactory = beanFactory; } /** - * Create a new instance of the {@link - * BeanFactoryMessageChannelDestinationResolver} class. - *

Use of this constructor is redundant if this object is being created - * by a Spring IoC container as the supplied {@link BeanFactory} will be - * replaced by the {@link BeanFactory} that creates it (c.f. the - * {@link BeanFactoryAware} contract). So only use this constructor if you - * are instantiating this object explicitly rather than defining a bean. - * @param beanFactory the bean factory to be used to lookup {@link MessageChannel}s. + * A constructor that accepts a {@link BeanFactory} useful if instantiating this + * resolver manually rather than having it defined as a Spring-managed bean. + * + * @param beanFactory the bean factory to perform lookups against */ public BeanFactoryMessageChannelDestinationResolver(BeanFactory beanFactory) { Assert.notNull(beanFactory, "beanFactory must not be null"); @@ -62,19 +63,14 @@ public class BeanFactoryMessageChannelDestinationResolver implements Destination @Override public MessageChannel resolveDestination(String name) { - Assert.state(this.beanFactory != null, "BeanFactory must not be null"); + Assert.state(this.beanFactory != null, "No BeanFactory configured"); try { return this.beanFactory.getBean(name, MessageChannel.class); } catch (BeansException e) { throw new DestinationResolutionException( - "failed to look up MessageChannel bean with name '" + name + "'", e); + "Failed to find MessageChannel bean with name '" + name + "'", e); } } - @Override - public void setBeanFactory(BeanFactory beanFactory) throws BeansException { - this.beanFactory = beanFactory; - } - } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/core/DestinationResolutionException.java b/spring-messaging/src/main/java/org/springframework/messaging/core/DestinationResolutionException.java index bc34eaef11..52eb63d3e9 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/core/DestinationResolutionException.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/core/DestinationResolutionException.java @@ -28,8 +28,10 @@ import org.springframework.messaging.MessagingException; @SuppressWarnings("serial") public class DestinationResolutionException extends MessagingException { + /** - * Create a new ChannelResolutionException. + * Create an instance with the given description only. + * * @param description the description */ public DestinationResolutionException(String description) { @@ -37,9 +39,10 @@ public class DestinationResolutionException extends MessagingException { } /** - * Create a new ChannelResolutionException. + * Create an instance with the given description and original cause. + * * @param description the description - * @param cause the root cause (if any) + * @param cause the root cause */ public DestinationResolutionException(String description, Throwable cause) { super(description, cause); diff --git a/spring-messaging/src/main/java/org/springframework/messaging/core/DestinationResolver.java b/spring-messaging/src/main/java/org/springframework/messaging/core/DestinationResolver.java index 79e4c4af5c..3f14b9d2b4 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/core/DestinationResolver.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/core/DestinationResolver.java @@ -17,18 +17,20 @@ package org.springframework.messaging.core; /** - * Strategy interface that resolves a given name to a destination. + * Strategy for resolving a String destination name into an actual destination + * of type {@code }. * * @author Mark Fisher * @since 4.0 */ public interface DestinationResolver { + /** - * Resolve the given {@code name} to a destination. - * @param name the name to resolve - * @return the destination (must not be {@code null}) - * @throws DestinationResolutionException if the name cannot be resolved + * Resolve the given destination name. + * + * @param name the destination name to resolve + * @return the destination, never {@code null} */ D resolveDestination(String name) throws DestinationResolutionException; diff --git a/spring-messaging/src/main/java/org/springframework/messaging/core/DestinationResolvingMessageReceivingOperations.java b/spring-messaging/src/main/java/org/springframework/messaging/core/DestinationResolvingMessageReceivingOperations.java index 1eca928832..53a2e4120e 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/core/DestinationResolvingMessageReceivingOperations.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/core/DestinationResolvingMessageReceivingOperations.java @@ -19,16 +19,31 @@ import org.springframework.messaging.Message; import org.springframework.messaging.MessagingException; /** - * A {@link MessageReceivingOperations} that can resolve a String-based destinations. + * Extends {@link MessageReceivingOperations} and adds operations for receiving messages + * from a destination specified as a (resolvable) String name. * * @author Mark Fisher * @author Rossen Stoyanchev * @since 4.0 + * + * @see DestinationResolver */ public interface DestinationResolvingMessageReceivingOperations extends MessageReceivingOperations { -

Message

receive(String destinationName) throws MessagingException; + /** + * Resolve the given destination name and receive a message from it. + * + * @param destinationName the destination name to resolve + */ + Message receive(String destinationName) throws MessagingException; + /** + * Resolve the given destination name, receive a message from it, convert the + * payload to the specified target type. + * + * @param destinationName the destination name to resolve + * @param targetClass the target class for the converted payload + */ T receiveAndConvert(String destinationName, Class targetClass) throws MessagingException; } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/core/DestinationResolvingMessageRequestReplyOperations.java b/spring-messaging/src/main/java/org/springframework/messaging/core/DestinationResolvingMessageRequestReplyOperations.java index a562582b58..112092147d 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/core/DestinationResolvingMessageRequestReplyOperations.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/core/DestinationResolvingMessageRequestReplyOperations.java @@ -21,25 +21,95 @@ import org.springframework.messaging.Message; import org.springframework.messaging.MessagingException; /** - * A {@link MessageRequestReplyOperations} that can resolve a String-based destinations. + * Extends {@link MessageRequestReplyOperations} and adds operations for sending and + * receiving messages to and from a destination specified as a (resolvable) String name. * * @author Mark Fisher * @author Rossen Stoyanchev * @since 4.0 + * + * @see DestinationResolver */ public interface DestinationResolvingMessageRequestReplyOperations extends MessageRequestReplyOperations { + /** + * Resolve the given destination name to a destination and send the given message, + * receive a reply and return it. + * + * @param destinationName the name of the target destination + * @param requestMessage the mesage to send + * @return the received message, possibly {@code null} if the message could not + * be received, for example due to a timeout + */ Message sendAndReceive(String destinationName, Message requestMessage) throws MessagingException; + /** + * Resolve the given destination name, convert the payload request Object + * to serialized form, possibly using a + * {@link org.springframework.messaging.support.converter.MessageConverter}, + * wrap it as a message and send it to the resolved destination, receive a reply + * and convert its body to the specified target class. + * + * @param destinationName the name of the target destination + * @param request the payload for the request message to send + * @param targetClass the target class to convert the payload of the reply to + * @return the converted payload of the reply message, possibly {@code null} if + * the message could not be received, for example due to a timeout + */ T convertSendAndReceive(String destinationName, Object request, Class targetClass) throws MessagingException; + /** + * Resolve the given destination name, convert the payload request Object + * to serialized form, possibly using a + * {@link org.springframework.messaging.support.converter.MessageConverter}, + * wrap it as a message with the given headers and send it to the resolved destination, + * receive a reply and convert its body to the specified target class. + * + * @param destinationName the name of the target destination + * @param request the payload for the request message to send + * @param headers the headers for the request message to send + * @param targetClass the target class to convert the payload of the reply to + * @return the converted payload of the reply message, possibly {@code null} if + * the message could not be received, for example due to a timeout + */ T convertSendAndReceive(String destinationName, Object request, Map headers, Class targetClass) throws MessagingException; + /** + * Resolve the given destination name, convert the payload request Object + * to serialized form, possibly using a + * {@link org.springframework.messaging.support.converter.MessageConverter}, + * wrap it as a message, apply the given post process, and send the resulting + * message to the resolved destination, then receive a reply and convert its + * body to the specified target class. + * + * @param destinationName the name of the target destination + * @param request the payload for the request message to send + * @param targetClass the target class to convert the payload of the reply to + * @param requestPostProcessor post process for the request message + * @return the converted payload of the reply message, possibly {@code null} if + * the message could not be received, for example due to a timeout + */ T convertSendAndReceive(String destinationName, Object request, Class targetClass, MessagePostProcessor requestPostProcessor) throws MessagingException; + /** + * Resolve the given destination name, convert the payload request Object + * to serialized form, possibly using a + * {@link org.springframework.messaging.support.converter.MessageConverter}, + * wrap it as a message with the given headers, apply the given post process, + * and send the resulting message to the resolved destination, then receive + * a reply and convert its body to the specified target class. + * + * @param destinationName the name of the target destination + * @param request the payload for the request message to send + * @param headers the headers for the request message to send + * @param targetClass the target class to convert the payload of the reply to + * @param requestPostProcessor post process for the request message + * @return the converted payload of the reply message, possibly {@code null} if + * the message could not be received, for example due to a timeout + */ T convertSendAndReceive(String destinationName, Object request, Map headers, Class targetClass, MessagePostProcessor requestPostProcessor) throws MessagingException; diff --git a/spring-messaging/src/main/java/org/springframework/messaging/core/DestinationResolvingMessageSendingOperations.java b/spring-messaging/src/main/java/org/springframework/messaging/core/DestinationResolvingMessageSendingOperations.java index 0f9e7b69ce..c314cb87f0 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/core/DestinationResolvingMessageSendingOperations.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/core/DestinationResolvingMessageSendingOperations.java @@ -21,23 +21,76 @@ import org.springframework.messaging.Message; import org.springframework.messaging.MessagingException; /** - * A {@link MessageSendingOperations} that can resolve a String-based destinations. + * Extends {@link MessageSendingOperations} and adds operations for sending messages + * to a destination specified as a (resolvable) String name. * * @author Mark Fisher * @author Rossen Stoyanchev * @since 4.0 + * + * @see DestinationResolver */ public interface DestinationResolvingMessageSendingOperations extends MessageSendingOperations { -

void send(String destinationName, Message

message) throws MessagingException; + /** + * Resolve the given destination name to a destination and send a message to it. + * + * @param destinationName the destination name to resolve + * @param message the message to send + */ + void send(String destinationName, Message message) throws MessagingException; + + /** + * Resolve the given destination name to a destination, convert the payload Object + * to serialized form, possibly using a + * {@link org.springframework.messaging.support.converter.MessageConverter}, + * wrap it as a message and send it to the resolved destination. + * @param destinationName the destination name to resolve + * @param payload the Object to use as payload + */ void convertAndSend(String destinationName, T payload) throws MessagingException; - void convertAndSend(String destinationName, T payload, Map headers) throws MessagingException; + /** + * Resolve the given destination name to a destination, convert the payload + * Object to serialized form, possibly using a + * {@link org.springframework.messaging.support.converter.MessageConverter}, + * wrap it as a message with the given headers and send it to the resolved + * destination. - void convertAndSend(String destinationName, T payload, - MessagePostProcessor postProcessor) throws MessagingException; + * @param destinationName the destination name to resolve + * @param payload the Object to use as payload + * @param headers headers for the message to send + */ + void convertAndSend(String destinationName, T payload, Map headers) + throws MessagingException; + + /** + * Resolve the given destination name to a destination, convert the payload + * Object to serialized form, possibly using a + * {@link org.springframework.messaging.support.converter.MessageConverter}, + * wrap it as a message, apply the given post processor, and send the resulting + * message to the resolved destination. + + * @param destinationName the destination name to resolve + * @param payload the Object to use as payload + * @param postProcessor the post processor to apply to the message + */ + void convertAndSend(String destinationName, T payload, MessagePostProcessor postProcessor) + throws MessagingException; + + /** + * Resolve the given destination name to a destination, convert the payload + * Object to serialized form, possibly using a + * {@link org.springframework.messaging.support.converter.MessageConverter}, + * wrap it as a message with the given headers, apply the given post processor, + * and send the resulting message to the resolved destination. + * @param destinationName the destination name to resolve + * @param payload the Object to use as payload + * @param headers headers for the message to send + * @param postProcessor the post processor to apply to the message + */ void convertAndSend(String destinationName, T payload, Map headers, MessagePostProcessor postProcessor) throws MessagingException; diff --git a/spring-messaging/src/main/java/org/springframework/messaging/core/GenericMessagingTemplate.java b/spring-messaging/src/main/java/org/springframework/messaging/core/GenericMessagingTemplate.java index efa019ef8d..2880907d78 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/core/GenericMessagingTemplate.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/core/GenericMessagingTemplate.java @@ -32,10 +32,11 @@ import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.Assert; /** - * A messaging template for sending to and/or receiving messages from a - * {@link MessageChannel}. + * A messaging template that resolves destinations names to {@link MessageChannel}'s + * to send and receive messages from. * * @author Mark Fisher + * @author Rossen Stoyanchev * @since 4.0 */ public class GenericMessagingTemplate extends AbstractDestinationResolvingMessagingTemplate @@ -49,7 +50,7 @@ public class GenericMessagingTemplate extends AbstractDestinationResolvingMessag /** - * Specify the timeout value to use for send operations. + * Configure the timeout value to use for send operations. * * @param sendTimeout the send timeout in milliseconds */ @@ -58,7 +59,14 @@ public class GenericMessagingTemplate extends AbstractDestinationResolvingMessag } /** - * Specify the timeout value to use for receive operations. + * Return the configured send operation timeout value. + */ + public long getSendTimeout() { + return this.sendTimeout; + } + + /** + * Configure the timeout value to use for receive operations. * * @param receiveTimeout the receive timeout in milliseconds */ @@ -67,12 +75,23 @@ public class GenericMessagingTemplate extends AbstractDestinationResolvingMessag } /** - * Specify whether or not an attempt to send on the reply channel throws an exception - * if no receiving thread will actually receive the reply. This can occur - * if the receiving thread has already timed out, or will never call receive() - * because it caught an exception, or has already received a reply. - * (default false - just a WARN log is emitted in these cases). - * @param throwExceptionOnLateReply TRUE or FALSE. + * Return the configured receive operation timeout value. + */ + public long getReceiveTimeout() { + return this.receiveTimeout; + } + + /** + * Whether the thread sending a reply should have an exception raised if the + * receiving thread isn't going to receive the reply either because it timed out, + * or because it already received a reply, or because it got an exception while + * sending the request message. + *

+ * The default value is {@code false} in which case only a WARN message is logged. + * If set to {@code true} a {@link MessageDeliveryException} is raised in addition + * to the log message. + * + * @param throwExceptionOnLateReply whether to throw an exception or not */ public void setThrowExceptionOnLateReply(boolean throwExceptionOnLateReply) { this.throwExceptionOnLateReply = throwExceptionOnLateReply; @@ -85,88 +104,91 @@ public class GenericMessagingTemplate extends AbstractDestinationResolvingMessag @Override - protected final void doSend(MessageChannel destination, Message message) { - Assert.notNull(destination, "channel must not be null"); + protected final void doSend(MessageChannel channel, Message message) { + + Assert.notNull(channel, "channel must not be null"); + long timeout = this.sendTimeout; - boolean sent = (timeout >= 0) - ? destination.send(message, timeout) - : destination.send(message); + boolean sent = (timeout >= 0) ? channel.send(message, timeout) : channel.send(message); + if (!sent) { throw new MessageDeliveryException(message, - "failed to send message to channel '" + destination + "' within timeout: " + timeout); + "failed to send message to channel '" + channel + "' within timeout: " + timeout); } } @SuppressWarnings("unchecked") @Override - protected final

Message

doReceive(MessageChannel destination) { - Assert.state(destination instanceof PollableChannel, - "The 'destination' must be a PollableChannel for receive operations."); + protected final Message doReceive(MessageChannel channel) { + + Assert.notNull(channel, "'channel' is required"); + Assert.state(channel instanceof PollableChannel, "A PollableChannel is required to receive messages."); - Assert.notNull(destination, "channel must not be null"); long timeout = this.receiveTimeout; - Message message = (timeout >= 0) - ? ((PollableChannel) destination).receive(timeout) - : ((PollableChannel) destination).receive(); - if (message == null && this.logger.isTraceEnabled()) { - this.logger.trace("failed to receive message from channel '" + destination + "' within timeout: " + timeout); + Message message = (timeout >= 0) ? + ((PollableChannel) channel).receive(timeout) : ((PollableChannel) channel).receive(); + + if ((message == null) && this.logger.isTraceEnabled()) { + this.logger.trace("Failed to receive message from channel '" + channel + "' within timeout: " + timeout); } - return (Message

) message; + + return message; } @Override - protected final Message doSendAndReceive(MessageChannel destination, Message requestMessage) { + protected final Message doSendAndReceive(MessageChannel channel, Message requestMessage) { + + Assert.notNull(channel, "'channel' is required"); + Object originalReplyChannelHeader = requestMessage.getHeaders().getReplyChannel(); Object originalErrorChannelHeader = requestMessage.getHeaders().getErrorChannel(); - TemporaryReplyChannel replyChannel = new TemporaryReplyChannel(this.receiveTimeout, this.throwExceptionOnLateReply); + + TemporaryReplyChannel tempReplyChannel = new TemporaryReplyChannel(); + requestMessage = MessageBuilder.fromMessage(requestMessage) - .setReplyChannel(replyChannel) - .setErrorChannel(replyChannel) - .build(); + .setReplyChannel(tempReplyChannel) + .setErrorChannel(tempReplyChannel).build(); + try { - this.doSend(destination, requestMessage); + this.doSend(channel, requestMessage); } catch (RuntimeException e) { - replyChannel.setClientWontReceive(true); + tempReplyChannel.setSendFailed(true); throw e; } - Message reply = this.doReceive(replyChannel); - if (reply != null) { - reply = MessageBuilder.fromMessage(reply) + + Message replyMessage = this.doReceive(tempReplyChannel); + if (replyMessage != null) { + replyMessage = MessageBuilder.fromMessage(replyMessage) .setHeader(MessageHeaders.REPLY_CHANNEL, originalReplyChannelHeader) .setHeader(MessageHeaders.ERROR_CHANNEL, originalErrorChannelHeader) .build(); } - return reply; - } - - - private static class TemporaryReplyChannel implements PollableChannel { - private static final Log logger = LogFactory.getLog(TemporaryReplyChannel.class); + return replyMessage; + } - private volatile Message message; - private final long receiveTimeout; + /** + * A temporary channel for receiving a single reply message. + */ + private class TemporaryReplyChannel implements PollableChannel { - private final CountDownLatch latch = new CountDownLatch(1); + private final Log logger = LogFactory.getLog(TemporaryReplyChannel.class); - private final boolean throwExceptionOnLateReply; + private volatile Message replyMessage; - private volatile boolean clientTimedOut; + private final CountDownLatch replyLatch = new CountDownLatch(1); - private volatile boolean clientWontReceive; + private volatile boolean hasReceived; - private volatile boolean clientHasReceived; + private volatile boolean hasTimedOut; + private volatile boolean hasSendFailed; - public TemporaryReplyChannel(long receiveTimeout, boolean throwExceptionOnLateReply) { - this.receiveTimeout = receiveTimeout; - this.throwExceptionOnLateReply = throwExceptionOnLateReply; - } - public void setClientWontReceive(boolean clientWontReceive) { - this.clientWontReceive = clientWontReceive; + public void setSendFailed(boolean hasSendError) { + this.hasSendFailed = hasSendError; } @@ -178,23 +200,23 @@ public class GenericMessagingTemplate extends AbstractDestinationResolvingMessag @Override public Message receive(long timeout) { try { - if (this.receiveTimeout < 0) { - this.latch.await(); - this.clientHasReceived = true; + if (GenericMessagingTemplate.this.receiveTimeout < 0) { + this.replyLatch.await(); + this.hasReceived = true; } else { - if (this.latch.await(this.receiveTimeout, TimeUnit.MILLISECONDS)) { - this.clientHasReceived = true; + if (this.replyLatch.await(GenericMessagingTemplate.this.receiveTimeout, TimeUnit.MILLISECONDS)) { + this.hasReceived = true; } else { - this.clientTimedOut = true; + this.hasTimedOut = true; } } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } - return this.message; + return this.replyMessage; } @Override @@ -204,27 +226,31 @@ public class GenericMessagingTemplate extends AbstractDestinationResolvingMessag @Override public boolean send(Message message, long timeout) { - this.message = message; - this.latch.countDown(); - if (this.clientTimedOut || this.clientHasReceived || this.clientWontReceive) { - String exceptionMessage = ""; - if (this.clientTimedOut) { - exceptionMessage = "Reply message being sent, but the receiving thread has already timed out"; - } - else if (this.clientHasReceived) { - exceptionMessage = "Reply message being sent, but the receiving thread has already received a reply"; - } - else if (this.clientWontReceive) { - exceptionMessage = "Reply message being sent, but the receiving thread has already caught an exception and won't receive"; - } + this.replyMessage = message; + this.replyLatch.countDown(); + + String errorDescription = null; + if (this.hasTimedOut) { + errorDescription = "Reply message received but the receiving thread has exited due to a timeout"; + } + else if (this.hasReceived) { + errorDescription = "Reply message received but the receiving thread has already received a reply"; + } + else if (this.hasSendFailed) { + errorDescription = "Reply message received but the receiving thread has exited due to " + + "an exception while sending the request message"; + } + + if (errorDescription != null) { if (logger.isWarnEnabled()) { - logger.warn(exceptionMessage + ":" + message); + logger.warn(errorDescription + ":" + message); } - if (this.throwExceptionOnLateReply) { - throw new MessageDeliveryException(message, exceptionMessage); + if (GenericMessagingTemplate.this.throwExceptionOnLateReply) { + throw new MessageDeliveryException(message, errorDescription); } } + return true; } } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/core/MessagePostProcessor.java b/spring-messaging/src/main/java/org/springframework/messaging/core/MessagePostProcessor.java index 6717efa2d7..3c0d635a81 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/core/MessagePostProcessor.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/core/MessagePostProcessor.java @@ -19,22 +19,23 @@ package org.springframework.messaging.core; import org.springframework.messaging.Message; /** - * To be used with MessagingTemplate's send method that converts an object to a message. - * It allows for further modification of the message after it has been processed - * by the converter. - * - *

This is often implemented as an anonymous class within a method implementation. + * A contract for processing a {@link Message} after it has been created, either + * returning a modified (effectively new) message or returning the same. * * @author Mark Fisher + * @author Rossen Stoyanchev * @since 4.0 + * + * @see MessageSendingOperations + * @see MessageRequestReplyOperations */ public interface MessagePostProcessor { /** - * Apply a MessagePostProcessor to the message. The returned message is - * typically a modified version of the original. - * @param message the message returned from the MessageConverter - * @return the modified version of the Message + * Process the given message. + * + * @param message the message to process + * @return a new or the same message, never {@code null} */ Message postProcessMessage(Message message); diff --git a/spring-messaging/src/main/java/org/springframework/messaging/core/MessageReceivingOperations.java b/spring-messaging/src/main/java/org/springframework/messaging/core/MessageReceivingOperations.java index 41a4d681a9..dd48025d8c 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/core/MessageReceivingOperations.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/core/MessageReceivingOperations.java @@ -19,22 +19,54 @@ import org.springframework.messaging.Message; import org.springframework.messaging.MessagingException; /** - * A set of operations receiving messages from a destination. + * Operations for receiving messages from a destination. * - * @param the type of destination from which messages can be received + * @param the type of destination to receive messages from * * @author Mark Fisher * @author Rossen Stoyanchev * @since 4.0 + * + * @see GenericMessagingTemplate */ public interface MessageReceivingOperations { -

Message

receive() throws MessagingException; + /** + * Receive a message from a default destination. + * + * @return the received message, possibly {@code null} if the message could not + * be received, for example due to a timeout + */ + Message receive() throws MessagingException; -

Message

receive(D destination) throws MessagingException; + /** + * Receive a message from the given destination. + * + * @param destination the target destination + * @return the received message, possibly {@code null} if the message could not + * be received, for example due to a timeout + */ + Message receive(D destination) throws MessagingException; + /** + * Receive a message from a default destination and convert its payload to the + * specified target class. + * + * @param targetClass the target class to convert the payload to + * @return the converted payload of the reply message, possibly {@code null} if + * the message could not be received, for example due to a timeout + */ T receiveAndConvert(Class targetClass) throws MessagingException; + /** + * Receive a message from the given destination and convert its payload to the + * specified target class. + * + * @param destination the target destination + * @param targetClass the target class to convert the payload to + * @return the converted payload of the reply message, possibly {@code null} if + * the message could not be received, for example due to a timeout + */ T receiveAndConvert(D destination, Class targetClass) throws MessagingException; } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/core/MessageRequestReplyOperations.java b/spring-messaging/src/main/java/org/springframework/messaging/core/MessageRequestReplyOperations.java index 4354193202..315f213876 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/core/MessageRequestReplyOperations.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/core/MessageRequestReplyOperations.java @@ -21,33 +21,127 @@ import org.springframework.messaging.Message; import org.springframework.messaging.MessagingException; /** - * A set of operations for exchanging messages to and from a destination. + * Operations for sending messages to and receiving the reply from a destination. * - * @param the type of destination to send and receive messages from + * @param the type of destination * * @author Mark Fisher * @author Rossen Stoyanchev * @since 4.0 + * + * @see GenericMessagingTemplate */ public interface MessageRequestReplyOperations { + /** + * Send a request message and receive the reply from a default destination. + * + * @param requestMessage the message to send + * @return the reply, possibly {@code null} if the message could not be received, + * for example due to a timeout + */ Message sendAndReceive(Message requestMessage) throws MessagingException; + /** + * Send a request message and receive the reply from the given destination. + * + * @param destination the target destination + * @param requestMessage the message to send + * @return the reply, possibly {@code null} if the message could not be received, + * for example due to a timeout + */ Message sendAndReceive(D destination, Message requestMessage) throws MessagingException; + /** + * Convert the given request Object to serialized form, possibly using a + * {@link org.springframework.messaging.support.converter.MessageConverter}, send + * it as a {@link Message} to a default destination, receive the reply and convert + * its body of the specified target class. + * + * @param request payload for the request message to send + * @param targetClass the target type to convert the payload of the reply to + * @return the payload of the reply message, possibly {@code null} if the message + * could not be received, for example due to a timeout + */ T convertSendAndReceive(Object request, Class targetClass) throws MessagingException; + /** + * Convert the given request Object to serialized form, possibly using a + * {@link org.springframework.messaging.support.converter.MessageConverter}, send + * it as a {@link Message} to the given destination, receive the reply and convert + * its body of the specified target class. + * + * @param destination the target destination + * @param request payload for the request message to send + * @param targetClass the target type to convert the payload of the reply to + * @return the payload of the reply message, possibly {@code null} if the message + * could not be received, for example due to a timeout + */ T convertSendAndReceive(D destination, Object request, Class targetClass) throws MessagingException; + /** + * Convert the given request Object to serialized form, possibly using a + * {@link org.springframework.messaging.support.converter.MessageConverter}, send + * it as a {@link Message} with the given headers, to the specified destination, + * receive the reply and convert its body of the specified target class. + * + * @param destination the target destination + * @param request payload for the request message to send + * @param headers headers for the request message to send + * @param targetClass the target type to convert the payload of the reply to + * @return the payload of the reply message, possibly {@code null} if the message + * could not be received, for example due to a timeout + */ T convertSendAndReceive(D destination, Object request, Map headers, Class targetClass) throws MessagingException; + /** + * Convert the given request Object to serialized form, possibly using a + * {@link org.springframework.messaging.support.converter.MessageConverter}, + * apply the given post processor and send the resulting {@link Message} to a + * default destination, receive the reply and convert its body of the given + * target class. + * + * @param request payload for the request message to send + * @param targetClass the target type to convert the payload of the reply to + * @param requestPostProcessor post process to apply to the request message + * @return the payload of the reply message, possibly {@code null} if the message + * could not be received, for example due to a timeout + */ T convertSendAndReceive(Object request, Class targetClass, MessagePostProcessor requestPostProcessor) throws MessagingException; + /** + * Convert the given request Object to serialized form, possibly using a + * {@link org.springframework.messaging.support.converter.MessageConverter}, + * apply the given post processor and send the resulting {@link Message} to the + * given destination, receive the reply and convert its body of the given + * target class. + * + * @param destination the target destination + * @param request payload for the request message to send + * @param targetClass the target type to convert the payload of the reply to + * @param requestPostProcessor post process to apply to the request message + * @return the payload of the reply message, possibly {@code null} if the message + * could not be received, for example due to a timeout + */ T convertSendAndReceive(D destination, Object request, Class targetClass, MessagePostProcessor requestPostProcessor) throws MessagingException; + /** + * Convert the given request Object to serialized form, possibly using a + * {@link org.springframework.messaging.support.converter.MessageConverter}, + * wrap it as a message with the given headers, apply the given post processor + * and send the resulting {@link Message} to the specified destination, receive + * the reply and convert its body of the given target class. + * + * @param destination the target destination + * @param request payload for the request message to send + * @param targetClass the target type to convert the payload of the reply to + * @param requestPostProcessor post process to apply to the request message + * @return the payload of the reply message, possibly {@code null} if the message + * could not be received, for example due to a timeout + */ T convertSendAndReceive(D destination, Object request, Map headers, Class targetClass, MessagePostProcessor requestPostProcessor) throws MessagingException; diff --git a/spring-messaging/src/main/java/org/springframework/messaging/core/MessageSendingOperations.java b/spring-messaging/src/main/java/org/springframework/messaging/core/MessageSendingOperations.java index a5407d5f86..890eac7c93 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/core/MessageSendingOperations.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/core/MessageSendingOperations.java @@ -21,9 +21,9 @@ import org.springframework.messaging.Message; import org.springframework.messaging.MessagingException; /** - * A set of operations sending messages to a destination. + * Operations for sending messages to a destination. * - * @param the type of destination to which messages can be sent + * @param the type of destination to send messages to * * @author Mark Fisher * @author Rossen Stoyanchev @@ -31,22 +31,87 @@ import org.springframework.messaging.MessagingException; */ public interface MessageSendingOperations { + /** + * Send a message to a default destination. + * + * @param message the message to send + */ void send(Message message) throws MessagingException; + /** + * Send a message to the given destination. + * + * @param destination the target destination + * @param message the message to send + */ void send(D destination, Message message) throws MessagingException; + /** + * Convert the given Object to serialized form, possibly using a + * {@link org.springframework.messaging.support.converter.MessageConverter}, + * wrap it as a message and send it to a default destination. + * + * @param payload the Object to use as payload + */ void convertAndSend(Object payload) throws MessagingException; + /** + * Convert the given Object to serialized form, possibly using a + * {@link org.springframework.messaging.support.converter.MessageConverter}, + * wrap it as a message and send it to the given destination. + * + * @param destination the target destination + * @param payload the Object to use as payload + */ void convertAndSend(D destination, Object payload) throws MessagingException; + /** + * Convert the given Object to serialized form, possibly using a + * {@link org.springframework.messaging.support.converter.MessageConverter}, + * wrap it as a message with the given headers and send it to + * a default destination. + * + * @param destination the target destination + * @param payload the Object to use as payload + * @param headers headers for the message to send + */ void convertAndSend(D destination, Object payload, Map headers) throws MessagingException; + /** + * Convert the given Object to serialized form, possibly using a + * {@link org.springframework.messaging.support.converter.MessageConverter}, + * wrap it as a message, apply the given post processor, and send + * the resulting message to a default destination. + * + * @param payload the Object to use as payload + * @param postProcessor the post processor to apply to the message + */ void convertAndSend(Object payload, MessagePostProcessor postProcessor) throws MessagingException; - void convertAndSend(D destination, Object payload, - MessagePostProcessor postProcessor) throws MessagingException; + /** + * Convert the given Object to serialized form, possibly using a + * {@link org.springframework.messaging.support.converter.MessageConverter}, + * wrap it as a message, apply the given post processor, and send + * the resulting message to the given destination. + * + * @param destination the target destination + * @param payload the Object to use as payload + * @param postProcessor the post processor to apply to the message + */ + void convertAndSend(D destination, Object payload, MessagePostProcessor postProcessor) throws MessagingException; - void convertAndSend(D destination, Object payload, Map headers, - MessagePostProcessor postProcessor) throws MessagingException; + /** + * Convert the given Object to serialized form, possibly using a + * {@link org.springframework.messaging.support.converter.MessageConverter}, + * wrap it as a message with the given headers, apply the given post processor, + * and send the resulting message to the given destination. + * + * @param destination the target destination + * @param payload the Object to use as payload + * @param headers headers for the message to send + * @param postProcessor the post processor to apply to the message + */ + void convertAndSend(D destination, Object payload, Map headers, MessagePostProcessor postProcessor) + throws MessagingException; } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/core/package-info.java b/spring-messaging/src/main/java/org/springframework/messaging/core/package-info.java index eb34bc7b73..6e95026655 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/core/package-info.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/core/package-info.java @@ -1,4 +1,4 @@ /** - * Provides core messaging classes. + * Defines interfaces and implementation classes for messaging templates. */ package org.springframework.messaging.core; \ No newline at end of file diff --git a/spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/support/HeaderMethodArgumentResolver.java b/spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/support/HeaderMethodArgumentResolver.java index a46013f5e1..5ab96d8b47 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/support/HeaderMethodArgumentResolver.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/support/HeaderMethodArgumentResolver.java @@ -20,6 +20,7 @@ import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.core.MethodParameter; import org.springframework.core.convert.ConversionService; import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHandlingException; import org.springframework.messaging.handler.annotation.Header; /** diff --git a/spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/support/PathVariableMethodArgumentResolver.java b/spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/support/PathVariableMethodArgumentResolver.java index 53f2cce898..f0cd93c649 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/support/PathVariableMethodArgumentResolver.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/handler/annotation/support/PathVariableMethodArgumentResolver.java @@ -21,6 +21,7 @@ import java.util.Map; import org.springframework.core.MethodParameter; import org.springframework.core.convert.ConversionService; import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHandlingException; import org.springframework.messaging.handler.annotation.PathVariable; import org.springframework.messaging.handler.annotation.ValueConstants; diff --git a/spring-messaging/src/main/java/org/springframework/messaging/support/ErrorMessage.java b/spring-messaging/src/main/java/org/springframework/messaging/support/ErrorMessage.java index 04e421f7b1..737543246a 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/support/ErrorMessage.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/support/ErrorMessage.java @@ -19,12 +19,12 @@ package org.springframework.messaging.support; import java.util.Map; /** - * A message implementation that accepts a {@link Throwable} payload. - * Once created this object is immutable. + * A {@link GenericMessage} with a {@link Throwable} payload. * * @author Mark Fisher * @author Oleg Zhurakousky * @since 4.0 + * * @see MessageBuilder */ public class ErrorMessage extends GenericMessage { @@ -32,10 +32,21 @@ public class ErrorMessage extends GenericMessage { private static final long serialVersionUID = -5470210965279837728L; + /** + * Create a new message with the given payload. + * + * @param payload the message payload, never {@code null} + */ public ErrorMessage(Throwable payload) { super(payload); } + /** + * Create a new message with the given payload and headers. + * + * @param payload the message payload, never {@code null} + * @param headers message headers + */ public ErrorMessage(Throwable payload, Map headers) { super(payload, headers); } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/support/GenericMessage.java b/spring-messaging/src/main/java/org/springframework/messaging/support/GenericMessage.java index 3aac418832..03b65547c9 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/support/GenericMessage.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/support/GenericMessage.java @@ -26,11 +26,12 @@ import org.springframework.util.Assert; import org.springframework.util.ObjectUtils; /** - * Base Message class defining common properties such as id, payload, and headers. - * Once created this object is immutable. + * An implementation of {@link Message} with a generic payload. + * Once created, a GenericMessage is immutable. * * @author Mark Fisher * @since 4.0 + * * @see MessageBuilder */ public class GenericMessage implements Message, Serializable { @@ -45,18 +46,18 @@ public class GenericMessage implements Message, Serializable { /** * Create a new message with the given payload. - * @param payload the message payload + * + * @param payload the message payload, never {@code null} */ public GenericMessage(T payload) { this(payload, null); } /** - * Create a new message with the given payload. The provided map will be used to - * populate the message headers - * @param payload the message payload + * Create a new message with the given payload and headers. + * + * @param payload the message payload, never {@code null} * @param headers message headers - * @see MessageHeaders */ public GenericMessage(T payload, Map headers) { Assert.notNull(payload, "payload must not be null"); @@ -88,7 +89,7 @@ public class GenericMessage implements Message, Serializable { sb.append("[Payload ").append(this.payload.getClass().getSimpleName()); sb.append(" content=").append(this.payload).append("]"); } - sb.append("[Headers=" + this.headers + "]"); + sb.append("[Headers=").append(this.headers).append("]"); return sb.toString(); } @@ -102,11 +103,8 @@ public class GenericMessage implements Message, Serializable { } if (obj != null && obj instanceof GenericMessage) { GenericMessage other = (GenericMessage) obj; - if (!this.headers.getId().equals(other.headers.getId())) { - return false; - } - return this.headers.equals(other.headers) - && this.payload.equals(other.payload); + return (this.headers.getId().equals(other.headers.getId()) && + this.headers.equals(other.headers) && this.payload.equals(other.payload)); } return false; } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/support/MessageBuilder.java b/spring-messaging/src/main/java/org/springframework/messaging/support/MessageBuilder.java index 742c1508ec..d0cef013f5 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/support/MessageBuilder.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/support/MessageBuilder.java @@ -23,12 +23,14 @@ import org.springframework.messaging.MessageChannel; import org.springframework.util.Assert; /** - * A builder for creating {@link GenericMessage} or {@link ErrorMessage} if the payload is - * {@link Throwable}. + * A builder for creating a {@link GenericMessage} (or {@link ErrorMessage} if + * the payload is of type {@link Throwable}). * * @author Arjen Poutsma * @author Mark Fisher + * @author Rossen Stoyanchev * @since 4.0 + * * @see GenericMessage * @see ErrorMessage */ @@ -56,6 +58,7 @@ public final class MessageBuilder { * Create a builder for a new {@link Message} instance pre-populated with all of the * headers copied from the provided message. The payload of the provided Message will * also be used as the payload for the new message. + * * @param message the Message from which the payload and all headers will be copied */ public static MessageBuilder fromMessage(Message message) { diff --git a/spring-messaging/src/main/java/org/springframework/messaging/support/NativeMessageHeaderAccessor.java b/spring-messaging/src/main/java/org/springframework/messaging/support/NativeMessageHeaderAccessor.java index db4f8de95a..d83747c3d1 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/support/NativeMessageHeaderAccessor.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/support/NativeMessageHeaderAccessor.java @@ -27,9 +27,19 @@ import org.springframework.util.MultiValueMap; import org.springframework.util.ObjectUtils; /** - * An extension of {@link MessageHeaderAccessor} that also provides read/write access to - * message headers from an external message source. Native message headers are kept - * in a {@link MultiValueMap} under the key {@link #NATIVE_HEADERS}. + * An extension of {@link MessageHeaderAccessor} that also stores and provides read/write + * access to message headers from an external source -- e.g. a Spring {@link Message} + * created to represent a STOMP message received from a STOMP client or message broker. + * Native message headers are kept in a {@link MultiValueMap} under the key + * {@link #NATIVE_HEADERS}. + *

+ * This class is not intended for direct use but is rather expected to be consumed + * through sub-classes such as + * {@link org.springframework.messaging.simp.stomp.StompHeaderAccessor StompHeaderAccessor}. + * Such sub-classes may provide factory methods to translate message headers from + * an external messaging source (e.g. STOMP) to Spring {@link Message} headers and + * reversely to translate Spring {@link Message} headers to a message to send to an + * external source. * * @author Rossen Stoyanchev * @since 4.0 @@ -72,14 +82,6 @@ public class NativeMessageHeaderAccessor extends MessageHeaderAccessor { return null; } - /** - * Create {@link NativeMessageHeaderAccessor} from the headers of an existing message. - */ - public static NativeMessageHeaderAccessor wrap(Message message) { - return new NativeMessageHeaderAccessor(message); - } - - @Override public Map toMap() { Map result = super.toMap(); diff --git a/spring-messaging/src/main/java/org/springframework/messaging/support/channel/package-info.java b/spring-messaging/src/main/java/org/springframework/messaging/support/channel/package-info.java index 4bb730e1e8..df46d0f87d 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/support/channel/package-info.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/support/channel/package-info.java @@ -1,4 +1,5 @@ /** - * Provides classes representing various channel types. + * Provides {@link org.springframework.messaging.MessageChannel} implementations + * classes as well as channel interceptor support. */ package org.springframework.messaging.support.channel; \ No newline at end of file diff --git a/spring-messaging/src/main/java/org/springframework/messaging/support/converter/package-info.java b/spring-messaging/src/main/java/org/springframework/messaging/support/converter/package-info.java index da8164d840..5877692ffa 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/support/converter/package-info.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/support/converter/package-info.java @@ -1,4 +1,4 @@ /** - * Provides classes supporting message conversion. + * Provides support for message conversion. */ package org.springframework.messaging.support.converter; \ No newline at end of file diff --git a/spring-messaging/src/main/java/org/springframework/messaging/support/package-info.java b/spring-messaging/src/main/java/org/springframework/messaging/support/package-info.java new file mode 100644 index 0000000000..f3f59a2869 --- /dev/null +++ b/spring-messaging/src/main/java/org/springframework/messaging/support/package-info.java @@ -0,0 +1,6 @@ +/** + * Provides implementations of {@link org.springframework.messaging.Message} along with + * a MessageBuilder and MessageHeaderAccessor for building and working with messages + * and message headers. + */ +package org.springframework.messaging.support; \ No newline at end of file diff --git a/spring-messaging/src/main/java/overview.html b/spring-messaging/src/main/java/overview.html index c26c5a1027..b323e5273d 100644 --- a/spring-messaging/src/main/java/overview.html +++ b/spring-messaging/src/main/java/overview.html @@ -1,7 +1,7 @@

-Spring's support for messaging architectures and messaging protocols. +Spring Framework's support for messaging architectures and protocols.

\ No newline at end of file diff --git a/spring-messaging/src/test/java/org/springframework/messaging/core/DestinationResolvingMessagingTemplateTests.java b/spring-messaging/src/test/java/org/springframework/messaging/core/DestinationResolvingMessagingTemplateTests.java new file mode 100644 index 0000000000..fd8ed8ed8f --- /dev/null +++ b/spring-messaging/src/test/java/org/springframework/messaging/core/DestinationResolvingMessagingTemplateTests.java @@ -0,0 +1,257 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.core; + +import org.junit.Before; +import org.junit.Test; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; +import org.springframework.messaging.support.GenericMessage; +import org.springframework.messaging.support.channel.ExecutorSubscribableChannel; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertSame; + +/** + * Unit tests for {@link AbstractDestinationResolvingMessagingTemplate}. + * + * @author Rossen Stoyanchev + */ +public class DestinationResolvingMessagingTemplateTests { + + private TestDestinationResolvingMessagingTemplate template; + + private ExecutorSubscribableChannel myChannel; + + private Map headers; + + private TestMessagePostProcessor postProcessor; + + + @Before + public void setup() { + + TestMessageChannelDestinationResolver resolver = new TestMessageChannelDestinationResolver(); + + this.myChannel = new ExecutorSubscribableChannel(); + resolver.registerMessageChannel("myChannel", this.myChannel); + + this.template = new TestDestinationResolvingMessagingTemplate(); + this.template.setDestinationResolver(resolver); + + this.headers = Collections.singletonMap("key", "value"); + + this.postProcessor = new TestMessagePostProcessor(); + } + + + @Test + public void send() { + Message message = new GenericMessage("payload"); + this.template.send("myChannel", message); + + assertSame(this.myChannel, this.template.messageChannel); + assertSame(message, this.template.message); + } + + @Test(expected = IllegalStateException.class) + public void sendNoDestinationResolver() { + TestDestinationResolvingMessagingTemplate template = new TestDestinationResolvingMessagingTemplate(); + template.send("myChannel", new GenericMessage("payload")); + } + + @Test + public void convertAndSendPayload() { + this.template.convertAndSend("myChannel", "payload"); + + assertSame(this.myChannel, this.template.messageChannel); + assertNotNull(this.template.message); + assertSame("payload", this.template.message.getPayload()); + } + + @Test + public void convertAndSendPayloadAndHeaders() { + this.template.convertAndSend("myChannel", "payload", this.headers); + + assertSame(this.myChannel, this.template.messageChannel); + assertNotNull(this.template.message); + assertEquals("value", this.template.message.getHeaders().get("key")); + assertEquals("payload", this.template.message.getPayload()); + } + + @Test + public void convertAndSendPayloadWithPostProcessor() { + this.template.convertAndSend("myChannel", "payload", this.postProcessor); + + assertSame(this.myChannel, this.template.messageChannel); + assertNotNull(this.template.message); + assertEquals("payload", this.template.message.getPayload()); + + assertNotNull(this.postProcessor.getMessage()); + assertSame(this.postProcessor.getMessage(), this.template.message); + } + + @Test + public void convertAndSendPayloadAndHeadersWithPostProcessor() { + this.template.convertAndSend("myChannel", "payload", this.headers, this.postProcessor); + + assertSame(this.myChannel, this.template.messageChannel); + assertNotNull(this.template.message); + assertEquals("value", this.template.message.getHeaders().get("key")); + assertEquals("payload", this.template.message.getPayload()); + + assertNotNull(this.postProcessor.getMessage()); + assertSame(this.postProcessor.getMessage(), this.template.message); + } + + @Test + public void receive() { + Message expected = new GenericMessage("payload"); + this.template.setReceiveMessage(expected); + Message actual = this.template.receive("myChannel"); + + assertSame(expected, actual); + assertSame(this.myChannel, this.template.messageChannel); + } + + @Test + public void receiveAndConvert() { + Message expected = new GenericMessage("payload"); + this.template.setReceiveMessage(expected); + String payload = this.template.receiveAndConvert("myChannel", String.class); + + assertEquals("payload", payload); + assertSame(this.myChannel, this.template.messageChannel); + } + + @Test + public void sendAndReceive() { + Message requestMessage = new GenericMessage("request"); + Message responseMessage = new GenericMessage("response"); + this.template.setReceiveMessage(responseMessage); + Message actual = this.template.sendAndReceive("myChannel", requestMessage); + + assertEquals(requestMessage, this.template.message); + assertSame(responseMessage, actual); + assertSame(this.myChannel, this.template.messageChannel); + } + + @Test + public void convertSendAndReceive() { + Message responseMessage = new GenericMessage("response"); + this.template.setReceiveMessage(responseMessage); + String actual = this.template.convertSendAndReceive("myChannel", "request", String.class); + + assertEquals("request", this.template.message.getPayload()); + assertSame("response", actual); + assertSame(this.myChannel, this.template.messageChannel); + } + + @Test + public void convertSendAndReceiveWithHeaders() { + Message responseMessage = new GenericMessage("response"); + this.template.setReceiveMessage(responseMessage); + String actual = this.template.convertSendAndReceive("myChannel", "request", this.headers, String.class); + + assertEquals("value", this.template.message.getHeaders().get("key")); + assertEquals("request", this.template.message.getPayload()); + assertSame("response", actual); + assertSame(this.myChannel, this.template.messageChannel); + } + + @Test + public void convertSendAndReceiveWithPostProcessor() { + Message responseMessage = new GenericMessage("response"); + this.template.setReceiveMessage(responseMessage); + String actual = this.template.convertSendAndReceive("myChannel", "request", String.class, this.postProcessor); + + assertEquals("request", this.template.message.getPayload()); + assertSame("request", this.postProcessor.getMessage().getPayload()); + assertSame("response", actual); + assertSame(this.myChannel, this.template.messageChannel); + } + + @Test + public void convertSendAndReceiveWithHeadersAndPostProcessor() { + Message responseMessage = new GenericMessage("response"); + this.template.setReceiveMessage(responseMessage); + String actual = this.template.convertSendAndReceive("myChannel", "request", this.headers, + String.class, this.postProcessor); + + assertEquals("value", this.template.message.getHeaders().get("key")); + assertEquals("request", this.template.message.getPayload()); + assertSame("request", this.postProcessor.getMessage().getPayload()); + assertSame("response", actual); + assertSame(this.myChannel, this.template.messageChannel); + } + + + private static class TestDestinationResolvingMessagingTemplate + extends AbstractDestinationResolvingMessagingTemplate { + + private MessageChannel messageChannel; + + private Message message; + + private Message receiveMessage; + + + private void setReceiveMessage(Message receiveMessage) { + this.receiveMessage = receiveMessage; + } + + @Override + protected void doSend(MessageChannel channel, Message message) { + this.messageChannel = channel; + this.message = message; + } + + @Override + protected Message doReceive(MessageChannel channel) { + this.messageChannel = channel; + return this.receiveMessage; + } + + @Override + protected Message doSendAndReceive(MessageChannel channel, Message requestMessage) { + this.message = requestMessage; + this.messageChannel = channel; + return this.receiveMessage; + } + } + +} + +class TestMessageChannelDestinationResolver implements DestinationResolver { + + private final Map channels = new HashMap<>(); + + + public void registerMessageChannel(String name, MessageChannel channel) { + this.channels.put(name, channel); + } + + @Override + public MessageChannel resolveDestination(String name) throws DestinationResolutionException { + return this.channels.get(name); + } +} diff --git a/spring-messaging/src/test/java/org/springframework/messaging/core/GenericMessagingTemplateTests.java b/spring-messaging/src/test/java/org/springframework/messaging/core/GenericMessagingTemplateTests.java new file mode 100644 index 0000000000..b0b4a14446 --- /dev/null +++ b/spring-messaging/src/test/java/org/springframework/messaging/core/GenericMessagingTemplateTests.java @@ -0,0 +1,109 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.core; + +import org.junit.Before; +import org.junit.Test; +import org.springframework.messaging.*; +import org.springframework.messaging.support.GenericMessage; +import org.springframework.messaging.support.channel.ExecutorSubscribableChannel; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static junit.framework.Assert.assertNull; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Unit tests for {@link GenericMessagingTemplate}. + * + * @author Rossen Stoyanchev + */ +public class GenericMessagingTemplateTests { + + private GenericMessagingTemplate template; + + private ThreadPoolTaskExecutor executor; + + + @Before + public void setup() { + + this.template = new GenericMessagingTemplate(); + + this.executor = new ThreadPoolTaskExecutor(); + this.executor.afterPropertiesSet(); + } + + + @Test + public void sendAndReceive() { + + SubscribableChannel channel = new ExecutorSubscribableChannel(this.executor); + channel.subscribe(new MessageHandler() { + @Override + public void handleMessage(Message message) throws MessagingException { + MessageChannel replyChannel = (MessageChannel) message.getHeaders().getReplyChannel(); + replyChannel.send(new GenericMessage("response")); + } + }); + + String actual = this.template.convertSendAndReceive(channel, "request", String.class); + + assertEquals("response", actual); + } + + @Test + public void sendAndReceiveTimeout() throws InterruptedException { + + final CountDownLatch latch = new CountDownLatch(1); + + this.template.setReceiveTimeout(1); + this.template.setThrowExceptionOnLateReply(true); + + SubscribableChannel channel = new ExecutorSubscribableChannel(this.executor); + channel.subscribe(new MessageHandler() { + @Override + public void handleMessage(Message message) throws MessagingException { + try { + Thread.sleep(500); + MessageChannel replyChannel = (MessageChannel) message.getHeaders().getReplyChannel(); + replyChannel.send(new GenericMessage("response")); + fail("Expected exception"); + } + catch (InterruptedException e) { + fail("Unexpected exception " + e.getMessage()); + } + catch (MessageDeliveryException ex) { + assertEquals("Reply message received but the receiving thread has already received a reply", + ex.getMessage()); + } + finally { + latch.countDown(); + } + } + }); + + assertNull(this.template.convertSendAndReceive(channel, "request", String.class)); + + assertTrue(latch.await(1000, TimeUnit.MILLISECONDS)); + } + +} diff --git a/spring-messaging/src/test/java/org/springframework/messaging/core/MessageReceivingTemplateTests.java b/spring-messaging/src/test/java/org/springframework/messaging/core/MessageReceivingTemplateTests.java new file mode 100644 index 0000000000..688b9c80a4 --- /dev/null +++ b/spring-messaging/src/test/java/org/springframework/messaging/core/MessageReceivingTemplateTests.java @@ -0,0 +1,122 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.core; + +import org.junit.Before; +import org.junit.Test; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.GenericMessage; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; + +/** + * Unit tests for receiving operations in {@link AbstractMessagingTemplate}. + * + * @author Rossen Stoyanchev + * + * @see MessageRequestReplyTemplateTests + */ +public class MessageReceivingTemplateTests { + + private TestMessagingTemplate template; + + + @Before + public void setup() { + this.template = new TestMessagingTemplate(); + } + + @Test + public void receive() { + Message expected = new GenericMessage("payload"); + this.template.setDefaultDestination("home"); + this.template.setReceiveMessage(expected); + Message actual = this.template.receive(); + + assertEquals("home", this.template.destination); + assertSame(expected, actual); + } + + @Test(expected = IllegalStateException.class) + public void receiveMissingDefaultDestination() { + this.template.receive(); + } + + @Test + public void receiveFromDestination() { + Message expected = new GenericMessage("payload"); + this.template.setReceiveMessage(expected); + Message actual = this.template.receive("somewhere"); + + assertEquals("somewhere", this.template.destination); + assertSame(expected, actual); + } + + @Test + public void receiveAndConvert() { + Message expected = new GenericMessage("payload"); + this.template.setDefaultDestination("home"); + this.template.setReceiveMessage(expected); + String payload = this.template.receiveAndConvert(String.class); + + assertEquals("home", this.template.destination); + assertSame("payload", payload); + } + + @Test + public void receiveAndConvertFromDestination() { + Message expected = new GenericMessage("payload"); + this.template.setReceiveMessage(expected); + String payload = this.template.receiveAndConvert("somewhere", String.class); + + assertEquals("somewhere", this.template.destination); + assertSame("payload", payload); + } + + + + private static class TestMessagingTemplate extends AbstractMessagingTemplate { + + private String destination; + + private Message receiveMessage; + + + private void setReceiveMessage(Message receiveMessage) { + this.receiveMessage = receiveMessage; + } + + @Override + protected void doSend(String destination, Message message) { + } + + @Override + protected Message doReceive(String destination) { + this.destination = destination; + return this.receiveMessage; + } + + @Override + protected Message doSendAndReceive(String destination, Message requestMessage) { + this.destination = destination; + return null; + } + + } + +} diff --git a/spring-messaging/src/test/java/org/springframework/messaging/core/MessageRequestReplyTemplateTests.java b/spring-messaging/src/test/java/org/springframework/messaging/core/MessageRequestReplyTemplateTests.java new file mode 100644 index 0000000000..bce843276e --- /dev/null +++ b/spring-messaging/src/test/java/org/springframework/messaging/core/MessageRequestReplyTemplateTests.java @@ -0,0 +1,190 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.core; + +import org.junit.Before; +import org.junit.Test; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.GenericMessage; + +import java.util.Collections; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; + +/** + * Unit tests for request and reply operations in {@link AbstractMessagingTemplate}. + * + * @author Rossen Stoyanchev + * + * @see MessageReceivingTemplateTests + */ +public class MessageRequestReplyTemplateTests { + + private TestMessagingTemplate template; + + private TestMessagePostProcessor postProcessor; + + private Map headers; + + + @Before + public void setup() { + this.template = new TestMessagingTemplate(); + this.postProcessor = new TestMessagePostProcessor(); + this.headers = Collections.singletonMap("key", "value"); + } + + + @Test + public void sendAndReceive() { + Message requestMessage = new GenericMessage("request"); + Message responseMessage = new GenericMessage("response"); + this.template.setDefaultDestination("home"); + this.template.setReceiveMessage(responseMessage); + Message actual = this.template.sendAndReceive(requestMessage); + + assertEquals("home", this.template.destination); + assertSame(requestMessage, this.template.requestMessage); + assertSame(responseMessage, actual); + } + + @Test(expected = IllegalStateException.class) + public void sendAndReceiveMissingDestination() { + this.template.sendAndReceive(new GenericMessage("request")); + } + + @Test + public void sendAndReceiveToDestination() { + Message requestMessage = new GenericMessage("request"); + Message responseMessage = new GenericMessage("response"); + this.template.setReceiveMessage(responseMessage); + Message actual = this.template.sendAndReceive("somewhere", requestMessage); + + assertEquals("somewhere", this.template.destination); + assertSame(requestMessage, this.template.requestMessage); + assertSame(responseMessage, actual); + } + + @Test + public void convertAndSend() { + Message responseMessage = new GenericMessage("response"); + this.template.setDefaultDestination("home"); + this.template.setReceiveMessage(responseMessage); + String response = this.template.convertSendAndReceive("request", String.class); + + assertEquals("home", this.template.destination); + assertSame("request", this.template.requestMessage.getPayload()); + assertSame("response", response); + } + + @Test + public void convertAndSendToDestination() { + Message responseMessage = new GenericMessage("response"); + this.template.setReceiveMessage(responseMessage); + String response = this.template.convertSendAndReceive("somewhere", "request", String.class); + + assertEquals("somewhere", this.template.destination); + assertSame("request", this.template.requestMessage.getPayload()); + assertSame("response", response); + } + + @Test + public void convertAndSendToDestinationWithHeaders() { + Message responseMessage = new GenericMessage("response"); + this.template.setReceiveMessage(responseMessage); + String response = this.template.convertSendAndReceive("somewhere", "request", this.headers, String.class); + + assertEquals("somewhere", this.template.destination); + assertEquals("value", this.template.requestMessage.getHeaders().get("key")); + assertSame("request", this.template.requestMessage.getPayload()); + assertSame("response", response); + } + + @Test + public void convertAndSendWithPostProcessor() { + Message responseMessage = new GenericMessage("response"); + this.template.setDefaultDestination("home"); + this.template.setReceiveMessage(responseMessage); + String response = this.template.convertSendAndReceive("request", String.class, this.postProcessor); + + assertEquals("home", this.template.destination); + assertSame("request", this.template.requestMessage.getPayload()); + assertSame("response", response); + assertSame(this.postProcessor.getMessage(), this.template.requestMessage); + } + + @Test + public void convertAndSendToDestinationWithPostProcessor() { + Message responseMessage = new GenericMessage("response"); + this.template.setReceiveMessage(responseMessage); + String response = this.template.convertSendAndReceive("somewhere", "request", String.class, this.postProcessor); + + assertEquals("somewhere", this.template.destination); + assertSame("request", this.template.requestMessage.getPayload()); + assertSame("response", response); + assertSame(this.postProcessor.getMessage(), this.template.requestMessage); + } + + @Test + public void convertAndSendToDestinationWithHeadersAndPostProcessor() { + Message responseMessage = new GenericMessage("response"); + this.template.setReceiveMessage(responseMessage); + String response = this.template.convertSendAndReceive("somewhere", "request", this.headers, + String.class, this.postProcessor); + + assertEquals("somewhere", this.template.destination); + assertEquals("value", this.template.requestMessage.getHeaders().get("key")); + assertSame("request", this.template.requestMessage.getPayload()); + assertSame("response", response); + assertSame(this.postProcessor.getMessage(), this.template.requestMessage); + } + + + private static class TestMessagingTemplate extends AbstractMessagingTemplate { + + private String destination; + + private Message requestMessage; + + private Message receiveMessage; + + + private void setReceiveMessage(Message receiveMessage) { + this.receiveMessage = receiveMessage; + } + + @Override + protected void doSend(String destination, Message message) { + } + + @Override + protected Message doReceive(String destination) { + this.destination = destination; + return this.receiveMessage; + } + + @Override + protected Message doSendAndReceive(String destination, Message requestMessage) { + this.destination = destination; + this.requestMessage = requestMessage; + return this.receiveMessage; + } + } + +} diff --git a/spring-messaging/src/test/java/org/springframework/messaging/core/MessageSendingTemplateTests.java b/spring-messaging/src/test/java/org/springframework/messaging/core/MessageSendingTemplateTests.java new file mode 100644 index 0000000000..1bb1b4a81a --- /dev/null +++ b/spring-messaging/src/test/java/org/springframework/messaging/core/MessageSendingTemplateTests.java @@ -0,0 +1,180 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.core; + +import org.junit.Before; +import org.junit.Test; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.GenericMessage; + +import java.util.Collections; +import java.util.Map; + +import static junit.framework.Assert.assertNotNull; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; + +/** + * Unit tests for {@link AbstractMessageSendingTemplate}. + * + * @author Rossen Stoyanchev + */ +public class MessageSendingTemplateTests { + + private TestMessageSendingTemplate template; + + private TestMessagePostProcessor postProcessor; + + private Map headers; + + + @Before + public void setup() { + this.template = new TestMessageSendingTemplate(); + this.postProcessor = new TestMessagePostProcessor(); + this.headers = Collections.singletonMap("key", "value"); + } + + + @Test + public void send() { + Message message = new GenericMessage("payload"); + this.template.setDefaultDestination("home"); + this.template.send(message); + + assertEquals("home", this.template.destination); + assertSame(message, this.template.message); + } + + @Test + public void sendToDestination() { + Message message = new GenericMessage("payload"); + this.template.send("somewhere", message); + + assertEquals("somewhere", this.template.destination); + assertSame(message, this.template.message); + } + + @Test(expected = IllegalStateException.class) + public void sendMissingDestination() { + Message message = new GenericMessage("payload"); + this.template.send(message); + } + + @Test + public void convertAndSend() { + this.template.convertAndSend("somewhere", "payload", headers, this.postProcessor); + + assertEquals("somewhere", this.template.destination); + assertNotNull(this.template.message); + assertEquals("value", this.template.message.getHeaders().get("key")); + assertEquals("payload", this.template.message.getPayload()); + + assertNotNull(this.postProcessor.getMessage()); + assertSame(this.template.message, this.postProcessor.getMessage()); + } + + @Test + public void convertAndSendPayload() { + this.template.setDefaultDestination("home"); + this.template.convertAndSend("payload"); + + assertEquals("home", this.template.destination); + assertNotNull(this.template.message); + assertEquals("expected 'id' and 'timestamp' headers only", 2, this.template.message.getHeaders().size()); + assertEquals("payload", this.template.message.getPayload()); + } + + @Test + public void convertAndSendPayloadToDestination() { + this.template.convertAndSend("somewhere", "payload"); + + assertEquals("somewhere", this.template.destination); + assertNotNull(this.template.message); + assertEquals("expected 'id' and 'timestamp' headers only", 2, this.template.message.getHeaders().size()); + assertEquals("payload", this.template.message.getPayload()); + } + + @Test + public void convertAndSendPayloadAndHeadersToDestination() { + this.template.convertAndSend("somewhere", "payload", headers); + + assertEquals("somewhere", this.template.destination); + assertNotNull(this.template.message); + assertEquals("value", this.template.message.getHeaders().get("key")); + assertEquals("payload", this.template.message.getPayload()); + } + + + @Test + public void convertAndSendPayloadWithPostProcessor() { + this.template.setDefaultDestination("home"); + this.template.convertAndSend((Object) "payload", this.postProcessor); + + assertEquals("home", this.template.destination); + assertNotNull(this.template.message); + assertEquals("expected 'id' and 'timestamp' headers only", 2, this.template.message.getHeaders().size()); + assertEquals("payload", this.template.message.getPayload()); + + assertNotNull(this.postProcessor.getMessage()); + assertSame(this.template.message, this.postProcessor.getMessage()); + } + + @Test + public void convertAndSendPayloadWithPostProcessorToDestination() { + this.template.convertAndSend("somewhere", "payload", this.postProcessor); + + assertEquals("somewhere", this.template.destination); + assertNotNull(this.template.message); + assertEquals("expected 'id' and 'timestamp' headers only", 2, this.template.message.getHeaders().size()); + assertEquals("payload", this.template.message.getPayload()); + + assertNotNull(this.postProcessor.getMessage()); + assertSame(this.template.message, this.postProcessor.getMessage()); + } + + + private static class TestMessageSendingTemplate extends AbstractMessageSendingTemplate { + + private String destination; + + private Message message; + + @Override + protected void doSend(String destination, Message message) { + this.destination = destination; + this.message = message; + } + } + +} + +class TestMessagePostProcessor implements MessagePostProcessor { + + private Message message; + + + Message getMessage() { + return this.message; + } + + @Override + public Message postProcessMessage(Message message) { + this.message = message; + return message; + } +} diff --git a/spring-messaging/src/test/java/org/springframework/messaging/handler/annotation/support/HeaderMethodArgumentResolverTests.java b/spring-messaging/src/test/java/org/springframework/messaging/handler/annotation/support/HeaderMethodArgumentResolverTests.java index de0b269004..70720fa6d3 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/handler/annotation/support/HeaderMethodArgumentResolverTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/handler/annotation/support/HeaderMethodArgumentResolverTests.java @@ -26,6 +26,7 @@ import org.springframework.core.GenericTypeResolver; import org.springframework.core.MethodParameter; import org.springframework.core.convert.support.DefaultConversionService; import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHandlingException; import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.support.MessageBuilder; diff --git a/spring-messaging/src/test/java/org/springframework/messaging/handler/annotation/support/PathVariableMethodArgumentResolverTests.java b/spring-messaging/src/test/java/org/springframework/messaging/handler/annotation/support/PathVariableMethodArgumentResolverTests.java index d4d31bf546..1b61575134 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/handler/annotation/support/PathVariableMethodArgumentResolverTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/handler/annotation/support/PathVariableMethodArgumentResolverTests.java @@ -27,6 +27,7 @@ import org.springframework.core.GenericTypeResolver; import org.springframework.core.MethodParameter; import org.springframework.core.convert.support.DefaultConversionService; import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHandlingException; import org.springframework.messaging.handler.annotation.PathVariable; import org.springframework.messaging.support.MessageBuilder; diff --git a/spring-messaging/src/test/java/org/springframework/messaging/support/MessageHeaderAccessorTests.java b/spring-messaging/src/test/java/org/springframework/messaging/support/MessageHeaderAccessorTests.java index e2db352828..c1200ea60a 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/support/MessageHeaderAccessorTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/support/MessageHeaderAccessorTests.java @@ -44,7 +44,7 @@ public class MessageHeaderAccessorTests { Map original = new HashMap<>(); original.put("foo", "bar"); original.put("bar", "baz"); - GenericMessage message = new GenericMessage<>("p", original); + GenericMessage message = new GenericMessage<>("payload", original); MessageHeaderAccessor headers = new MessageHeaderAccessor(message); Map actual = headers.toMap(); @@ -61,7 +61,7 @@ public class MessageHeaderAccessorTests { Map original = new HashMap<>(); original.put("foo", "bar"); original.put("bar", "baz"); - GenericMessage message = new GenericMessage<>("p", original); + GenericMessage message = new GenericMessage<>("payload", original); MessageHeaderAccessor headers = new MessageHeaderAccessor(message); headers.setHeader("foo", "BAR");