Browse Source

Polishing

pull/635/head
Juergen Hoeller 10 years ago
parent
commit
820a160578
  1. 4
      spring-messaging/src/main/java/org/springframework/messaging/converter/MessageConversionException.java
  2. 19
      spring-messaging/src/main/java/org/springframework/messaging/core/AbstractMessageReceivingTemplate.java
  3. 64
      spring-messaging/src/main/java/org/springframework/messaging/simp/config/MessageBrokerRegistry.java
  4. 2
      spring-messaging/src/main/java/org/springframework/messaging/simp/config/SimpleBrokerRegistration.java
  5. 14
      spring-messaging/src/main/java/org/springframework/messaging/simp/config/StompBrokerRelayRegistration.java
  6. 21
      spring-messaging/src/main/java/org/springframework/messaging/support/AbstractHeaderMapper.java
  7. 23
      spring-messaging/src/main/java/org/springframework/messaging/support/ExecutorChannelInterceptor.java
  8. 9
      spring-messaging/src/main/java/org/springframework/messaging/support/GenericMessage.java

4
spring-messaging/src/main/java/org/springframework/messaging/converter/MessageConversionException.java

@ -36,6 +36,10 @@ public class MessageConversionException extends MessagingException {
super(description, cause); super(description, cause);
} }
public MessageConversionException(Message<?> failedMessage, String description) {
super(failedMessage, description);
}
public MessageConversionException(Message<?> failedMessage, String description, Throwable cause) { public MessageConversionException(Message<?> failedMessage, String description, Throwable cause) {
super(failedMessage, description, cause); super(failedMessage, description, cause);
} }

19
spring-messaging/src/main/java/org/springframework/messaging/core/AbstractMessageReceivingTemplate.java

@ -34,7 +34,7 @@ public abstract class AbstractMessageReceivingTemplate<D> extends AbstractMessag
@Override @Override
public Message<?> receive() { public Message<?> receive() {
return receive(getRequiredDefaultDestination()); return doReceive(getRequiredDefaultDestination());
} }
@Override @Override
@ -42,6 +42,12 @@ public abstract class AbstractMessageReceivingTemplate<D> extends AbstractMessag
return doReceive(destination); return doReceive(destination);
} }
/**
* Actually receive a message from the given destination.
* @param destination the target destination
* @return the received message, possibly {@code null} if the message could not
* be received, for example due to a timeout
*/
protected abstract Message<?> doReceive(D destination); protected abstract Message<?> doReceive(D destination);
@ -62,14 +68,19 @@ public abstract class AbstractMessageReceivingTemplate<D> extends AbstractMessag
} }
} }
/**
* Convert from the given message to the given target class.
* @param message the message to convert
* @param targetClass the target class to convert the payload to
* @return the converted payload of the reply message (never {@code null})
*/
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
protected <T> T doConvert(Message<?> message, Class<T> targetClass) { protected <T> T doConvert(Message<?> message, Class<T> targetClass) {
MessageConverter messageConverter = getMessageConverter(); MessageConverter messageConverter = getMessageConverter();
T value = (T) messageConverter.fromMessage(message, targetClass); T value = (T) messageConverter.fromMessage(message, targetClass);
if (value == null) { if (value == null) {
throw new MessageConversionException(message, "Unable to convert payload='" throw new MessageConversionException(message, "Unable to convert payload [" + message.getPayload() +
+ message.getPayload() + "' to type='" + targetClass "] to type [" + targetClass + "] using converter [" + messageConverter + "]");
+ "', converter=[" + messageConverter + "]", null);
} }
return value; return value;
} }

64
spring-messaging/src/main/java/org/springframework/messaging/simp/config/MessageBrokerRegistry.java

