From dbb2d4f1d3926128bf75851e348823a7e4446c69 Mon Sep 17 00:00:00 2001 From: Brian Clozel Date: Tue, 7 Nov 2023 11:18:03 +0100 Subject: [PATCH] Extend JMS observation scope to error handling Prior to this commit, the JMS listener observations would only cover the actual listener invocation, but not the error handling phase. This means that all errors (including the handled ones) are marked as errors in the observation and that the observation is not current anymore when error handling happens. This commit aligns the behavior with the other Spring Framework instrumentations and ensures that the error handling phase is fully part of the observation recording. Closes gh-31559 --- .../AbstractMessageListenerContainer.java | 31 ++++++++++--------- ...stractPollingMessageListenerContainer.java | 5 +++ ...sageListenerContainerObservationTests.java | 31 +++++++++++++++++++ 3 files changed, 52 insertions(+), 15 deletions(-) diff --git a/spring-jms/src/main/java/org/springframework/jms/listener/AbstractMessageListenerContainer.java b/spring-jms/src/main/java/org/springframework/jms/listener/AbstractMessageListenerContainer.java index e0973d204b..6c0f78b743 100644 --- a/spring-jms/src/main/java/org/springframework/jms/listener/AbstractMessageListenerContainer.java +++ b/spring-jms/src/main/java/org/springframework/jms/listener/AbstractMessageListenerContainer.java @@ -675,11 +675,22 @@ public abstract class AbstractMessageListenerContainer extends AbstractJmsListen * @see #handleListenerException */ protected void executeListener(Session session, Message message) { - try { - doExecuteListener(session, message); + createObservation(message).observe(() -> { + try { + doExecuteListener(session, message); + } + catch (Throwable ex) { + handleListenerException(ex); + } + }); + } + + protected Observation createObservation(Message message) { + if (micrometerJakartaPresent && this.observationRegistry != null) { + return ObservationFactory.create(this.observationRegistry, message); } - catch (Throwable ex) { - handleListenerException(ex); + else { + return Observation.NOOP; } } @@ -705,8 +716,7 @@ public abstract class AbstractMessageListenerContainer extends AbstractJmsListen } try { - Observation observation = createObservation(message); - observation.observeChecked(() -> invokeListener(session, message)); + invokeListener(session, message); } catch (JMSException | RuntimeException | Error ex) { rollbackOnExceptionIfNecessary(session, ex); @@ -715,15 +725,6 @@ public abstract class AbstractMessageListenerContainer extends AbstractJmsListen commitIfNecessary(session, message); } - private Observation createObservation(Message message) { - if (micrometerJakartaPresent && this.observationRegistry != null) { - return ObservationFactory.create(this.observationRegistry, message); - } - else { - return Observation.NOOP; - } - } - /** * Invoke the specified listener: either as standard JMS MessageListener * or (preferably) as Spring SessionAwareMessageListener. diff --git a/spring-jms/src/main/java/org/springframework/jms/listener/AbstractPollingMessageListenerContainer.java b/spring-jms/src/main/java/org/springframework/jms/listener/AbstractPollingMessageListenerContainer.java index 3b43a835bd..c652b213fc 100644 --- a/spring-jms/src/main/java/org/springframework/jms/listener/AbstractPollingMessageListenerContainer.java +++ b/spring-jms/src/main/java/org/springframework/jms/listener/AbstractPollingMessageListenerContainer.java @@ -16,6 +16,7 @@ package org.springframework.jms.listener; +import io.micrometer.observation.Observation; import jakarta.jms.Connection; import jakarta.jms.Destination; import jakarta.jms.JMSException; @@ -314,6 +315,8 @@ public abstract class AbstractPollingMessageListenerContainer extends AbstractMe } Message message = receiveMessage(consumerToUse); if (message != null) { + Observation observation = createObservation(message).start(); + Observation.Scope scope = observation.openScope(); if (logger.isDebugEnabled()) { logger.debug("Received message of type [" + message.getClass() + "] from consumer [" + consumerToUse + "] of " + (transactional ? "transactional " : "") + "session [" + @@ -347,6 +350,8 @@ public abstract class AbstractPollingMessageListenerContainer extends AbstractMe if (exposeResource) { TransactionSynchronizationManager.unbindResource(obtainConnectionFactory()); } + observation.stop(); + scope.close(); } // Indicate that a message has been received. return true; diff --git a/spring-jms/src/test/java/org/springframework/jms/listener/MessageListenerContainerObservationTests.java b/spring-jms/src/test/java/org/springframework/jms/listener/MessageListenerContainerObservationTests.java index 4d65a2f1fc..0335a4400c 100644 --- a/spring-jms/src/test/java/org/springframework/jms/listener/MessageListenerContainerObservationTests.java +++ b/spring-jms/src/test/java/org/springframework/jms/listener/MessageListenerContainerObservationTests.java @@ -18,12 +18,15 @@ package org.springframework.jms.listener; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Stream; +import io.micrometer.observation.Observation; import io.micrometer.observation.tck.TestObservationRegistry; import jakarta.jms.MessageListener; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.junit.EmbeddedActiveMQExtension; +import org.assertj.core.api.Assertions; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.extension.RegisterExtension; @@ -76,6 +79,34 @@ class MessageListenerContainerObservationTests { listenerContainer.stop(); } + @ParameterizedTest(name = "[{index}] {0}") + @MethodSource("listenerContainers") + void shouldHaveObservationScopeInErrorHandler(AbstractMessageListenerContainer listenerContainer) throws Exception { + JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory); + jmsTemplate.convertAndSend("spring.test.observation", "message content"); + CountDownLatch latch = new CountDownLatch(1); + AtomicReference observationInErrorHandler = new AtomicReference<>(); + listenerContainer.setConnectionFactory(connectionFactory); + listenerContainer.setObservationRegistry(registry); + listenerContainer.setDestinationName("spring.test.observation"); + listenerContainer.setMessageListener((MessageListener) message -> { + throw new IllegalStateException("error"); + }); + listenerContainer.setErrorHandler(error -> { + observationInErrorHandler.set(registry.getCurrentObservation()); + latch.countDown(); + }); + listenerContainer.afterPropertiesSet(); + listenerContainer.start(); + latch.await(2, TimeUnit.SECONDS); + Assertions.assertThat(observationInErrorHandler.get()).isNotNull(); + assertThat(registry).hasObservationWithNameEqualTo("jms.message.process") + .that() + .hasHighCardinalityKeyValue("messaging.destination.name", "spring.test.observation"); + listenerContainer.shutdown(); + listenerContainer.stop(); + } + static Stream listenerContainers() { return Stream.of( arguments(named(DefaultMessageListenerContainer.class.getSimpleName(), new DefaultMessageListenerContainer())),