Browse Source

Extend JMS observation scope to error handling

Prior to this commit, the JMS listener observations would only cover the
actual listener invocation, but not the error handling phase. This means
that all errors (including the handled ones) are marked as errors in the
observation and that the observation is not current anymore when error
handling happens.

This commit aligns the behavior with the other Spring Framework
instrumentations and ensures that the error handling phase is fully part
of the observation recording.

Closes gh-31559
pull/31571/head
Brian Clozel 1 year ago
parent
commit
dbb2d4f1d3
  1. 23
      spring-jms/src/main/java/org/springframework/jms/listener/AbstractMessageListenerContainer.java
  2. 5
      spring-jms/src/main/java/org/springframework/jms/listener/AbstractPollingMessageListenerContainer.java
  3. 31
      spring-jms/src/test/java/org/springframework/jms/listener/MessageListenerContainerObservationTests.java

23
spring-jms/src/main/java/org/springframework/jms/listener/AbstractMessageListenerContainer.java

@ -675,12 +675,23 @@ public abstract class AbstractMessageListenerContainer extends AbstractJmsListen
* @see #handleListenerException * @see #handleListenerException
*/ */
protected void executeListener(Session session, Message message) { protected void executeListener(Session session, Message message) {
createObservation(message).observe(() -> {
try { try {
doExecuteListener(session, message); doExecuteListener(session, message);
} }
catch (Throwable ex) { catch (Throwable ex) {
handleListenerException(ex); handleListenerException(ex);
} }
});
}
protected Observation createObservation(Message message) {
if (micrometerJakartaPresent && this.observationRegistry != null) {
return ObservationFactory.create(this.observationRegistry, message);
}
else {
return Observation.NOOP;
}
} }
/** /**
@ -705,8 +716,7 @@ public abstract class AbstractMessageListenerContainer extends AbstractJmsListen
} }
try { try {
Observation observation = createObservation(message); invokeListener(session, message);
observation.observeChecked(() -> invokeListener(session, message));
} }
catch (JMSException | RuntimeException | Error ex) { catch (JMSException | RuntimeException | Error ex) {
rollbackOnExceptionIfNecessary(session, ex); rollbackOnExceptionIfNecessary(session, ex);
@ -715,15 +725,6 @@ public abstract class AbstractMessageListenerContainer extends AbstractJmsListen
commitIfNecessary(session, message); commitIfNecessary(session, message);
} }
private Observation createObservation(Message message) {
if (micrometerJakartaPresent && this.observationRegistry != null) {
return ObservationFactory.create(this.observationRegistry, message);
}
else {
return Observation.NOOP;
}
}
/** /**
* Invoke the specified listener: either as standard JMS MessageListener * Invoke the specified listener: either as standard JMS MessageListener
* or (preferably) as Spring SessionAwareMessageListener. * or (preferably) as Spring SessionAwareMessageListener.

5
spring-jms/src/main/java/org/springframework/jms/listener/AbstractPollingMessageListenerContainer.java

@ -16,6 +16,7 @@
package org.springframework.jms.listener; package org.springframework.jms.listener;
import io.micrometer.observation.Observation;
import jakarta.jms.Connection; import jakarta.jms.Connection;
import jakarta.jms.Destination; import jakarta.jms.Destination;
import jakarta.jms.JMSException; import jakarta.jms.JMSException;
@ -314,6 +315,8 @@ public abstract class AbstractPollingMessageListenerContainer extends AbstractMe
} }
Message message = receiveMessage(consumerToUse); Message message = receiveMessage(consumerToUse);
if (message != null) { if (message != null) {
Observation observation = createObservation(message).start();
Observation.Scope scope = observation.openScope();
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("Received message of type [" + message.getClass() + "] from consumer [" + logger.debug("Received message of type [" + message.getClass() + "] from consumer [" +
consumerToUse + "] of " + (transactional ? "transactional " : "") + "session [" + consumerToUse + "] of " + (transactional ? "transactional " : "") + "session [" +
@ -347,6 +350,8 @@ public abstract class AbstractPollingMessageListenerContainer extends AbstractMe
if (exposeResource) { if (exposeResource) {
TransactionSynchronizationManager.unbindResource(obtainConnectionFactory()); TransactionSynchronizationManager.unbindResource(obtainConnectionFactory());
} }
observation.stop();
scope.close();
} }
// Indicate that a message has been received. // Indicate that a message has been received.
return true; return true;

31
spring-jms/src/test/java/org/springframework/jms/listener/MessageListenerContainerObservationTests.java

@ -18,12 +18,15 @@ package org.springframework.jms.listener;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream; import java.util.stream.Stream;
import io.micrometer.observation.Observation;
import io.micrometer.observation.tck.TestObservationRegistry; import io.micrometer.observation.tck.TestObservationRegistry;
import jakarta.jms.MessageListener; import jakarta.jms.MessageListener;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.junit.EmbeddedActiveMQExtension; import org.apache.activemq.artemis.junit.EmbeddedActiveMQExtension;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.api.extension.RegisterExtension;
@ -76,6 +79,34 @@ class MessageListenerContainerObservationTests {
listenerContainer.stop(); listenerContainer.stop();
} }
@ParameterizedTest(name = "[{index}] {0}")
@MethodSource("listenerContainers")
void shouldHaveObservationScopeInErrorHandler(AbstractMessageListenerContainer listenerContainer) throws Exception {
JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
jmsTemplate.convertAndSend("spring.test.observation", "message content");
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Observation> observationInErrorHandler = new AtomicReference<>();
listenerContainer.setConnectionFactory(connectionFactory);
listenerContainer.setObservationRegistry(registry);
listenerContainer.setDestinationName("spring.test.observation");
listenerContainer.setMessageListener((MessageListener) message -> {
throw new IllegalStateException("error");
});
listenerContainer.setErrorHandler(error -> {
observationInErrorHandler.set(registry.getCurrentObservation());
latch.countDown();
});
listenerContainer.afterPropertiesSet();
listenerContainer.start();
latch.await(2, TimeUnit.SECONDS);
Assertions.assertThat(observationInErrorHandler.get()).isNotNull();
assertThat(registry).hasObservationWithNameEqualTo("jms.message.process")
.that()
.hasHighCardinalityKeyValue("messaging.destination.name", "spring.test.observation");
listenerContainer.shutdown();
listenerContainer.stop();
}
static Stream<Arguments> listenerContainers() { static Stream<Arguments> listenerContainers() {
return Stream.of( return Stream.of(
arguments(named(DefaultMessageListenerContainer.class.getSimpleName(), new DefaultMessageListenerContainer())), arguments(named(DefaultMessageListenerContainer.class.getSimpleName(), new DefaultMessageListenerContainer())),

Loading…
Cancel
Save