Browse Source

Jms request/reply operations

This commit updates JmsMessagingTemplate to support the
MessageRequestReplyOperation interface that provides synchronous
request/reply operations.

As JmsMessagingTemplate delegates everything under the scenes to
JmsTemplate, the latter has been updated as well to offer such lower
level operation.

Issue: SPR-12037
pull/602/merge
Stephane Nicoll 11 years ago
parent
commit
b6389a6c66
  1. 74
      spring-jms/src/main/java/org/springframework/jms/core/JmsMessageOperations.java
  2. 88
      spring-jms/src/main/java/org/springframework/jms/core/JmsMessagingTemplate.java
  3. 69
      spring-jms/src/main/java/org/springframework/jms/core/JmsOperations.java
  4. 125
      spring-jms/src/main/java/org/springframework/jms/core/JmsTemplate.java
  5. 114
      spring-jms/src/test/java/org/springframework/jms/core/JmsMessagingTemplateTests.java
  6. 90
      spring-jms/src/test/java/org/springframework/jms/core/JmsTemplateTests.java
  7. 23
      spring-jms/src/test/java/org/springframework/jms/core/JmsTemplateTransactedTests.java
  8. 16
      spring-messaging/src/main/java/org/springframework/messaging/core/AbstractMessagingTemplate.java
  9. 4
      src/asciidoc/index.adoc

74
spring-jms/src/main/java/org/springframework/jms/core/JmsMessageOperations.java