@ -43,14 +43,14 @@ public class MessageBrokerRegistry {
private StompBrokerRelayRegistration brokerRelayRegistration; private StompBrokerRelayRegistration brokerRelayRegistration;
private final ChannelRegistration brokerChannelRegistration = new ChannelRegistration();
private String[] applicationDestinationPrefixes; private String[] applicationDestinationPrefixes;
private String userDestinationPrefix; private String userDestinationPrefix;
private PathMatcher pathMatcher; private PathMatcher pathMatcher;
private ChannelRegistration brokerChannelRegistration = new ChannelRegistration();
public MessageBrokerRegistry(SubscribableChannel clientInboundChannel, MessageChannel clientOutboundChannel) { public MessageBrokerRegistry(SubscribableChannel clientInboundChannel, MessageChannel clientOutboundChannel) {
Assert.notNull(clientInboundChannel); Assert.notNull(clientInboundChannel);
@ -59,6 +59,7 @@ public class MessageBrokerRegistry {
this.clientOutboundChannel = clientOutboundChannel; this.clientOutboundChannel = clientOutboundChannel;
} }
/** /**
* Enable a simple message broker and configure one or more prefixes to filter * Enable a simple message broker and configure one or more prefixes to filter
* destinations targeting the broker (e.g. destinations prefixed with "/topic"). * destinations targeting the broker (e.g. destinations prefixed with "/topic").
@ -80,6 +81,21 @@ public class MessageBrokerRegistry {
return this.brokerRelayRegistration; return this.brokerRelayRegistration;
} }
/**
* Customize the channel used to send messages from the application to the message
* broker. By default, messages from the application to the message broker are sent
* synchronously, which means application code sending a message will find out
* if the message cannot be sent through an exception. However, this can be changed
* if the broker channel is configured here with task executor properties.
*/
public ChannelRegistration configureBrokerChannel() {
return this.brokerChannelRegistration;
}
protected ChannelRegistration getBrokerChannelRegistration() {
return this.brokerChannelRegistration;
}
/** /**
* Configure one or more prefixes to filter destinations targeting application * Configure one or more prefixes to filter destinations targeting application
* annotated methods. For example destinations prefixed with "/app" may be * annotated methods. For example destinations prefixed with "/app" may be
@ -95,6 +111,11 @@ public class MessageBrokerRegistry {
return this; return this;
} }
protected Collection<String> getApplicationDestinationPrefixes() {
return (this.applicationDestinationPrefixes != null ?
Arrays.asList(this.applicationDestinationPrefixes) : null);
}
/** /**
* Configure the prefix used to identify user destinations. User destinations * Configure the prefix used to identify user destinations. User destinations
* provide the ability for a user to subscribe to queue names unique to their * provide the ability for a user to subscribe to queue names unique to their
@ -112,24 +133,24 @@ public class MessageBrokerRegistry {
return this; return this;
} }
protected String getUserDestinationPrefix() {
return this.userDestinationPrefix;
}
/** /**
* Configure the PathMatcher to use to match the destinations of incoming * Configure the PathMatcher to use to match the destinations of incoming
* messages to {@code @MessageMapping} and {@code @SubscribeMapping} methods. * messages to {@code @MessageMapping} and {@code @SubscribeMapping} methods.
*
* <p>By default {@link org.springframework.util.AntPathMatcher} is configured. * <p>By default {@link org.springframework.util.AntPathMatcher} is configured.
* However applications may provide an {@code AntPathMatcher} instance * However applications may provide an {@code AntPathMatcher} instance
* customized to use "." (commonly used in messaging) instead of "/" as path * customized to use "." (commonly used in messaging) instead of "/" as path
* separator or provide a completely different PathMatcher implementation. * separator or provide a completely different PathMatcher implementation.
*
* <p>Note that the configured PathMatcher is only used for matching the * <p>Note that the configured PathMatcher is only used for matching the
* portion of the destination after the configured prefix. For example given * portion of the destination after the configured prefix. For example given
* application destination prefix "/app" and destination "/app/price.stock.**", * application destination prefix "/app" and destination "/app/price.stock.**",
* the message might be mapped to a controller with "price" and "stock.**" * the message might be mapped to a controller with "price" and "stock.**"
* as its type and method-level mappings respectively. * as its type and method-level mappings respectively.
*
* <p>When the simple broker is enabled, the PathMatcher configured here is * <p>When the simple broker is enabled, the PathMatcher configured here is
* also used to match message destinations when brokering messages. * also used to match message destinations when brokering messages.
*
* @since 4.1 * @since 4.1
*/ */
public MessageBrokerRegistry setPathMatcher(PathMatcher pathMatcher) { public MessageBrokerRegistry setPathMatcher(PathMatcher pathMatcher) {
@ -137,19 +158,13 @@ public class MessageBrokerRegistry {
return this; return this;
} }
/** protected PathMatcher getPathMatcher() {
* Customize the channel used to send messages from the application to the message return this.pathMatcher;
* broker. By default messages from the application to the message broker are sent
* synchronously, which means application code sending a message will find out
* if the message cannot be sent through an exception. However, this can be changed
* if the broker channel is configured here with task executor properties.
*/
public ChannelRegistration configureBrokerChannel() {
return this.brokerChannelRegistration;
} }
protected SimpleBrokerMessageHandler getSimpleBroker(SubscribableChannel brokerChannel) { protected SimpleBrokerMessageHandler getSimpleBroker(SubscribableChannel brokerChannel) {
if ((this.simpleBrokerRegistration == null) && (this.brokerRelayRegistration == null)) { if (this.simpleBrokerRegistration == null && this.brokerRelayRegistration == null) {
enableSimpleBroker(); enableSimpleBroker();
} }
if (this.simpleBrokerRegistration != null) { if (this.simpleBrokerRegistration != null) {
@ -167,21 +182,4 @@ public class MessageBrokerRegistry {
return null; return null;
} }
protected Collection<String> getApplicationDestinationPrefixes() {
return (this.applicationDestinationPrefixes != null)
? Arrays.asList(this.applicationDestinationPrefixes) : null;
}
protected String getUserDestinationPrefix() {
return this.userDestinationPrefix;
}
protected PathMatcher getPathMatcher() {
return this.pathMatcher;
}
protected ChannelRegistration getBrokerChannelRegistration() {
return this.brokerChannelRegistration;
}
} }

2
spring-messaging/src/main/java/org/springframework/messaging/simp/config/SimpleBrokerRegistration.java

@ -28,12 +28,10 @@ import org.springframework.messaging.simp.broker.SimpleBrokerMessageHandler;
*/ */
public class SimpleBrokerRegistration extends AbstractBrokerRegistration { public class SimpleBrokerRegistration extends AbstractBrokerRegistration {
public SimpleBrokerRegistration(SubscribableChannel inChannel, MessageChannel outChannel, String[] prefixes) { public SimpleBrokerRegistration(SubscribableChannel inChannel, MessageChannel outChannel, String[] prefixes) {
super(inChannel, outChannel, prefixes); super(inChannel, outChannel, prefixes);
} }
@Override @Override
protected SimpleBrokerMessageHandler getMessageHandler(SubscribableChannel brokerChannel) { protected SimpleBrokerMessageHandler getMessageHandler(SubscribableChannel brokerChannel) {
return new SimpleBrokerMessageHandler(getClientInboundChannel(), return new SimpleBrokerMessageHandler(getClientInboundChannel(),

14
spring-messaging/src/main/java/org/springframework/messaging/simp/config/StompBrokerRelayRegistration.java

@ -74,12 +74,10 @@ public class StompBrokerRelayRegistration extends AbstractBrokerRegistration {
return this; return this;
} }
/** /**
* Set the login to use when creating connections to the STOMP broker on * Set the login to use when creating connections to the STOMP broker on
* behalf of connected clients. * behalf of connected clients.
* <p> * <p>By default this is set to "guest".
* By default this is set to "guest".
*/ */
public StompBrokerRelayRegistration setClientLogin(String login) { public StompBrokerRelayRegistration setClientLogin(String login) {
Assert.hasText(login, "clientLogin must not be empty"); Assert.hasText(login, "clientLogin must not be empty");
@ -90,8 +88,7 @@ public class StompBrokerRelayRegistration extends AbstractBrokerRegistration {
/** /**
* Set the passcode to use when creating connections to the STOMP broker on * Set the passcode to use when creating connections to the STOMP broker on
* behalf of connected clients. * behalf of connected clients.
* <p> * <p>By default this is set to "guest".
* By default this is set to "guest".
*/ */
public StompBrokerRelayRegistration setClientPasscode(String passcode) { public StompBrokerRelayRegistration setClientPasscode(String passcode) {
Assert.hasText(passcode, "clientPasscode must not be empty"); Assert.hasText(passcode, "clientPasscode must not be empty");
@ -103,8 +100,7 @@ public class StompBrokerRelayRegistration extends AbstractBrokerRegistration {
* Set the login for the shared "system" connection used to send messages to * Set the login for the shared "system" connection used to send messages to
* the STOMP broker from within the application, i.e. messages not associated * the STOMP broker from within the application, i.e. messages not associated
* with a specific client session (e.g. REST/HTTP request handling method). * with a specific client session (e.g. REST/HTTP request handling method).
* <p> * <p>By default this is set to "guest".
* By default this is set to "guest".
*/ */
public StompBrokerRelayRegistration setSystemLogin(String login) { public StompBrokerRelayRegistration setSystemLogin(String login) {
Assert.hasText(login, "systemLogin must not be empty"); Assert.hasText(login, "systemLogin must not be empty");
@ -116,8 +112,7 @@ public class StompBrokerRelayRegistration extends AbstractBrokerRegistration {
* Set the passcode for the shared "system" connection used to send messages to * Set the passcode for the shared "system" connection used to send messages to
* the STOMP broker from within the application, i.e. messages not associated * the STOMP broker from within the application, i.e. messages not associated
* with a specific client session (e.g. REST/HTTP request handling method). * with a specific client session (e.g. REST/HTTP request handling method).
* <p> * <p>By default this is set to "guest".
* By default this is set to "guest".
*/ */
public StompBrokerRelayRegistration setSystemPasscode(String passcode) { public StompBrokerRelayRegistration setSystemPasscode(String passcode) {
Assert.hasText(passcode, "systemPasscode must not be empty"); Assert.hasText(passcode, "systemPasscode must not be empty");
@ -173,7 +168,6 @@ public class StompBrokerRelayRegistration extends AbstractBrokerRegistration {
protected StompBrokerRelayMessageHandler getMessageHandler(SubscribableChannel brokerChannel) { protected StompBrokerRelayMessageHandler getMessageHandler(SubscribableChannel brokerChannel) {
StompBrokerRelayMessageHandler handler = new StompBrokerRelayMessageHandler(getClientInboundChannel(), StompBrokerRelayMessageHandler handler = new StompBrokerRelayMessageHandler(getClientInboundChannel(),
getClientOutboundChannel(), brokerChannel, getDestinationPrefixes()); getClientOutboundChannel(), brokerChannel, getDestinationPrefixes());

21
spring-messaging/src/main/java/org/springframework/messaging/support/AbstractHeaderMapper.java

@ -38,10 +38,11 @@ public abstract class AbstractHeaderMapper<T> implements HeaderMapper<T> {
private String outboundPrefix = ""; private String outboundPrefix = "";
/** /**
* Specify a prefix to be appended to the message header name for any * Specify a prefix to be appended to the message header name for any
* user-defined property that is being mapped into the MessageHeaders. The * user-defined property that is being mapped into the MessageHeaders.
* default is an empty string (no prefix). * The default is an empty String (no prefix).
*/ */
public void setInboundPrefix(String inboundPrefix) { public void setInboundPrefix(String inboundPrefix) {
this.inboundPrefix = (inboundPrefix != null ? inboundPrefix : ""); this.inboundPrefix = (inboundPrefix != null ? inboundPrefix : "");
@ -50,15 +51,16 @@ public abstract class AbstractHeaderMapper<T> implements HeaderMapper<T> {
/** /**
* Specify a prefix to be appended to the protocol property name for any * Specify a prefix to be appended to the protocol property name for any
* user-defined message header that is being mapped into the protocol-specific * user-defined message header that is being mapped into the protocol-specific
* Message. The default is an empty string (no prefix). * Message. The default is an empty String (no prefix).
*/ */
public void setOutboundPrefix(String outboundPrefix) { public void setOutboundPrefix(String outboundPrefix) {
this.outboundPrefix = (outboundPrefix != null ? outboundPrefix : ""); this.outboundPrefix = (outboundPrefix != null ? outboundPrefix : "");
} }
/** /**
* Generate the name to use to set the header defined by the specified {@code headerName} to * Generate the name to use to set the header defined by the specified
* the protocol specific message. * {@code headerName} to the protocol specific message.
* @see #setOutboundPrefix * @see #setOutboundPrefix
*/ */
protected String fromHeaderName(String headerName) { protected String fromHeaderName(String headerName) {
@ -70,8 +72,8 @@ public abstract class AbstractHeaderMapper<T> implements HeaderMapper<T> {
} }
/** /**
* Generate the name to use to set the header defined by the specified {@code propertyName} to * Generate the name to use to set the header defined by the specified
* the {@link MessageHeaders} instance. * {@code propertyName} to the {@link MessageHeaders} instance.
* @see #setInboundPrefix(String) * @see #setInboundPrefix(String)
*/ */
protected String toHeaderName(String propertyName) { protected String toHeaderName(String propertyName) {
@ -83,8 +85,8 @@ public abstract class AbstractHeaderMapper<T> implements HeaderMapper<T> {
} }
/** /**
* Return the header value or {@code null} if it does not exist or does not match * Return the header value, or {@code null} if it does not exist
* the requested {@code type}. * or does not match the requested {@code type}.
*/ */
protected <V> V getHeaderIfAvailable(Map<String, Object> headers, String name, Class<V> type) { protected <V> V getHeaderIfAvailable(Map<String, Object> headers, String name, Class<V> type) {
Object value = headers.get(name); Object value = headers.get(name);
@ -102,4 +104,5 @@ public abstract class AbstractHeaderMapper<T> implements HeaderMapper<T> {
return type.cast(value); return type.cast(value);
} }
} }
} }

23
spring-messaging/src/main/java/org/springframework/messaging/support/ExecutorChannelInterceptor.java

@ -1,3 +1,19 @@
/*
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.support; package org.springframework.messaging.support;
import org.springframework.messaging.Message; import org.springframework.messaging.Message;
@ -6,8 +22,8 @@ import org.springframework.messaging.MessageHandler;
/** /**
* An extension of {@link ChannelInterceptor} with callbacks to intercept the * An extension of {@link ChannelInterceptor} with callbacks to intercept the
* asynchronous sending of a {@link org.springframework.messaging.Message} to a * asynchronous sending of a {@link org.springframework.messaging.Message} to
* specific subscriber through an {@link java.util.concurrent.Executor}. * a specific subscriber through an {@link java.util.concurrent.Executor}.
* Supported on {@link org.springframework.messaging.MessageChannel} * Supported on {@link org.springframework.messaging.MessageChannel}
* implementations that can be configured with an Executor. * implementations that can be configured with an Executor.
* *
@ -21,7 +37,6 @@ public interface ExecutorChannelInterceptor extends ChannelInterceptor {
* calling the target MessageHandler to handle the message. Allows for * calling the target MessageHandler to handle the message. Allows for
* modification of the Message if necessary or when {@code null} is returned * modification of the Message if necessary or when {@code null} is returned
* the MessageHandler is not invoked. * the MessageHandler is not invoked.
*
* @param message the message to be handled * @param message the message to be handled
* @param channel the channel on which the message was sent to * @param channel the channel on which the message was sent to
* @param handler the target handler to handle the message * @param handler the target handler to handle the message
@ -33,10 +48,8 @@ public interface ExecutorChannelInterceptor extends ChannelInterceptor {
* Invoked inside the {@link Runnable} submitted to the Executor after calling * Invoked inside the {@link Runnable} submitted to the Executor after calling
* the target MessageHandler regardless of the outcome (i.e. Exception raised * the target MessageHandler regardless of the outcome (i.e. Exception raised
* or not) thus allowing for proper resource cleanup. * or not) thus allowing for proper resource cleanup.
*
* <p>Note that this will be invoked only if beforeHandle successfully completed * <p>Note that this will be invoked only if beforeHandle successfully completed
* and returned a Message, i.e. it did not return {@code null}. * and returned a Message, i.e. it did not return {@code null}.
*
* @param message the message handled * @param message the message handled
* @param channel the channel on which the message was sent to * @param channel the channel on which the message was sent to
* @param handler the target handler that handled the message * @param handler the target handler that handled the message

9
spring-messaging/src/main/java/org/springframework/messaging/support/GenericMessage.java

@ -22,6 +22,7 @@ import java.util.Map;
import org.springframework.messaging.Message; import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.MessageHeaders;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
/** /**
* An implementation of {@link Message} with a generic payload. * An implementation of {@link Message} with a generic payload.
@ -90,12 +91,14 @@ public class GenericMessage<T> implements Message<T>, Serializable {
if (!(other instanceof GenericMessage)) { if (!(other instanceof GenericMessage)) {
return false; return false;
} }
GenericMessage<?> otherMessage = (GenericMessage<?>) other; GenericMessage<?> otherMsg = (GenericMessage<?>) other;
return (this.payload.equals(otherMessage.payload) && this.headers.equals(otherMessage.headers)); // Using nullSafeEquals for proper array equals comparisons
return (ObjectUtils.nullSafeEquals(this.payload, otherMsg.payload) && this.headers.equals(otherMsg.headers));
} }
public int hashCode() { public int hashCode() {
return (this.payload.hashCode() * 23 + this.headers.hashCode()); // Using nullSafeHashCode for proper array hashCode handling
return (ObjectUtils.nullSafeHashCode(this.payload) * 23 + this.headers.hashCode());
} }
public String toString() { public String toString() {

Loading…
Cancel
Save