diff --git a/spring-jms/src/main/java/org/springframework/jms/listener/AbstractMessageListenerContainer.java b/spring-jms/src/main/java/org/springframework/jms/listener/AbstractMessageListenerContainer.java index e0973d204b..6c0f78b743 100644 --- a/spring-jms/src/main/java/org/springframework/jms/listener/AbstractMessageListenerContainer.java +++ b/spring-jms/src/main/java/org/springframework/jms/listener/AbstractMessageListenerContainer.java @@ -675,11 +675,22 @@ public abstract class AbstractMessageListenerContainer extends AbstractJmsListen * @see #handleListenerException */ protected void executeListener(Session session, Message message) { - try { - doExecuteListener(session, message); + createObservation(message).observe(() -> { + try { + doExecuteListener(session, message); + } + catch (Throwable ex) { + handleListenerException(ex); + } + }); + } + + protected Observation createObservation(Message message) { + if (micrometerJakartaPresent && this.observationRegistry != null) { + return ObservationFactory.create(this.observationRegistry, message); } - catch (Throwable ex) { - handleListenerException(ex); + else { + return Observation.NOOP; } } @@ -705,8 +716,7 @@ public abstract class AbstractMessageListenerContainer extends AbstractJmsListen } try { - Observation observation = createObservation(message); - observation.observeChecked(() -> invokeListener(session, message)); + invokeListener(session, message); } catch (JMSException | RuntimeException | Error ex) { rollbackOnExceptionIfNecessary(session, ex); @@ -715,15 +725,6 @@ public abstract class AbstractMessageListenerContainer extends AbstractJmsListen commitIfNecessary(session, message); } - private Observation createObservation(Message message) { - if (micrometerJakartaPresent && this.observationRegistry != null) { - return ObservationFactory.create(this.observationRegistry, message); - } - else { - return Observation.NOOP; - } - } - /** * Invoke the specified listener: either as standard JMS MessageListener * or (preferably) as Spring SessionAwareMessageListener. diff --git a/spring-jms/src/main/java/org/springframework/jms/listener/AbstractPollingMessageListenerContainer.java b/spring-jms/src/main/java/org/springframework/jms/listener/AbstractPollingMessageListenerContainer.java index 3b43a835bd..c652b213fc 100644 --- a/spring-jms/src/main/java/org/springframework/jms/listener/AbstractPollingMessageListenerContainer.java +++ b/spring-jms/src/main/java/org/springframework/jms/listener/AbstractPollingMessageListenerContainer.java @@ -16,6 +16,7 @@ package org.springframework.jms.listener; +import io.micrometer.observation.Observation; import jakarta.jms.Connection; import jakarta.jms.Destination; import jakarta.jms.JMSException; @@ -314,6 +315,8 @@ public abstract class AbstractPollingMessageListenerContainer extends AbstractMe } Message message = receiveMessage(consumerToUse); if (message != null) { + Observation observation = createObservation(message).start(); + Observation.Scope scope = observation.openScope(); if (logger.isDebugEnabled()) { logger.debug("Received message of type [" + message.getClass() + "] from consumer [" + consumerToUse + "] of " + (transactional ? "transactional " : "") + "session [" + @@ -347,6 +350,8 @@ public abstract class AbstractPollingMessageListenerContainer extends AbstractMe if (exposeResource) { TransactionSynchronizationManager.unbindResource(obtainConnectionFactory()); } + observation.stop(); + scope.close(); } // Indicate that a message has been received. return true; diff --git a/spring-jms/src/test/java/org/springframework/jms/listener/MessageListenerContainerObservationTests.java b/spring-jms/src/test/java/org/springframework/jms/listener/MessageListenerContainerObservationTests.java index 4d65a2f1fc..0335a4400c 100644 --- a/spring-jms/src/test/java/org/springframework/jms/listener/MessageListenerContainerObservationTests.java +++ b/spring-jms/src/test/java/org/springframework/jms/listener/MessageListenerContainerObservationTests.java @@ -18,12 +18,15 @@ package org.springframework.jms.listener; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Stream; +import io.micrometer.observation.Observation; import io.micrometer.observation.tck.TestObservationRegistry; import jakarta.jms.MessageListener; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.junit.EmbeddedActiveMQExtension; +import org.assertj.core.api.Assertions; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.extension.RegisterExtension; @@ -76,6 +79,34 @@ class MessageListenerContainerObservationTests { listenerContainer.stop(); } + @ParameterizedTest(name = "[{index}] {0}") + @MethodSource("listenerContainers") + void shouldHaveObservationScopeInErrorHandler(AbstractMessageListenerContainer listenerContainer) throws Exception { + JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory); + jmsTemplate.convertAndSend("spring.test.observation", "message content"); + CountDownLatch latch = new CountDownLatch(1); + AtomicReference observationInErrorHandler = new AtomicReference<>(); + listenerContainer.setConnectionFactory(connectionFactory); + listenerContainer.setObservationRegistry(registry); + listenerContainer.setDestinationName("spring.test.observation"); + listenerContainer.setMessageListener((MessageListener) message -> { + throw new IllegalStateException("error"); + }); + listenerContainer.setErrorHandler(error -> { + observationInErrorHandler.set(registry.getCurrentObservation()); + latch.countDown(); + }); + listenerContainer.afterPropertiesSet(); + listenerContainer.start(); + latch.await(2, TimeUnit.SECONDS); + Assertions.assertThat(observationInErrorHandler.get()).isNotNull(); + assertThat(registry).hasObservationWithNameEqualTo("jms.message.process") + .that() + .hasHighCardinalityKeyValue("messaging.destination.name", "spring.test.observation"); + listenerContainer.shutdown(); + listenerContainer.stop(); + } + static Stream listenerContainers() { return Stream.of( arguments(named(DefaultMessageListenerContainer.class.getSimpleName(), new DefaultMessageListenerContainer())),