@ -24,6 +24,7 @@ import org.springframework.messaging.Message; @@ -24,6 +24,7 @@ import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.core.MessagePostProcessor;
import org.springframework.messaging.core.MessageReceivingOperations;
import org.springframework.messaging.core.MessageRequestReplyOperations;
import org.springframework.messaging.core.MessageSendingOperations;
/**
@ -35,8 +36,8 @@ import org.springframework.messaging.core.MessageSendingOperations; @@ -35,8 +36,8 @@ import org.springframework.messaging.core.MessageSendingOperations;
* @since 4.1
* @see org.springframework.jms.core.JmsTemplate
*/
public interface JmsMessageOperations
extends MessageSendingOperations<Destination>, MessageReceivingOperations<Destination> {
public interface JmsMessageOperations extends MessageSendingOperations<Destination>,
MessageReceivingOperations<Destination>, MessageRequestReplyOperations<Destination> {
/**
* Send a message to the given destination.
@ -109,4 +110,73 @@ public interface JmsMessageOperations @@ -109,4 +110,73 @@ public interface JmsMessageOperations
*/
<T> T receiveAndConvert(String destinationName, Class<T> targetClass) throws MessagingException;
/**
* Send a request message and receive the reply from the given destination.
* @param destinationName the name of the target destination
* @param requestMessage the message to send
* @return the reply, possibly {@code null} if the message could not be received,
* for example due to a timeout
*/
Message<?> sendAndReceive(String destinationName, Message<?> requestMessage) throws MessagingException;
/**
* Convert the given request Object to serialized form, possibly using a
* {@link org.springframework.messaging.converter.MessageConverter}, send
* it as a {@link Message} to the given destination, receive the reply and convert
* its body of the specified target class.
* @param destinationName the name of the target destination
* @param request payload for the request message to send
* @param targetClass the target type to convert the payload of the reply to
* @return the payload of the reply message, possibly {@code null} if the message
* could not be received, for example due to a timeout
*/
<T> T convertSendAndReceive(String destinationName, Object request, Class<T> targetClass) throws MessagingException;
/**
* Convert the given request Object to serialized form, possibly using a
* {@link org.springframework.messaging.converter.MessageConverter}, send
* it as a {@link Message} with the given headers, to the specified destination,
* receive the reply and convert its body of the specified target class.
* @param destinationName the name of the target destination
* @param request payload for the request message to send
* @param headers headers for the request message to send
* @param targetClass the target type to convert the payload of the reply to
* @return the payload of the reply message, possibly {@code null} if the message
* could not be received, for example due to a timeout
*/
<T> T convertSendAndReceive(String destinationName, Object request, Map<String, Object> headers, Class<T> targetClass)
throws MessagingException;
/**
* Convert the given request Object to serialized form, possibly using a
* {@link org.springframework.messaging.converter.MessageConverter},
* apply the given post processor and send the resulting {@link Message} to the
* given destination, receive the reply and convert its body of the given
* target class.
* @param destinationName the name of the target destination
* @param request payload for the request message to send
* @param targetClass the target type to convert the payload of the reply to
* @param requestPostProcessor post process to apply to the request message
* @return the payload of the reply message, possibly {@code null} if the message
* could not be received, for example due to a timeout
*/
<T> T convertSendAndReceive(String destinationName, Object request, Class<T> targetClass,
MessagePostProcessor requestPostProcessor) throws MessagingException;
/**
* Convert the given request Object to serialized form, possibly using a
* {@link org.springframework.messaging.converter.MessageConverter},
* wrap it as a message with the given headers, apply the given post processor
* and send the resulting {@link Message} to the specified destination, receive
* the reply and convert its body of the given target class.
* @param destinationName the name of the target destination
* @param request payload for the request message to send
* @param targetClass the target type to convert the payload of the reply to
* @param requestPostProcessor post process to apply to the request message
* @return the payload of the reply message, possibly {@code null} if the message
* could not be received, for example due to a timeout
*/
<T> T convertSendAndReceive(String destinationName, Object request, Map<String, Object> headers,
Class<T> targetClass, MessagePostProcessor requestPostProcessor) throws MessagingException;
}

88
spring-jms/src/main/java/org/springframework/jms/core/JmsMessagingTemplate.java

@ -28,7 +28,7 @@ import org.springframework.jms.support.converter.MessagingMessageConverter; @@ -28,7 +28,7 @@ import org.springframework.jms.support.converter.MessagingMessageConverter;
import org.springframework.jms.support.converter.SimpleMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.core.AbstractMessageReceivingTemplate;
import org.springframework.messaging.core.AbstractMessagingTemplate;
import org.springframework.messaging.core.MessagePostProcessor;
import org.springframework.util.Assert;
@ -38,7 +38,7 @@ import org.springframework.util.Assert; @@ -38,7 +38,7 @@ import org.springframework.util.Assert;
* @author Stephane Nicoll
* @since 4.1
*/
public class JmsMessagingTemplate extends AbstractMessageReceivingTemplate<Destination>
public class JmsMessagingTemplate extends AbstractMessagingTemplate<Destination>
implements JmsMessageOperations, InitializingBean {
private JmsTemplate jmsTemplate;
@ -206,13 +206,76 @@ public class JmsMessagingTemplate extends AbstractMessageReceivingTemplate<Desti @@ -206,13 +206,76 @@ public class JmsMessagingTemplate extends AbstractMessageReceivingTemplate<Desti
}
}
@Override
public Message<?> sendAndReceive(Message<?> requestMessage) {
Destination defaultDestination = getDefaultDestination();
if (defaultDestination != null) {
return sendAndReceive(defaultDestination, requestMessage);
}
else {
return sendAndReceive(getRequiredDefaultDestinationName(), requestMessage);
}
}
@Override
public Message<?> sendAndReceive(String destinationName, Message<?> requestMessage)
throws MessagingException {
return doSendAndReceive(destinationName, requestMessage);
}
@Override
public <T> T convertSendAndReceive(String destinationName, Object request, Class<T> targetClass)
throws MessagingException {
return convertSendAndReceive(destinationName, request, null, targetClass);
}
@Override
public <T> T convertSendAndReceive(Object request, Class<T> targetClass) {
return convertSendAndReceive(request, targetClass, null);
}
@Override
public <T> T convertSendAndReceive(String destinationName, Object request,
Map<String, Object> headers, Class<T> targetClass) throws MessagingException {
return convertSendAndReceive(destinationName, request, headers, targetClass, null);
}
@Override
public <T> T convertSendAndReceive(Object request, Class<T> targetClass, MessagePostProcessor postProcessor) {
Destination defaultDestination = getDefaultDestination();
if (defaultDestination != null) {
return convertSendAndReceive(defaultDestination, request, targetClass, postProcessor);
}
else {
return convertSendAndReceive(getRequiredDefaultDestinationName(), request, targetClass, postProcessor);
}
}
@Override
public <T> T convertSendAndReceive(String destinationName, Object request, Class<T> targetClass,
MessagePostProcessor requestPostProcessor) throws MessagingException {
return convertSendAndReceive(destinationName, request, null, targetClass, requestPostProcessor);
}
@SuppressWarnings("unchecked")
@Override
public <T> T convertSendAndReceive(String destinationName, Object request, Map<String, Object> headers,
Class<T> targetClass, MessagePostProcessor postProcessor) {
Message<?> requestMessage = doConvert(request, headers, postProcessor);
Message<?> replyMessage = sendAndReceive(destinationName, requestMessage);
return (replyMessage != null ? (T) getMessageConverter().fromMessage(replyMessage, targetClass) : null);
}
@Override
protected void doSend(Destination destination, Message<?> message) {
this.jmsTemplate.send(destination, new MessagingMessageCreator(message, this.jmsMessageConverter));
this.jmsTemplate.send(destination, createMessageCreator(message));
}
protected void doSend(String destinationName, Message<?> message) {
this.jmsTemplate.send(destinationName, new MessagingMessageCreator(message, this.jmsMessageConverter));
this.jmsTemplate.send(destinationName, createMessageCreator(message));
}
@Override
@ -226,6 +289,23 @@ public class JmsMessagingTemplate extends AbstractMessageReceivingTemplate<Desti @@ -226,6 +289,23 @@ public class JmsMessagingTemplate extends AbstractMessageReceivingTemplate<Desti
return doConvert(jmsMessage);
}
@Override
protected Message<?> doSendAndReceive(Destination destination, Message<?> requestMessage) {
javax.jms.Message jmsMessage = this.jmsTemplate
.sendAndReceive(destination, createMessageCreator(requestMessage));
return doConvert(jmsMessage);
}
protected Message<?> doSendAndReceive(String destinationName, Message<?> requestMessage) {
javax.jms.Message jmsMessage = this.jmsTemplate
.sendAndReceive(destinationName, createMessageCreator(requestMessage));
return doConvert(jmsMessage);
}
private MessagingMessageCreator createMessageCreator(Message<?> message) {
return new MessagingMessageCreator(message, this.jmsMessageConverter);
}
protected String getRequiredDefaultDestinationName() {
String name = getDefaultDestinationName();
if (name == null) {

69
spring-jms/src/main/java/org/springframework/jms/core/JmsOperations.java

@ -32,8 +32,12 @@ import org.springframework.jms.JmsException; @@ -32,8 +32,12 @@ import org.springframework.jms.JmsException;
* {@code receive(..)} methods that mirror various JMS API methods.
* See the JMS specification and javadocs for details on those methods.
*
* <p>Provides also basic request reply operation using a temporary
* queue to collect the reply.
*
* @author Mark Pollack
* @author Juergen Hoeller
* @author Stephane Nicoll
* @since 1.1
* @see JmsTemplate
* @see javax.jms.Destination
@ -83,9 +87,9 @@ public interface JmsOperations { @@ -83,9 +87,9 @@ public interface JmsOperations {
<T> T execute(String destinationName, ProducerCallback<T> action) throws JmsException;
//-------------------------------------------------------------------------
//---------------------------------------------------------------------------------------
// Convenience methods for sending messages
//-------------------------------------------------------------------------
//---------------------------------------------------------------------------------------
/**
* Send a message to the default destination.
@ -115,9 +119,9 @@ public interface JmsOperations { @@ -115,9 +119,9 @@ public interface JmsOperations {
void send(String destinationName, MessageCreator messageCreator) throws JmsException;
//-------------------------------------------------------------------------
//---------------------------------------------------------------------------------------
// Convenience methods for sending auto-converted messages
//-------------------------------------------------------------------------
//---------------------------------------------------------------------------------------
/**
* Send the given object to the default destination, converting the object
@ -185,9 +189,9 @@ public interface JmsOperations { @@ -185,9 +189,9 @@ public interface JmsOperations {
throws JmsException;
//-------------------------------------------------------------------------
//---------------------------------------------------------------------------------------
// Convenience methods for receiving messages
//-------------------------------------------------------------------------
//---------------------------------------------------------------------------------------
/**
* Receive a message synchronously from the default destination, but only
@ -264,9 +268,9 @@ public interface JmsOperations { @@ -264,9 +268,9 @@ public interface JmsOperations {
Message receiveSelected(String destinationName, String messageSelector) throws JmsException;
//-------------------------------------------------------------------------
//---------------------------------------------------------------------------------------
// Convenience methods for receiving auto-converted messages
//-------------------------------------------------------------------------
//---------------------------------------------------------------------------------------
/**
* Receive a message synchronously from the default destination, but only
@ -349,9 +353,54 @@ public interface JmsOperations { @@ -349,9 +353,54 @@ public interface JmsOperations {
Object receiveSelectedAndConvert(String destinationName, String messageSelector) throws JmsException;
//-------------------------------------------------------------------------
//---------------------------------------------------------------------------------------
// Convenience methods for sending messages to and receiving the reply from a destination
//---------------------------------------------------------------------------------------
/**
* Send a request message and receive the reply from a default destination. The
* {@link MessageCreator} callback creates the message given a Session. A temporary
* queue is created as part of this operation and is set in the {@code JMSReplyTO}
* header of the message.
* <p>This will only work with a default destination specified!
* @param messageCreator callback to create a request message
* @return the reply, possibly {@code null} if the message could not be received,
* for example due to a timeout
* @throws JmsException checked JMSException converted to unchecked
*/
Message sendAndReceive(MessageCreator messageCreator) throws JmsException;
/**
* Send a message and receive the reply from the specified destination. The
* {@link MessageCreator} callback creates the message given a Session. A temporary
* queue is created as part of this operation and is set in the {@code JMSReplyTO}
* header of the message.
* @param destination the destination to send this message to
* @param messageCreator callback to create a message
* @return the reply, possibly {@code null} if the message could not be received,
* for example due to a timeout
* @throws JmsException checked JMSException converted to unchecked
*/
Message sendAndReceive(Destination destination, MessageCreator messageCreator) throws JmsException;
/**
* Send a message and receive the reply from the specified destination. The
* {@link MessageCreator} callback creates the message given a Session. A temporary
* queue is created as part of this operation and is set in the {@code JMSReplyTO}
* header of the message.
* @param destinationName the name of the destination to send this message to
* (to be resolved to an actual destination by a DestinationResolver)
* @param messageCreator callback to create a message
* @return the reply, possibly {@code null} if the message could not be received,
* for example due to a timeout
* @throws JmsException checked JMSException converted to unchecked
*/
Message sendAndReceive(String destinationName, MessageCreator messageCreator) throws JmsException;
//---------------------------------------------------------------------------------------
// Convenience methods for browsing messages
//-------------------------------------------------------------------------
//---------------------------------------------------------------------------------------
/**
* Browse messages in the default JMS queue. The callback gives access to the JMS

125
spring-jms/src/main/java/org/springframework/jms/core/JmsTemplate.java

@ -28,6 +28,7 @@ import javax.jms.MessageProducer; @@ -28,6 +28,7 @@ import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import org.springframework.jms.JmsException;
import org.springframework.jms.connection.ConnectionFactoryUtils;
@ -77,6 +78,7 @@ import org.springframework.util.ReflectionUtils; @@ -77,6 +78,7 @@ import org.springframework.util.ReflectionUtils;
*
* @author Mark Pollack
* @author Juergen Hoeller
* @author Stephane Nicoll
* @since 1.1
* @see #setConnectionFactory
* @see #setPubSubDomain
@ -447,9 +449,9 @@ public class JmsTemplate extends JmsDestinationAccessor implements JmsOperations @@ -447,9 +449,9 @@ public class JmsTemplate extends JmsDestinationAccessor implements JmsOperations
}
//-------------------------------------------------------------------------
//---------------------------------------------------------------------------------------
// JmsOperations execute methods
//-------------------------------------------------------------------------
//---------------------------------------------------------------------------------------
@Override
public <T> T execute(SessionCallback<T> action) throws JmsException {
@ -546,9 +548,9 @@ public class JmsTemplate extends JmsDestinationAccessor implements JmsOperations @@ -546,9 +548,9 @@ public class JmsTemplate extends JmsDestinationAccessor implements JmsOperations
}
//-------------------------------------------------------------------------
//---------------------------------------------------------------------------------------
// Convenience methods for sending messages
//-------------------------------------------------------------------------
//---------------------------------------------------------------------------------------
@Override
public void send(MessageCreator messageCreator) throws JmsException {
@ -635,9 +637,9 @@ public class JmsTemplate extends JmsDestinationAccessor implements JmsOperations @@ -635,9 +637,9 @@ public class JmsTemplate extends JmsDestinationAccessor implements JmsOperations
}
//-------------------------------------------------------------------------
//---------------------------------------------------------------------------------------
// Convenience methods for sending auto-converted messages
//-------------------------------------------------------------------------
//---------------------------------------------------------------------------------------
@Override
public void convertAndSend(Object message) throws JmsException {
@ -710,9 +712,9 @@ public class JmsTemplate extends JmsDestinationAccessor implements JmsOperations @@ -710,9 +712,9 @@ public class JmsTemplate extends JmsDestinationAccessor implements JmsOperations
}
//-------------------------------------------------------------------------
//---------------------------------------------------------------------------------------
// Convenience methods for receiving messages
//-------------------------------------------------------------------------
//---------------------------------------------------------------------------------------
@Override
public Message receive() throws JmsException {
@ -838,9 +840,9 @@ public class JmsTemplate extends JmsDestinationAccessor implements JmsOperations @@ -838,9 +840,9 @@ public class JmsTemplate extends JmsDestinationAccessor implements JmsOperations
}
//-------------------------------------------------------------------------
//---------------------------------------------------------------------------------------
// Convenience methods for receiving auto-converted messages
//-------------------------------------------------------------------------
//---------------------------------------------------------------------------------------
@Override
public Object receiveAndConvert() throws JmsException {
@ -890,9 +892,108 @@ public class JmsTemplate extends JmsDestinationAccessor implements JmsOperations @@ -890,9 +892,108 @@ public class JmsTemplate extends JmsDestinationAccessor implements JmsOperations
}
//-------------------------------------------------------------------------
//---------------------------------------------------------------------------------------
// Convenience methods for sending messages to and receiving the reply from a destination
//---------------------------------------------------------------------------------------
@Override
public Message sendAndReceive(MessageCreator messageCreator) throws JmsException {
Destination defaultDestination = getDefaultDestination();
if (defaultDestination != null) {
return sendAndReceive(defaultDestination, messageCreator);
}
else {
return sendAndReceive(getRequiredDefaultDestinationName(), messageCreator);
}
}
@Override
public Message sendAndReceive(final Destination destination, final MessageCreator messageCreator) throws JmsException {
return executeLocal(new SessionCallback<Message>() {
@Override
public Message doInJms(Session session) throws JMSException {
return doSendAndReceive(session, destination, messageCreator);
}
}, true);
}
@Override
public Message sendAndReceive(final String destinationName, final MessageCreator messageCreator) throws JmsException {
return executeLocal(new SessionCallback<Message>() {
@Override
public Message doInJms(Session session) throws JMSException {
Destination destination = resolveDestinationName(session, destinationName);
return doSendAndReceive(session, destination, messageCreator);
}
}, true);
}
/**
* Send a request message to the given {@link Destination} and block until
* a reply has been received on a temporary queue created on-the-fly.
* <p>Return the response message or {@code null} if no message has
* @throws JMSException if thrown by JMS API methods
*/
protected Message doSendAndReceive(Session session, Destination destination, MessageCreator messageCreator)
throws JMSException {
Assert.notNull(messageCreator, "MessageCreator must not be null");
TemporaryQueue responseQueue = null;
MessageProducer producer = null;
MessageConsumer consumer = null;
try {
Message requestMessage = messageCreator.createMessage(session);
responseQueue = session.createTemporaryQueue();
producer = session.createProducer(destination);
consumer = session.createConsumer(responseQueue);
requestMessage.setJMSReplyTo(responseQueue);
if (logger.isDebugEnabled()) {
logger.debug("Sending created message: " + requestMessage);
}
doSend(producer, requestMessage);
return doReceive(consumer, getReceiveTimeout());
}
finally {
JmsUtils.closeMessageConsumer(consumer);
JmsUtils.closeMessageProducer(producer);
if (responseQueue != null) {
responseQueue.delete();
}
}
}
/**
* A variant of {@link #execute(SessionCallback, boolean)} that explicitly
* creates a non transactional session. The given {@link SessionCallback}
* does not participate in an existing transaction.
*/
private <T> T executeLocal(SessionCallback<T> action, boolean startConnection) throws JmsException {
Assert.notNull(action, "Callback object must not be null");
Connection con = null;
Session session = null;
try {
con = getConnectionFactory().createConnection();
session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
if (startConnection) {
con.start();
}
if (logger.isDebugEnabled()) {
logger.debug("Executing callback on JMS Session: " + session);
}
return action.doInJms(session);
}
catch (JMSException ex) {
throw convertJmsAccessException(ex);
}
finally {
JmsUtils.closeSession(session);
ConnectionFactoryUtils.releaseConnection(con, getConnectionFactory(), startConnection);
}
}
//---------------------------------------------------------------------------------------
// Convenience methods for browsing messages
//-------------------------------------------------------------------------
//---------------------------------------------------------------------------------------
@Override
public <T> T browse(BrowserCallback<T> action) throws JmsException {

114
spring-jms/src/test/java/org/springframework/jms/core/JmsMessagingTemplateTests.java

@ -337,10 +337,120 @@ public class JmsMessagingTemplateTests { @@ -337,10 +337,120 @@ public class JmsMessagingTemplateTests {
assertNull(messagingTemplate.receiveAndConvert("myQueue", String.class));
}
@Test
public void sendAndReceive() {
Destination destination = new Destination() {};
Message<String> request = createTextMessage();
javax.jms.Message replyJmsMessage = createJmsTextMessage();
given(jmsTemplate.sendAndReceive(eq(destination), anyObject())).willReturn(replyJmsMessage);
private Message<String> createTextMessage() {
Message<?> actual = messagingTemplate.sendAndReceive(destination, request);
verify(jmsTemplate, times(1)).sendAndReceive(eq(destination), anyObject());
assertTextMessage(actual);
}
@Test
public void sendAndReceiveName() {
Message<String> request = createTextMessage();
javax.jms.Message replyJmsMessage = createJmsTextMessage();
given(jmsTemplate.sendAndReceive(eq("myQueue"), anyObject())).willReturn(replyJmsMessage);
Message<?> actual = messagingTemplate.sendAndReceive("myQueue", request);
verify(jmsTemplate, times(1)).sendAndReceive(eq("myQueue"), anyObject());
assertTextMessage(actual);
}
@Test
public void sendAndReceiveDefaultDestination() {
Destination destination = new Destination() {};
messagingTemplate.setDefaultDestination(destination);
Message<String> request = createTextMessage();
javax.jms.Message replyJmsMessage = createJmsTextMessage();
given(jmsTemplate.sendAndReceive(eq(destination), anyObject())).willReturn(replyJmsMessage);
Message<?> actual = messagingTemplate.sendAndReceive(request);
verify(jmsTemplate, times(1)).sendAndReceive(eq(destination), anyObject());
assertTextMessage(actual);
}
@Test
public void sendAndReceiveDefaultDestinationName() {
messagingTemplate.setDefaultDestinationName("myQueue");
Message<String> request = createTextMessage();
javax.jms.Message replyJmsMessage = createJmsTextMessage();
given(jmsTemplate.sendAndReceive(eq("myQueue"), anyObject())).willReturn(replyJmsMessage);
Message<?> actual = messagingTemplate.sendAndReceive(request);
verify(jmsTemplate, times(1)).sendAndReceive(eq("myQueue"), anyObject());
assertTextMessage(actual);
}
@Test
public void sendAndReceiveNoDefaultSet() {
Message<String> message = createTextMessage();
thrown.expect(IllegalStateException.class);
messagingTemplate.sendAndReceive(message);
}
@Test
public void convertSendAndReceivePayload() throws JMSException {
Destination destination = new Destination() {};
javax.jms.Message replyJmsMessage = createJmsTextMessage("My reply");
given(jmsTemplate.sendAndReceive(eq(destination), anyObject())).willReturn(replyJmsMessage);
String reply = messagingTemplate.convertSendAndReceive(destination, "my Payload", String.class);
verify(jmsTemplate, times(1)).sendAndReceive(eq(destination), anyObject());
assertEquals("My reply", reply);
}
@Test
public void convertSendAndReceivePayloadName() throws JMSException {
javax.jms.Message replyJmsMessage = createJmsTextMessage("My reply");
given(jmsTemplate.sendAndReceive(eq("myQueue"), anyObject())).willReturn(replyJmsMessage);
String reply = messagingTemplate.convertSendAndReceive("myQueue", "my Payload", String.class);
verify(jmsTemplate, times(1)).sendAndReceive(eq("myQueue"), anyObject());
assertEquals("My reply", reply);
}
@Test
public void convertSendAndReceiveDefaultDestination() throws JMSException {
Destination destination = new Destination() {};
messagingTemplate.setDefaultDestination(destination);
javax.jms.Message replyJmsMessage = createJmsTextMessage("My reply");
given(jmsTemplate.sendAndReceive(eq(destination), anyObject())).willReturn(replyJmsMessage);
String reply = messagingTemplate.convertSendAndReceive("my Payload", String.class);
verify(jmsTemplate, times(1)).sendAndReceive(eq(destination), anyObject());
assertEquals("My reply", reply);
}
@Test
public void convertSendAndReceiveDefaultDestinationName() throws JMSException {
messagingTemplate.setDefaultDestinationName("myQueue");
javax.jms.Message replyJmsMessage = createJmsTextMessage("My reply");
given(jmsTemplate.sendAndReceive(eq("myQueue"), anyObject())).willReturn(replyJmsMessage);
String reply = messagingTemplate.convertSendAndReceive("my Payload", String.class);
verify(jmsTemplate, times(1)).sendAndReceive(eq("myQueue"), anyObject());
assertEquals("My reply", reply);
}
@Test
public void convertSendAndReceiveNoDefaultSet() throws JMSException {
thrown.expect(IllegalStateException.class);
messagingTemplate.convertSendAndReceive("my Payload", String.class);
}
private Message<String> createTextMessage(String payload) {
return MessageBuilder
.withPayload("Hello").setHeader("foo", "bar").build();
.withPayload(payload).setHeader("foo", "bar").build();
}
private Message<String> createTextMessage() {
return createTextMessage("Hello");
}
private javax.jms.Message createJmsTextMessage(String payload) {

90
spring-jms/src/test/java/org/springframework/jms/core/JmsTemplateTests.java

@ -30,6 +30,7 @@ import javax.jms.MessageConsumer; @@ -30,6 +30,7 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.TextMessage;
import javax.naming.Context;
@ -66,12 +67,13 @@ import static org.mockito.BDDMockito.*; @@ -66,12 +67,13 @@ import static org.mockito.BDDMockito.*;
*
* @author Andre Biryukov
* @author Mark Pollack
* @author Stephane Nicoll
*/
public class JmsTemplateTests {
private Context jndiContext;
private ConnectionFactory connectionFactory;
private Connection connection;
protected Connection connection;
private Session session;
private Destination queue;
@ -120,6 +122,9 @@ public class JmsTemplateTests { @@ -120,6 +122,9 @@ public class JmsTemplateTests {
return false;
}
protected Session getLocalSession() {
return session;
}
@Test
public void testExceptionStackTrace() {
@ -630,6 +635,89 @@ public class JmsTemplateTests { @@ -630,6 +635,89 @@ public class JmsTemplateTests {
verify(messageConsumer).close();
}
@Test
public void testSendAndReceiveDefaultDestination() throws Exception {
doTestSendAndReceive(true, true, 1000L);
}
@Test
public void testSendAndReceiveDefaultDestinationName() throws Exception {
doTestSendAndReceive(false, true, 1000L);
}
@Test
public void testSendAndReceiveDestination() throws Exception {
doTestSendAndReceive(true, false, 1000L);
}
@Test
public void testSendAndReceiveDestinationName() throws Exception {
doTestSendAndReceive(false, false, 1000L);
}
private void doTestSendAndReceive(boolean explicitDestination, boolean useDefaultDestination, long timeout)
throws Exception {
JmsTemplate template = createTemplate();
template.setConnectionFactory(connectionFactory);
String destinationName = "testDestination";
if (useDefaultDestination) {
if (explicitDestination) {
template.setDefaultDestination(queue);
}
else {
template.setDefaultDestinationName(destinationName);
}
}
template.setReceiveTimeout(timeout);
Session localSession = getLocalSession();
TemporaryQueue replyDestination = mock(TemporaryQueue.class);
MessageProducer messageProducer = mock(MessageProducer.class);
given(localSession.createProducer(queue)).willReturn(messageProducer);
given(localSession.createTemporaryQueue()).willReturn(replyDestination);
MessageConsumer messageConsumer = mock(MessageConsumer.class);
given(localSession.createConsumer(replyDestination)).willReturn(messageConsumer);
TextMessage request = mock(TextMessage.class);
MessageCreator messageCreator = mock(MessageCreator.class);
given(messageCreator.createMessage(localSession)).willReturn(request);
TextMessage reply = mock(TextMessage.class);
if (timeout == JmsTemplate.RECEIVE_TIMEOUT_NO_WAIT) {
given(messageConsumer.receiveNoWait()).willReturn(reply);
}
else if (timeout == JmsTemplate.RECEIVE_TIMEOUT_INDEFINITE_WAIT) {
given(messageConsumer.receive()).willReturn(reply);
}
else {
given(messageConsumer.receive(timeout)).willReturn(reply);
}
Message message = null;
if (useDefaultDestination) {
message = template.sendAndReceive(messageCreator);
}
else if (explicitDestination) {
message = template.sendAndReceive(queue, messageCreator);
}
else {
message = template.sendAndReceive(destinationName, messageCreator);
}
// replyTO set on the request
verify(request).setJMSReplyTo(replyDestination);
assertSame("Reply message not received", reply, message);
verify(connection).start();
verify(connection).close();
verify(localSession).close();
verify(messageConsumer).close();
verify(messageProducer).close();
}
@Test
public void testIllegalStateException() throws Exception {
doTestJmsException(new javax.jms.IllegalStateException(""), org.springframework.jms.IllegalStateException.class);

23
spring-jms/src/test/java/org/springframework/jms/core/JmsTemplateTransactedTests.java

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
/*
* Copyright 2002-2012 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@ -16,12 +16,33 @@ @@ -16,12 +16,33 @@
package org.springframework.jms.core;
import javax.jms.Session;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock;
/**
* @author Juergen Hoeller
* @author Stephane Nicoll
* @since 06.01.2005
*/
public class JmsTemplateTransactedTests extends JmsTemplateTests {
private Session localSession;
@Override
public void setupMocks() throws Exception {
super.setupMocks();
this.localSession = mock(Session.class);
given(this.connection.createSession(false,
Session.AUTO_ACKNOWLEDGE)).willReturn(this.localSession);
}
@Override
protected Session getLocalSession() {
return this.localSession;
}
@Override
protected boolean useTransactedSession() {
return true;

16
spring-messaging/src/main/java/org/springframework/messaging/core/AbstractMessagingTemplate.java

@ -28,6 +28,7 @@ import org.springframework.messaging.converter.MessageConversionException; @@ -28,6 +28,7 @@ import org.springframework.messaging.converter.MessageConversionException;
*
* @author Mark Fisher
* @author Rossen Stoyanchev
* @author Stephane Nicoll
* @since 4.0
*/
public abstract class AbstractMessagingTemplate<D> extends AbstractMessageReceivingTemplate<D>
@ -76,20 +77,7 @@ public abstract class AbstractMessagingTemplate<D> extends AbstractMessageReceiv @@ -76,20 +77,7 @@ public abstract class AbstractMessagingTemplate<D> extends AbstractMessageReceiv
public <T> T convertSendAndReceive(D destination, Object request, Map<String, Object> headers,
Class<T> targetClass, MessagePostProcessor postProcessor) {
MessageHeaders messageHeaders = (headers != null ? new MessageHeaders(headers) : null);
Message<?> requestMessage = getMessageConverter().toMessage(request, messageHeaders);
if (requestMessage == null) {
String payloadType = (request != null ? request.getClass().getName() : null);
Object contentType = (messageHeaders != null ? messageHeaders.get(MessageHeaders.CONTENT_TYPE) : null);
throw new MessageConversionException("Unable to convert payload with type '" + payloadType +
"', contentType='" + contentType + "', converter=[" + getMessageConverter() + "]");
}
if (postProcessor != null) {
requestMessage = postProcessor.postProcessMessage(requestMessage);
}
Message<?> requestMessage = doConvert(request, headers, postProcessor);
Message<?> replyMessage = sendAndReceive(destination, requestMessage);
return (replyMessage != null ? (T) getMessageConverter().fromMessage(replyMessage, targetClass) : null);
}

4
src/asciidoc/index.adoc

@ -40722,6 +40722,10 @@ to provide consistent management of QOS values, the `JmsTemplate` must therefore @@ -40722,6 +40722,10 @@ to provide consistent management of QOS values, the `JmsTemplate` must therefore
specifically enabled to use its own QOS values by setting the boolean property
`isExplicitQosEnabled` to `true`.
For convenience, `JmsTemplate` also exposes a basic request-reply operation that allows
to send a message and wait for a reply on a temporary queue that is created as part of
the operation.
[NOTE]
====
Instances of the `JmsTemplate` class are __thread-safe once configured__. This is

Loading…
Cancel
Save