Browse Source

Revisit JmsMessagingTemplate

This commit revisits JmsMessagingTemplate and adds support for
receiving operations as well. JmsMessageSendingOperations has been
renamed to JmsMessageOperations.

The messaging abstraction did not split receiving and request-reply
operations. AbstractMessageReceivingTemplate has been created to hold
only the receiving operations.

Issue: SPR-11772
pull/547/head
Stephane Nicoll 11 years ago
parent
commit
9fabcad3dd
  1. 26
      spring-jms/src/main/java/org/springframework/jms/messaging/JmsMessageOperations.java
  2. 71
      spring-jms/src/main/java/org/springframework/jms/messaging/JmsMessagingTemplate.java
  3. 3
      spring-jms/src/main/java/org/springframework/jms/support/converter/MessagingMessageConverter.java
  4. 152
      spring-jms/src/test/java/org/springframework/jms/messaging/JmsMessagingTemplateTests.java
  5. 5
      spring-jms/src/test/java/org/springframework/jms/support/converter/MessagingMessageConverterTests.java
  6. 63
      spring-messaging/src/main/java/org/springframework/messaging/core/AbstractMessageReceivingTemplate.java
  7. 40
      spring-messaging/src/main/java/org/springframework/messaging/core/AbstractMessagingTemplate.java

26
spring-jms/src/main/java/org/springframework/jms/messaging/JmsMessageSendingOperations.java → spring-jms/src/main/java/org/springframework/jms/messaging/JmsMessageOperations.java

@ -23,18 +23,20 @@ import javax.jms.Destination;
import org.springframework.messaging.Message; import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException; import org.springframework.messaging.MessagingException;
import org.springframework.messaging.core.MessagePostProcessor; import org.springframework.messaging.core.MessagePostProcessor;
import org.springframework.messaging.core.MessageReceivingOperations;
import org.springframework.messaging.core.MessageSendingOperations; import org.springframework.messaging.core.MessageSendingOperations;
/** /**
* A specialization of {@link MessageSendingOperations} for JMS related * A specialization of {@link MessageSendingOperations} and {@link MessageSendingOperations}
* operations that allows to specify a destination name rather than the * for JMS related operations that allows to specify a destination name rather than the
* actual {@link javax.jms.Destination} * actual {@link javax.jms.Destination}
* *
* @author Stephane Nicoll * @author Stephane Nicoll
* @since 4.1 * @since 4.1
* @see org.springframework.jms.core.JmsTemplate * @see org.springframework.jms.core.JmsTemplate
*/ */
public interface JmsMessageSendingOperations extends MessageSendingOperations<Destination> { public interface JmsMessageOperations
extends MessageSendingOperations<Destination>, MessageReceivingOperations<Destination> {
/** /**
* Send a message to the given destination. * Send a message to the given destination.
@ -89,4 +91,22 @@ public interface JmsMessageSendingOperations extends MessageSendingOperations<De
void convertAndSend(String destinationName, Object payload, Map<String, void convertAndSend(String destinationName, Object payload, Map<String,
Object> headers, MessagePostProcessor postProcessor) throws MessagingException; Object> headers, MessagePostProcessor postProcessor) throws MessagingException;
/**
* Receive a message from the given destination.
* @param destinationName the name of the target destination
* @return the received message, possibly {@code null} if the message could not
* be received, for example due to a timeout
*/
Message<?> receive(String destinationName) throws MessagingException;
/**
* Receive a message from the given destination and convert its payload to the
* specified target class.
* @param destinationName the name of the target destination
* @param targetClass the target class to convert the payload to
* @return the converted payload of the reply message, possibly {@code null} if
* the message could not be received, for example due to a timeout
*/
<T> T receiveAndConvert(String destinationName, Class<T> targetClass) throws MessagingException;
} }

71
spring-jms/src/main/java/org/springframework/jms/messaging/JmsMessagingTemplate.java

