Browse Source
This commit adds observability support for Jakarta JMS support in spring-jms support. This feature leverages the `JmsInstrumentation` infrastructure in `io.micrometer:micrometer-core` library. This instruments the `JmsTemplate` and the `@JmsListener` support to record observations: * "jms.message.publish" when the `JmsTemplate` sends a message * "jms.message.process" when a message is processed by a `@JmsListener` annotated method The observation `Convention` and `Context` implementations are shipped with "micrometer-core". Closes gh-30335pull/31063/head
12 changed files with 442 additions and 3 deletions
@ -0,0 +1,39 @@
@@ -0,0 +1,39 @@
|
||||
/* |
||||
* Copyright 2002-2023 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.docs.integration.observability.jms.process; |
||||
|
||||
import io.micrometer.observation.ObservationRegistry; |
||||
import jakarta.jms.ConnectionFactory; |
||||
|
||||
import org.springframework.context.annotation.Bean; |
||||
import org.springframework.context.annotation.Configuration; |
||||
import org.springframework.jms.annotation.EnableJms; |
||||
import org.springframework.jms.config.DefaultJmsListenerContainerFactory; |
||||
|
||||
@Configuration |
||||
@EnableJms |
||||
public class JmsConfiguration { |
||||
|
||||
@Bean |
||||
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ConnectionFactory connectionFactory, ObservationRegistry observationRegistry) { |
||||
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); |
||||
factory.setConnectionFactory(connectionFactory); |
||||
factory.setObservationRegistry(observationRegistry); |
||||
return factory; |
||||
} |
||||
|
||||
} |
@ -0,0 +1,44 @@
@@ -0,0 +1,44 @@
|
||||
/* |
||||
* Copyright 2002-2023 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.docs.integration.observability.jms.publish; |
||||
|
||||
import io.micrometer.observation.ObservationRegistry; |
||||
import jakarta.jms.ConnectionFactory; |
||||
|
||||
import org.springframework.jms.core.JmsMessagingTemplate; |
||||
import org.springframework.jms.core.JmsTemplate; |
||||
|
||||
public class JmsTemplatePublish { |
||||
|
||||
private final JmsTemplate jmsTemplate; |
||||
|
||||
private final JmsMessagingTemplate jmsMessagingTemplate; |
||||
|
||||
public JmsTemplatePublish(ObservationRegistry observationRegistry, ConnectionFactory connectionFactory) { |
||||
this.jmsTemplate = new JmsTemplate(connectionFactory); |
||||
// configure the observation registry
|
||||
this.jmsTemplate.setObservationRegistry(observationRegistry); |
||||
|
||||
// For JmsMessagingTemplate, instantiate it with a JMS template that has a configured registry
|
||||
this.jmsMessagingTemplate = new JmsMessagingTemplate(this.jmsTemplate); |
||||
} |
||||
|
||||
public void sendMessages() { |
||||
this.jmsTemplate.convertAndSend("spring.observation.test", "test message"); |
||||
} |
||||
|
||||
} |
@ -0,0 +1,90 @@
@@ -0,0 +1,90 @@
|
||||
/* |
||||
* Copyright 2002-2023 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.jms.core; |
||||
|
||||
import java.util.concurrent.CountDownLatch; |
||||
import java.util.concurrent.TimeUnit; |
||||
|
||||
import io.micrometer.observation.tck.TestObservationRegistry; |
||||
import jakarta.jms.MessageConsumer; |
||||
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; |
||||
import org.apache.activemq.artemis.junit.EmbeddedActiveMQExtension; |
||||
import org.junit.jupiter.api.AfterEach; |
||||
import org.junit.jupiter.api.BeforeEach; |
||||
import org.junit.jupiter.api.Test; |
||||
import org.junit.jupiter.api.extension.RegisterExtension; |
||||
|
||||
import static io.micrometer.observation.tck.TestObservationRegistryAssert.assertThat; |
||||
|
||||
/** |
||||
* Tests for Observability related {@link JmsTemplate}. |
||||
* |
||||
* @author Brian Clozel |
||||
*/ |
||||
class JmsTemplateObservationTests { |
||||
|
||||
@RegisterExtension |
||||
EmbeddedActiveMQExtension server = new EmbeddedActiveMQExtension(); |
||||
|
||||
TestObservationRegistry registry = TestObservationRegistry.create(); |
||||
|
||||
private ActiveMQConnectionFactory connectionFactory; |
||||
|
||||
@BeforeEach |
||||
void setupServer() { |
||||
server.start(); |
||||
connectionFactory = new ActiveMQConnectionFactory(server.getVmURL()); |
||||
} |
||||
|
||||
@Test |
||||
void shouldRecordJmsPublishObservations() { |
||||
JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory); |
||||
jmsTemplate.setObservationRegistry(registry); |
||||
jmsTemplate.convertAndSend("spring.test.observation", "message content"); |
||||
assertThat(registry).hasObservationWithNameEqualTo("jms.message.publish") |
||||
.that() |
||||
.hasHighCardinalityKeyValue("messaging.destination.name", "spring.test.observation"); |
||||
} |
||||
|
||||
@Test |
||||
void shouldRecordJmsProcessObservations() throws Exception { |
||||
JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory); |
||||
jmsTemplate.setObservationRegistry(registry); |
||||
jmsTemplate.convertAndSend("spring.test.observation", "message content"); |
||||
jmsTemplate.execute(session -> { |
||||
try { |
||||
CountDownLatch latch = new CountDownLatch(1); |
||||
MessageConsumer mc = session.createConsumer(session.createQueue("spring.test.observation")); |
||||
mc.setMessageListener(message -> latch.countDown()); |
||||
return latch.await(2, TimeUnit.SECONDS); |
||||
} |
||||
catch (InterruptedException e) { |
||||
throw new RuntimeException(e); |
||||
} |
||||
}, true); |
||||
assertThat(registry).hasObservationWithNameEqualTo("jms.message.process") |
||||
.that() |
||||
.hasHighCardinalityKeyValue("messaging.destination.name", "spring.test.observation"); |
||||
} |
||||
|
||||
@AfterEach |
||||
void shutdownServer() { |
||||
connectionFactory.close(); |
||||
server.stop(); |
||||
} |
||||
|
||||
} |
@ -0,0 +1,95 @@
@@ -0,0 +1,95 @@
|
||||
/* |
||||
* Copyright 2002-2023 the original author or authors. |
||||
* |
||||
* Licensed under the Apache License, Version 2.0 (the "License"); |
||||
* you may not use this file except in compliance with the License. |
||||
* You may obtain a copy of the License at |
||||
* |
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
* |
||||
* Unless required by applicable law or agreed to in writing, software |
||||
* distributed under the License is distributed on an "AS IS" BASIS, |
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
* See the License for the specific language governing permissions and |
||||
* limitations under the License. |
||||
*/ |
||||
|
||||
package org.springframework.jms.listener; |
||||
|
||||
import java.util.concurrent.CountDownLatch; |
||||
import java.util.concurrent.TimeUnit; |
||||
import java.util.stream.Stream; |
||||
|
||||
import io.micrometer.observation.tck.TestObservationRegistry; |
||||
import jakarta.jms.Message; |
||||
import jakarta.jms.MessageListener; |
||||
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; |
||||
import org.apache.activemq.artemis.junit.EmbeddedActiveMQExtension; |
||||
import org.junit.jupiter.api.AfterEach; |
||||
import org.junit.jupiter.api.BeforeEach; |
||||
import org.junit.jupiter.api.extension.RegisterExtension; |
||||
import org.junit.jupiter.params.ParameterizedTest; |
||||
import org.junit.jupiter.params.provider.Arguments; |
||||
import org.junit.jupiter.params.provider.MethodSource; |
||||
|
||||
import org.springframework.jms.core.JmsTemplate; |
||||
|
||||
import static io.micrometer.observation.tck.TestObservationRegistryAssert.assertThat; |
||||
|
||||
/** |
||||
* Observation tests for {@link AbstractMessageListenerContainer} implementations. |
||||
* @author Brian Clozel |
||||
*/ |
||||
class MessageListenerContainerObservationTests { |
||||
|
||||
@RegisterExtension |
||||
EmbeddedActiveMQExtension server = new EmbeddedActiveMQExtension(); |
||||
|
||||
TestObservationRegistry registry = TestObservationRegistry.create(); |
||||
|
||||
private ActiveMQConnectionFactory connectionFactory; |
||||
|
||||
@BeforeEach |
||||
void setupServer() { |
||||
server.start(); |
||||
connectionFactory = new ActiveMQConnectionFactory(server.getVmURL()); |
||||
} |
||||
|
||||
@ParameterizedTest(name = "{index} {0}") |
||||
@MethodSource("listenerContainers") |
||||
void shouldRecordJmsProcessObservations(String implementationClass, AbstractMessageListenerContainer listenerContainer) throws Exception { |
||||
JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory); |
||||
jmsTemplate.convertAndSend("spring.test.observation", "message content"); |
||||
CountDownLatch latch = new CountDownLatch(1); |
||||
listenerContainer.setConnectionFactory(connectionFactory); |
||||
listenerContainer.setObservationRegistry(registry); |
||||
listenerContainer.setDestinationName("spring.test.observation"); |
||||
listenerContainer.setMessageListener(new MessageListener() { |
||||
@Override |
||||
public void onMessage(Message message) { |
||||
latch.countDown(); |
||||
} |
||||
}); |
||||
listenerContainer.afterPropertiesSet(); |
||||
listenerContainer.start(); |
||||
latch.await(2, TimeUnit.SECONDS); |
||||
assertThat(registry).hasObservationWithNameEqualTo("jms.message.process") |
||||
.that() |
||||
.hasHighCardinalityKeyValue("messaging.destination.name", "spring.test.observation"); |
||||
listenerContainer.shutdown(); |
||||
listenerContainer.stop(); |
||||
} |
||||
|
||||
static Stream<Arguments> listenerContainers() { |
||||
return Stream.of( |
||||
Arguments.of(DefaultMessageListenerContainer.class.getSimpleName(), new DefaultMessageListenerContainer()), |
||||
Arguments.of(SimpleMessageListenerContainer.class.getSimpleName(), new SimpleMessageListenerContainer()) |
||||
); |
||||
} |
||||
|
||||
@AfterEach |
||||
void shutdownServer() { |
||||
connectionFactory.close(); |
||||
server.stop(); |
||||
} |
||||
} |
Loading…
Reference in new issue