diff --git a/build.gradle b/build.gradle index bc583e571bd..24a1bb586d6 100644 --- a/build.gradle +++ b/build.gradle @@ -766,6 +766,7 @@ project('spring-integration-jms') { testImplementation "org.apache.activemq:artemis-jakarta-client:$artemisVersion" testImplementation 'org.springframework:spring-oxm' testImplementation 'com.fasterxml.jackson.core:jackson-databind' + testImplementation 'io.micrometer:micrometer-observation-test' } } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dsl/MessageProducerSpec.java b/spring-integration-core/src/main/java/org/springframework/integration/dsl/MessageProducerSpec.java index 49217f87a69..c9b42ea6df0 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/dsl/MessageProducerSpec.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/dsl/MessageProducerSpec.java @@ -17,7 +17,10 @@ package org.springframework.integration.dsl; import org.springframework.integration.endpoint.MessageProducerSupport; +import org.springframework.integration.gateway.MessagingGatewaySupport; import org.springframework.integration.support.ErrorMessageStrategy; +import org.springframework.integration.support.management.observation.MessageReceiverObservationConvention; +import org.springframework.integration.support.management.observation.MessageRequestReplyReceiverObservationConvention; import org.springframework.lang.Nullable; import org.springframework.messaging.MessageChannel; @@ -152,4 +155,16 @@ public S errorMessageStrategy(ErrorMessageStrategy errorMessageStrategy) { return _this(); } + /** + * Provide a custom {@link MessageReceiverObservationConvention}. + * @param observationConvention the observation convention to use. + * @return the spec. + * @since 6.0.8 + * @see MessageProducerSupport#setObservationConvention(MessageReceiverObservationConvention) + */ + public S observationConvention(MessageReceiverObservationConvention observationConvention) { + this.target.setObservationConvention(observationConvention); + return _this(); + } + } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/dsl/MessagingGatewaySpec.java b/spring-integration-core/src/main/java/org/springframework/integration/dsl/MessagingGatewaySpec.java index ed696e27f14..1a6ad59f4bf 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/dsl/MessagingGatewaySpec.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/dsl/MessagingGatewaySpec.java @@ -19,6 +19,7 @@ import org.springframework.integration.gateway.MessagingGatewaySupport; import org.springframework.integration.mapping.InboundMessageMapper; import org.springframework.integration.mapping.OutboundMessageMapper; +import org.springframework.integration.support.management.observation.MessageRequestReplyReceiverObservationConvention; import org.springframework.lang.Nullable; import org.springframework.messaging.MessageChannel; @@ -205,4 +206,16 @@ public S shouldTrack(boolean shouldTrack) { return _this(); } + /** + * Provide a custom {@link MessageRequestReplyReceiverObservationConvention}. + * @param observationConvention the observation convention to use. + * @return the spec. + * @since 6.0.8 + * @see MessagingGatewaySupport#setObservationConvention(MessageRequestReplyReceiverObservationConvention) + */ + public S observationConvention(MessageRequestReplyReceiverObservationConvention observationConvention) { + this.target.setObservationConvention(observationConvention); + return _this(); + } + } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/gateway/MessagingGatewaySupport.java b/spring-integration-core/src/main/java/org/springframework/integration/gateway/MessagingGatewaySupport.java index dfd358fc23f..8bf30d2dd13 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/gateway/MessagingGatewaySupport.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/gateway/MessagingGatewaySupport.java @@ -56,12 +56,16 @@ import org.springframework.integration.support.converter.SimpleMessageConverter; import org.springframework.integration.support.management.IntegrationInboundManagement; import org.springframework.integration.support.management.IntegrationManagedResource; +import org.springframework.integration.support.management.TrackableComponent; import org.springframework.integration.support.management.metrics.MeterFacade; import org.springframework.integration.support.management.metrics.MetricsCaptor; import org.springframework.integration.support.management.metrics.SampleFacade; import org.springframework.integration.support.management.metrics.TimerFacade; +import org.springframework.integration.support.management.observation.DefaultMessageReceiverObservationConvention; import org.springframework.integration.support.management.observation.DefaultMessageRequestReplyReceiverObservationConvention; import org.springframework.integration.support.management.observation.IntegrationObservation; +import org.springframework.integration.support.management.observation.MessageReceiverContext; +import org.springframework.integration.support.management.observation.MessageReceiverObservationConvention; import org.springframework.integration.support.management.observation.MessageRequestReplyReceiverContext; import org.springframework.integration.support.management.observation.MessageRequestReplyReceiverObservationConvention; import org.springframework.lang.Nullable; @@ -91,7 +95,7 @@ */ @IntegrationManagedResource public abstract class MessagingGatewaySupport extends AbstractEndpoint - implements org.springframework.integration.support.management.TrackableComponent, + implements TrackableComponent, IntegrationInboundManagement, IntegrationPattern { protected final ConvertingMessagingTemplate messagingTemplate; // NOSONAR @@ -144,6 +148,8 @@ public abstract class MessagingGatewaySupport extends AbstractEndpoint @Nullable private MessageRequestReplyReceiverObservationConvention observationConvention; + private MessageReceiverObservationConvention receiverObservationConvention; + private volatile AbstractEndpoint replyMessageCorrelator; private volatile boolean initialized; @@ -384,6 +390,10 @@ public void setObservationConvention( this.observationConvention = observationConvention; } + public void setReceiverObservationConvention(MessageReceiverObservationConvention receiverObservationConvention) { + this.receiverObservationConvention = receiverObservationConvention; + } + @Override protected void onInit() { Assert.state(!(this.requestChannelName != null && this.requestChannel != null), @@ -468,27 +478,62 @@ protected void send(Object object) { MessageChannel channel = getRequestChannel(); Assert.state(channel != null, "send is not supported, because no request channel has been configured"); - SampleFacade sample = null; - if (this.metricsCaptor != null) { - sample = this.metricsCaptor.start(); + + Message requestMessage = this.messagingTemplate.doConvert(object, null, this.historyWritingPostProcessor); + + if (!ObservationRegistry.NOOP.equals(this.observationRegistry)) { + sendWithObservation(channel, requestMessage); } + else if (this.metricsCaptor != null) { + sendWithMetrics(channel, requestMessage); + } + else { + doSend(channel, requestMessage); + } + } + + private void sendWithObservation(MessageChannel channel, Message message) { try { - this.messagingTemplate.convertAndSend(channel, object, this.historyWritingPostProcessor); - if (sample != null) { - sample.stop(sendTimer()); - } + IntegrationObservation.HANDLER.observation( + this.receiverObservationConvention, + DefaultMessageReceiverObservationConvention.INSTANCE, + () -> new MessageReceiverContext(message, getComponentName()), + this.observationRegistry) + .observe(() -> this.messagingTemplate.send(channel, message)); } - catch (Exception e) { - if (sample != null) { - sample.stop(buildSendTimer(false, e.getClass().getSimpleName())); - } - MessageChannel errorChan = getErrorChannel(); - if (errorChan != null) { - this.messagingTemplate.send(errorChan, new ErrorMessage(e)); - } - else { - rethrow(e, "failed to send message"); - } + catch (Exception ex) { + sendErrorMessage(ex, message); + } + } + + private void sendWithMetrics(MessageChannel channel, Message message) { + SampleFacade sample = this.metricsCaptor.start(); + try { + this.messagingTemplate.send(channel, message); + sample.stop(sendTimer()); + } + catch (Exception ex) { + sample.stop(buildSendTimer(false, ex.getClass().getSimpleName())); + sendErrorMessage(ex, message); + } + } + + private void doSend(MessageChannel channel, Message message) { + try { + this.messagingTemplate.send(channel, message); + } + catch (Exception ex) { + sendErrorMessage(ex, message); + } + } + + private void sendErrorMessage(Exception exception, Message failedMessage) { + MessageChannel errorChan = getErrorChannel(); + if (errorChan != null) { + this.messagingTemplate.send(errorChan, buildErrorMessage(failedMessage, exception)); + } + else { + rethrow(exception, "failed to send message"); } } diff --git a/spring-integration-jms/src/main/java/org/springframework/integration/jms/ChannelPublishingJmsMessageListener.java b/spring-integration-jms/src/main/java/org/springframework/integration/jms/ChannelPublishingJmsMessageListener.java index c7dff8182b3..b6bba27257a 100644 --- a/spring-integration-jms/src/main/java/org/springframework/integration/jms/ChannelPublishingJmsMessageListener.java +++ b/spring-integration-jms/src/main/java/org/springframework/integration/jms/ChannelPublishingJmsMessageListener.java @@ -18,6 +18,7 @@ import java.util.Map; +import io.micrometer.observation.ObservationRegistry; import jakarta.jms.DeliveryMode; import jakarta.jms.Destination; import jakarta.jms.InvalidDestinationException; @@ -38,6 +39,9 @@ import org.springframework.integration.support.DefaultMessageBuilderFactory; import org.springframework.integration.support.MessageBuilderFactory; import org.springframework.integration.support.management.TrackableComponent; +import org.springframework.integration.support.management.metrics.MetricsCaptor; +import org.springframework.integration.support.management.observation.MessageReceiverObservationConvention; +import org.springframework.integration.support.management.observation.MessageRequestReplyReceiverObservationConvention; import org.springframework.integration.support.utils.IntegrationUtils; import org.springframework.jms.listener.SessionAwareMessageListener; import org.springframework.jms.support.JmsUtils; @@ -323,6 +327,26 @@ public void setExtractReplyPayload(boolean extractReplyPayload) { this.extractReplyPayload = extractReplyPayload; } + public void setMetricsCaptor(MetricsCaptor captor) { + this.gatewayDelegate.registerMetricsCaptor(captor); + } + + public void setObservationRegistry(ObservationRegistry observationRegistry) { + this.gatewayDelegate.registerObservationRegistry(observationRegistry); + } + + public void setRequestReplyObservationConvention( + @Nullable MessageRequestReplyReceiverObservationConvention observationConvention) { + + this.gatewayDelegate.setObservationConvention(observationConvention); + } + + public void setReceiverObservationConvention( + @Nullable MessageReceiverObservationConvention observationConvention) { + + this.gatewayDelegate.setReceiverObservationConvention(observationConvention); + } + @Override public void setBeanFactory(BeanFactory beanFactory) throws BeansException { this.beanFactory = beanFactory; diff --git a/spring-integration-jms/src/main/java/org/springframework/integration/jms/JmsInboundGateway.java b/spring-integration-jms/src/main/java/org/springframework/integration/jms/JmsInboundGateway.java index 4ade7d7976b..a5d7e9fb7d7 100644 --- a/spring-integration-jms/src/main/java/org/springframework/integration/jms/JmsInboundGateway.java +++ b/spring-integration-jms/src/main/java/org/springframework/integration/jms/JmsInboundGateway.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2022 the original author or authors. + * Copyright 2016-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,10 +16,14 @@ package org.springframework.integration.jms; +import io.micrometer.observation.ObservationRegistry; + import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.integration.context.OrderlyShutdownCapable; import org.springframework.integration.gateway.MessagingGatewaySupport; +import org.springframework.integration.support.management.metrics.MetricsCaptor; +import org.springframework.integration.support.management.observation.MessageRequestReplyReceiverObservationConvention; import org.springframework.jms.listener.AbstractMessageListenerContainer; import org.springframework.messaging.MessageChannel; @@ -114,28 +118,38 @@ public void setShutdownContainerOnStop(boolean shutdownContainerOnStop) { this.endpoint.setShutdownContainerOnStop(shutdownContainerOnStop); } + @Override + public void registerMetricsCaptor(MetricsCaptor metricsCaptorToRegister) { + super.registerMetricsCaptor(metricsCaptorToRegister); + this.endpoint.registerMetricsCaptor(metricsCaptorToRegister); + } @Override - public String getComponentType() { - return this.endpoint.getComponentType(); + public void registerObservationRegistry(ObservationRegistry observationRegistry) { + super.registerObservationRegistry(observationRegistry); + this.endpoint.registerObservationRegistry(observationRegistry); } @Override - public void setComponentName(String componentName) { - super.setComponentName(componentName); - this.endpoint.setComponentName(getComponentName()); + public void setObservationConvention(MessageRequestReplyReceiverObservationConvention observationConvention) { + super.setObservationConvention(observationConvention); + this.endpoint.getListener().setRequestReplyObservationConvention(observationConvention); + } + + @Override + public String getComponentType() { + return this.endpoint.getComponentType(); } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { super.setApplicationContext(applicationContext); this.endpoint.setApplicationContext(applicationContext); - this.endpoint.setBeanFactory(applicationContext); - this.endpoint.getListener().setBeanFactory(applicationContext); } @Override protected void onInit() { + this.endpoint.setComponentName(getComponentName()); this.endpoint.afterPropertiesSet(); } diff --git a/spring-integration-jms/src/main/java/org/springframework/integration/jms/JmsMessageDrivenEndpoint.java b/spring-integration-jms/src/main/java/org/springframework/integration/jms/JmsMessageDrivenEndpoint.java index f4e8d617cc0..845a5ffae36 100644 --- a/spring-integration-jms/src/main/java/org/springframework/integration/jms/JmsMessageDrivenEndpoint.java +++ b/spring-integration-jms/src/main/java/org/springframework/integration/jms/JmsMessageDrivenEndpoint.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2021 the original author or authors. + * Copyright 2002-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,11 +16,15 @@ package org.springframework.integration.jms; +import io.micrometer.observation.ObservationRegistry; + import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.integration.context.OrderlyShutdownCapable; import org.springframework.integration.endpoint.MessageProducerSupport; import org.springframework.integration.jms.util.JmsAdapterUtils; +import org.springframework.integration.support.management.metrics.MetricsCaptor; +import org.springframework.integration.support.management.observation.MessageReceiverObservationConvention; import org.springframework.jms.listener.AbstractMessageListenerContainer; import org.springframework.jms.listener.DefaultMessageListenerContainer; import org.springframework.messaging.MessageChannel; @@ -91,7 +95,7 @@ private JmsMessageDrivenEndpoint(AbstractMessageListenerContainer listenerContai * container setting even if an external container is provided. Defaults to null * (won't change container) if an external container is provided or `transacted` when * the framework creates an implicit {@link DefaultMessageListenerContainer}. - * @param sessionAcknowledgeMode the acknowledge mode. + * @param sessionAcknowledgeMode the acknowledgement mode. */ public void setSessionAcknowledgeMode(String sessionAcknowledgeMode) { this.sessionAcknowledgeMode = sessionAcknowledgeMode; @@ -134,9 +138,9 @@ public void setShouldTrack(boolean shouldTrack) { } /** - * Set to false to prevent listener container shutdown when the endpoint is stopped. + * Set to {@code false} to prevent listener container shutdown when the endpoint is stopped. * Then, if so configured, any cached consumer(s) in the container will remain. - * Otherwise the shared connection and will be closed and the listener invokers shut + * Otherwise, the shared connection and will be closed and the listener invokers shut * down; this behavior is new starting with version 5.1. Default: true. * @param shutdownContainerOnStop false to not shutdown. * @since 5.1 @@ -149,6 +153,24 @@ public ChannelPublishingJmsMessageListener getListener() { return this.listener; } + @Override + public void registerMetricsCaptor(MetricsCaptor captor) { + super.registerMetricsCaptor(captor); + this.listener.setMetricsCaptor(captor); + } + + @Override + public void registerObservationRegistry(ObservationRegistry observationRegistry) { + super.registerObservationRegistry(observationRegistry); + this.listener.setObservationRegistry(observationRegistry); + } + + @Override + public void setObservationConvention(MessageReceiverObservationConvention observationConvention) { + super.setObservationConvention(observationConvention); + this.listener.setReceiverObservationConvention(observationConvention); + } + @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { super.setApplicationContext(applicationContext); diff --git a/spring-integration-jms/src/test/java/org/springframework/integration/jms/dsl/JmsTests.java b/spring-integration-jms/src/test/java/org/springframework/integration/jms/dsl/JmsTests.java index fc4c87035fe..4924cdb2a98 100644 --- a/spring-integration-jms/src/test/java/org/springframework/integration/jms/dsl/JmsTests.java +++ b/spring-integration-jms/src/test/java/org/springframework/integration/jms/dsl/JmsTests.java @@ -22,8 +22,11 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import io.micrometer.observation.tck.TestObservationRegistry; +import io.micrometer.observation.tck.TestObservationRegistryAssert; import jakarta.jms.JMSException; import jakarta.jms.TextMessage; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.ListableBeanFactory; @@ -40,6 +43,7 @@ import org.springframework.integration.channel.FixedSubscriberChannel; import org.springframework.integration.channel.QueueChannel; import org.springframework.integration.config.EnableIntegration; +import org.springframework.integration.config.EnableIntegrationManagement; import org.springframework.integration.config.GlobalChannelInterceptor; import org.springframework.integration.core.MessageSource; import org.springframework.integration.dsl.IntegrationFlow; @@ -55,6 +59,7 @@ import org.springframework.integration.jms.SubscribableJmsChannel; import org.springframework.integration.scheduling.PollerMetadata; import org.springframework.integration.support.MessageBuilder; +import org.springframework.integration.support.management.observation.IntegrationObservation; import org.springframework.integration.test.util.TestUtils; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.listener.DefaultMessageListenerContainer; @@ -144,6 +149,14 @@ public class JmsTests extends ActiveMQMultiContextTests { @Autowired JmsTemplate jmsTemplate; + @Autowired + TestObservationRegistry observationRegistry; + + @BeforeEach + void setup() { + this.observationRegistry.clear(); + } + @Test public void testPollingFlow() { this.controlBus.send("@'integerMessageSource.inboundChannelAdapter'.start()"); @@ -204,6 +217,14 @@ public void testJmsOutboundInboundFlow() { .isEqualTo("foo"); assertThat(this.jmsOutboundFlowTemplate).isNotNull(); + + TestObservationRegistryAssert.assertThat(this.observationRegistry) + .hasObservationWithNameEqualTo("spring.integration.handler") + .that() + .hasLowCardinalityKeyValue(IntegrationObservation.HandlerTags.COMPONENT_NAME.asString(), + "observedJmsMessageDrivenChannelAdapter") + .hasBeenStarted() + .hasBeenStopped(); } @Test @@ -239,6 +260,14 @@ public void testJmsPipelineFlow() { .isNotNull() .extracting(Message::getPayload) .isEqualTo("error: junk is not convertible"); + + TestObservationRegistryAssert.assertThat(this.observationRegistry) + .hasObservationWithNameEqualTo("spring.integration.gateway") + .that() + .hasLowCardinalityKeyValue(IntegrationObservation.GatewayTags.COMPONENT_NAME.asString(), + "observedJmsInboundGateway") + .hasBeenStarted() + .hasBeenStopped(); } @Test @@ -294,8 +323,14 @@ private interface ControlBusGateway { @Configuration @EnableIntegration @IntegrationComponentScan + @EnableIntegrationManagement(observationPatterns = "observedJms*") public static class ContextConfiguration { + @Bean + TestObservationRegistry observationRegistry() { + return TestObservationRegistry.create(); + } + @Bean public JmsTemplate jmsTemplate() { JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory); @@ -407,9 +442,10 @@ public Message preSend(Message message, MessageChannel channel) { public IntegrationFlow jmsMessageDrivenFlowWithContainer() { return IntegrationFlow .from(Jms.messageDrivenChannelAdapter( - Jms.container(amqFactory, "containerSpecDestination") - .pubSubDomain(false) - .taskExecutor(Executors.newCachedThreadPool()))) + Jms.container(amqFactory, "containerSpecDestination") + .pubSubDomain(false) + .taskExecutor(Executors.newCachedThreadPool())) + .id("observedJmsMessageDrivenChannelAdapter")) .transform(String::trim) .channel(jmsOutboundInboundReplyChannel()) .get(); @@ -428,6 +464,7 @@ public IntegrationFlow jmsOutboundGatewayFlow() { public IntegrationFlow jmsInboundGatewayFlow() { return IntegrationFlow.from( Jms.inboundGateway(amqFactory) + .id("observedJmsInboundGateway") .requestChannel(jmsInboundGatewayInputChannel()) .replyTimeout(1) .errorOnTimeout(true)