@ -25,25 +25,26 @@ import javax.jms.Session;
import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.InitializingBean;
import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator; import org.springframework.jms.core.MessageCreator;
import org.springframework.jms.support.converter.MessageConversionException;
import org.springframework.jms.support.converter.MessageConverter; import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.converter.MessagingMessageConverter; import org.springframework.jms.support.converter.MessagingMessageConverter;
import org.springframework.jms.support.converter.SimpleJmsHeaderMapper; import org.springframework.jms.support.converter.SimpleJmsHeaderMapper;
import org.springframework.jms.support.converter.SimpleMessageConverter; import org.springframework.jms.support.converter.SimpleMessageConverter;
import org.springframework.messaging.Message; import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException; import org.springframework.messaging.MessagingException;
import org.springframework.messaging.core.AbstractMessageSendingTemplate; import org.springframework.messaging.core.AbstractMessageReceivingTemplate;
import org.springframework.messaging.core.MessagePostProcessor; import org.springframework.messaging.core.MessagePostProcessor;
import org.springframework.util.Assert; import org.springframework.util.Assert;
/** /**
* An implementation of {@link JmsMessageSendingOperations}. * An implementation of {@link JmsMessageOperations}.
* *
* @author Stephane Nicoll * @author Stephane Nicoll
* @since 4.1 * @since 4.1
*/ */
public class JmsMessagingTemplate public class JmsMessagingTemplate
extends AbstractMessageSendingTemplate<Destination> extends AbstractMessageReceivingTemplate<Destination>
implements JmsMessageSendingOperations, InitializingBean { implements JmsMessageOperations, InitializingBean {
private JmsTemplate jmsTemplate; private JmsTemplate jmsTemplate;
@ -165,6 +166,45 @@ public class JmsMessagingTemplate
send(destinationName, message); send(destinationName, message);
} }
@Override
public Message<?> receive() {
Destination defaultDestination = getDefaultDestination();
if (defaultDestination != null) {
return receive(defaultDestination);
}
else {
return receive(getRequiredDefaultDestinationName());
}
}
@Override
public <T> T receiveAndConvert(Class<T> targetClass) {
Destination defaultDestination = getDefaultDestination();
if (defaultDestination != null) {
return receiveAndConvert(defaultDestination, targetClass);
}
else {
return receiveAndConvert(getRequiredDefaultDestinationName(), targetClass);
}
}
@Override
public Message<?> receive(String destinationName) throws MessagingException {
return doReceive(destinationName);
}
@Override
@SuppressWarnings("unchecked")
public <T> T receiveAndConvert(String destinationName, Class<T> targetClass) throws MessagingException {
Message<?> message = doReceive(destinationName);
if (message != null) {
return (T) getMessageConverter().fromMessage(message, targetClass);
}
else {
return null;
}
}
@Override @Override
protected void doSend(Destination destination, Message<?> message) { protected void doSend(Destination destination, Message<?> message) {
jmsTemplate.send(destination, new MessagingMessageCreator(message, this.jmsMessageConverter)); jmsTemplate.send(destination, new MessagingMessageCreator(message, this.jmsMessageConverter));
@ -174,6 +214,17 @@ public class JmsMessagingTemplate
jmsTemplate.send(destinationName, new MessagingMessageCreator(message, this.jmsMessageConverter)); jmsTemplate.send(destinationName, new MessagingMessageCreator(message, this.jmsMessageConverter));
} }
@Override
protected Message<?> doReceive(Destination destination) {
javax.jms.Message jmsMessage = jmsTemplate.receive(destination);
return doConvert(jmsMessage);
}
protected Message<?> doReceive(String destinationName) {
javax.jms.Message jmsMessage = jmsTemplate.receive(destinationName);
return doConvert(jmsMessage);
}
protected String getRequiredDefaultDestinationName() { protected String getRequiredDefaultDestinationName() {
String name = getDefaultDestinationName(); String name = getDefaultDestinationName();
if (name == null) { if (name == null) {
@ -185,6 +236,18 @@ public class JmsMessagingTemplate
return name; return name;
} }
protected Message<?> doConvert(javax.jms.Message message) {
if (message == null) {
return null;
}
try {
return (Message<?>) jmsMessageConverter.fromMessage(message);
}
catch (JMSException e) {
throw new MessageConversionException("Could not convert '" + message + "'", e);
}
}
private static class MessagingMessageCreator implements MessageCreator { private static class MessagingMessageCreator implements MessageCreator {

3
spring-jms/src/main/java/org/springframework/jms/support/converter/MessagingMessageConverter.java

@ -98,6 +98,9 @@ public class MessagingMessageConverter implements MessageConverter, Initializing
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
public Object fromMessage(javax.jms.Message message) throws JMSException, MessageConversionException { public Object fromMessage(javax.jms.Message message) throws JMSException, MessageConversionException {
if (message == null) {
return null;
}
Map<String, Object> mappedHeaders = this.headerMapper.toHeaders(message); Map<String, Object> mappedHeaders = this.headerMapper.toHeaders(message);
Object convertedObject = extractPayload(message); Object convertedObject = extractPayload(message);
MessageBuilder<Object> builder = (convertedObject instanceof org.springframework.messaging.Message) ? MessageBuilder<Object> builder = (convertedObject instanceof org.springframework.messaging.Message) ?

152
spring-jms/src/test/java/org/springframework/jms/messaging/JmsMessagingTemplateTests.java

@ -17,9 +17,13 @@
package org.springframework.jms.messaging; package org.springframework.jms.messaging;
import static org.junit.Assert.*; import static org.junit.Assert.*;
import static org.mockito.BDDMockito.any;
import static org.mockito.BDDMockito.eq;
import static org.mockito.BDDMockito.*; import static org.mockito.BDDMockito.*;
import static org.mockito.BDDMockito.verify;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import java.io.Writer;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
@ -29,6 +33,7 @@ import javax.jms.Session;
import javax.jms.TextMessage; import javax.jms.TextMessage;
import org.junit.Before; import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.ExpectedException; import org.junit.rules.ExpectedException;
@ -39,12 +44,14 @@ import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock; import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
import org.springframework.core.convert.support.DefaultConversionService;
import org.springframework.jms.StubTextMessage; import org.springframework.jms.StubTextMessage;
import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator; import org.springframework.jms.core.MessageCreator;
import org.springframework.jms.support.converter.MessageConversionException; import org.springframework.jms.support.converter.MessageConversionException;
import org.springframework.jms.support.converter.SimpleMessageConverter; import org.springframework.jms.support.converter.SimpleMessageConverter;
import org.springframework.messaging.Message; import org.springframework.messaging.Message;
import org.springframework.messaging.converter.GenericMessageConverter;
import org.springframework.messaging.support.MessageBuilder; import org.springframework.messaging.support.MessageBuilder;
/** /**
@ -216,12 +223,151 @@ public class JmsMessagingTemplateTests {
assertTextMessage(messageCreator.getValue()); // see createTextMessage assertTextMessage(messageCreator.getValue()); // see createTextMessage
} }
@Test
public void receive() {
Destination destination = new Destination() {};
javax.jms.Message jmsMessage = createJmsTextMessage();
given(jmsTemplate.receive(destination)).willReturn(jmsMessage);
Message<?> message = messagingTemplate.receive(destination);
verify(jmsTemplate).receive(destination);
assertTextMessage(message);
}
@Test
public void receiveName() {
javax.jms.Message jmsMessage = createJmsTextMessage();
given(jmsTemplate.receive("myQueue")).willReturn(jmsMessage);
Message<?> message = messagingTemplate.receive("myQueue");
verify(jmsTemplate).receive("myQueue");
assertTextMessage(message);
}
@Test
public void receiveDefaultDestination() {
Destination destination = new Destination() {};
messagingTemplate.setDefaultDestination(destination);
javax.jms.Message jmsMessage = createJmsTextMessage();
given(jmsTemplate.receive(destination)).willReturn(jmsMessage);
Message<?> message = messagingTemplate.receive();
verify(jmsTemplate).receive(destination);
assertTextMessage(message);
}
@Test
public void receiveDefaultDestinationName() {
messagingTemplate.setDefaultDestinationName("myQueue");
javax.jms.Message jmsMessage = createJmsTextMessage();
given(jmsTemplate.receive("myQueue")).willReturn(jmsMessage);
Message<?> message = messagingTemplate.receive();
verify(jmsTemplate).receive("myQueue");
assertTextMessage(message);
}
@Test
public void receiveNoDefaultSet() {
thrown.expect(IllegalStateException.class);
messagingTemplate.receive();
}
@Test
public void receiveAndConvert() {
Destination destination = new Destination() {};
javax.jms.Message jmsMessage = createJmsTextMessage("my Payload");
given(jmsTemplate.receive(destination)).willReturn(jmsMessage);
String payload = messagingTemplate.receiveAndConvert(destination, String.class);
assertEquals("my Payload", payload);
verify(jmsTemplate).receive(destination);
}
@Test
public void receiveAndConvertName() {
javax.jms.Message jmsMessage = createJmsTextMessage("my Payload");
given(jmsTemplate.receive("myQueue")).willReturn(jmsMessage);
String payload = messagingTemplate.receiveAndConvert("myQueue", String.class);
assertEquals("my Payload", payload);
verify(jmsTemplate).receive("myQueue");
}
@Test
public void receiveAndConvertDefaultDestination() {
Destination destination = new Destination() {};
messagingTemplate.setDefaultDestination(destination);
javax.jms.Message jmsMessage = createJmsTextMessage("my Payload");
given(jmsTemplate.receive(destination)).willReturn(jmsMessage);
String payload = messagingTemplate.receiveAndConvert(String.class);
assertEquals("my Payload", payload);
verify(jmsTemplate).receive(destination);
}
@Test
public void receiveAndConvertDefaultDestinationName() {
messagingTemplate.setDefaultDestinationName("myQueue");
javax.jms.Message jmsMessage = createJmsTextMessage("my Payload");
given(jmsTemplate.receive("myQueue")).willReturn(jmsMessage);
String payload = messagingTemplate.receiveAndConvert(String.class);
assertEquals("my Payload", payload);
verify(jmsTemplate).receive("myQueue");
}
@Test
public void receiveAndConvertWithConversion() {
javax.jms.Message jmsMessage = createJmsTextMessage("123");
given(jmsTemplate.receive("myQueue")).willReturn(jmsMessage);
messagingTemplate.setMessageConverter(new GenericMessageConverter(new DefaultConversionService()));
Integer payload = messagingTemplate.receiveAndConvert("myQueue", Integer.class);
assertEquals(Integer.valueOf(123), payload);
verify(jmsTemplate).receive("myQueue");
}
@Test
@Ignore("SPR-11817")
public void receiveAndConvertNoConverter() {
javax.jms.Message jmsMessage = createJmsTextMessage("Hello");
given(jmsTemplate.receive("myQueue")).willReturn(jmsMessage);
thrown.expect(MessageConversionException.class);
messagingTemplate.receiveAndConvert("myQueue", Writer.class);
}
@Test
public void receiveAndConvertNoInput() {
given(jmsTemplate.receive("myQueue")).willReturn(null);
assertNull(messagingTemplate.receiveAndConvert("myQueue", String.class));
}
private Message<String> createTextMessage() { private Message<String> createTextMessage() {
return MessageBuilder return MessageBuilder
.withPayload("Hello").setHeader("foo", "bar").build(); .withPayload("Hello").setHeader("foo", "bar").build();
} }
private javax.jms.Message createJmsTextMessage(String payload) {
try {
StubTextMessage jmsMessage = new StubTextMessage(payload);
jmsMessage.setStringProperty("foo", "bar");
return jmsMessage;
}
catch (JMSException e) {
throw new IllegalStateException("Should not happen", e);
}
}
private javax.jms.Message createJmsTextMessage() {
return createJmsTextMessage("Hello");
}
private void assertTextMessage(MessageCreator messageCreator) { private void assertTextMessage(MessageCreator messageCreator) {
try { try {
TextMessage jmsMessage = createTextMessage(messageCreator); TextMessage jmsMessage = createTextMessage(messageCreator);
@ -233,6 +379,12 @@ public class JmsMessagingTemplateTests {
} }
} }
private void assertTextMessage(Message<?> message) {
assertNotNull("message should not be null", message);
assertEquals("Wrong payload", "Hello", message.getPayload());
assertEquals("Invalid foo property", "bar", message.getHeaders().get("foo"));
}
protected TextMessage createTextMessage(MessageCreator creator) throws JMSException { protected TextMessage createTextMessage(MessageCreator creator) throws JMSException {
Session mock = mock(Session.class); Session mock = mock(Session.class);

5
spring-jms/src/test/java/org/springframework/jms/support/converter/MessagingMessageConverterTests.java

@ -64,6 +64,11 @@ public class MessagingMessageConverterTests {
verify(session).createObjectMessage(payload); verify(session).createObjectMessage(payload);
} }
@Test
public void fromNull() throws JMSException {
assertNull(converter.fromMessage(null));
}
@Test @Test
public void customPayloadConverter() throws JMSException { public void customPayloadConverter() throws JMSException {
TextMessage jmsMsg = new StubTextMessage("1224"); TextMessage jmsMsg = new StubTextMessage("1224");

63
spring-messaging/src/main/java/org/springframework/messaging/core/AbstractMessageReceivingTemplate.java

@ -0,0 +1,63 @@
/*
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.core;
import org.springframework.messaging.Message;
/**
* An extension of {@link AbstractMessageSendingTemplate} that adds support for
* receive style operations as defined by {@link MessageReceivingOperations}.
*
* @author Mark Fisher
* @author Rossen Stoyanchev
* @author Stephane Nicoll
* @since 4.1
*/
public abstract class AbstractMessageReceivingTemplate<D> extends AbstractMessageSendingTemplate<D>
implements MessageReceivingOperations<D> {
@Override
public Message<?> receive() {
return receive(getRequiredDefaultDestination());
}
@Override
public Message<?> receive(D destination) {
return doReceive(destination);
}
protected abstract Message<?> doReceive(D destination);
@Override
public <T> T receiveAndConvert(Class<T> targetClass) {
return receiveAndConvert(getRequiredDefaultDestination(), targetClass);
}
@SuppressWarnings("unchecked")
@Override
public <T> T receiveAndConvert(D destination, Class<T> targetClass) {
Message<?> message = doReceive(destination);
if (message != null) {
return (T) getMessageConverter().fromMessage(message, targetClass);
}
else {
return null;
}
}
}

40
spring-messaging/src/main/java/org/springframework/messaging/core/AbstractMessagingTemplate.java

@ -23,47 +23,15 @@ import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.MessageConversionException; import org.springframework.messaging.converter.MessageConversionException;
/** /**
* An extension of {@link AbstractMessageSendingTemplate} that adds support for * An extension of {@link AbstractMessageReceivingTemplate} that adds support for
* receive as well as request-reply style operations as defined by * request-reply style operations as defined by {@link MessageRequestReplyOperations}.
* {@link MessageReceivingOperations} and {@link MessageRequestReplyOperations}.
* *
* @author Mark Fisher * @author Mark Fisher
* @author Rossen Stoyanchev * @author Rossen Stoyanchev
* @since 4.0 * @since 4.0
*/ */
public abstract class AbstractMessagingTemplate<D> extends AbstractMessageSendingTemplate<D> public abstract class AbstractMessagingTemplate<D> extends AbstractMessageReceivingTemplate<D>
implements MessageRequestReplyOperations<D>, MessageReceivingOperations<D> { implements MessageRequestReplyOperations<D> {
@Override
public Message<?> receive() {
return receive(getRequiredDefaultDestination());
}
@Override
public Message<?> receive(D destination) {
return doReceive(destination);
}
protected abstract Message<?> doReceive(D destination);
@Override
public <T> T receiveAndConvert(Class<T> targetClass) {
return receiveAndConvert(getRequiredDefaultDestination(), targetClass);
}
@SuppressWarnings("unchecked")
@Override
public <T> T receiveAndConvert(D destination, Class<T> targetClass) {
Message<?> message = doReceive(destination);
if (message != null) {
return (T) getMessageConverter().fromMessage(message, targetClass);
}
else {
return null;
}
}
@Override @Override
public Message<?> sendAndReceive(Message<?> requestMessage) { public Message<?> sendAndReceive(Message<?> requestMessage) {

Loading…
Cancel
Save