Skip to content

Commit

Permalink
Fix JMS Inbound Endpoints for observation
Browse files Browse the repository at this point in the history
The `JmsMessageDrivenEndpoint` delegates all the hard work to the
`ChannelPublishingJmsMessageListener`, but missed to propagate an `ObservationRegistry`
and other related options.
The `JmsInboundGateway` is worse: it delegated to the `JmsMessageDrivenEndpoint`

* Add `IntegrationObservation.HANDLER` observation to the `MessagingGatewaySupport.send()`
operation: used by the delegate in the `ChannelPublishingJmsMessageListener`
* Expose and propagate observation-related options from `JmsInboundGateway`
and `JmsMessageDrivenEndpoint`
* Expose `observationConvention()` option on the `MessagingGatewaySpec`
and `MessageProducerSpec`

**Cherry-pick to `6.1.x` & `6.0.x`**
  • Loading branch information
artembilan committed Sep 18, 2023
1 parent 10e79ba commit 96bfd27
Show file tree
Hide file tree
Showing 8 changed files with 205 additions and 34 deletions.
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -766,6 +766,7 @@ project('spring-integration-jms') {
testImplementation "org.apache.activemq:artemis-jakarta-client:$artemisVersion"
testImplementation 'org.springframework:spring-oxm'
testImplementation 'com.fasterxml.jackson.core:jackson-databind'
testImplementation 'io.micrometer:micrometer-observation-test'
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@
package org.springframework.integration.dsl;

import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.gateway.MessagingGatewaySupport;
import org.springframework.integration.support.ErrorMessageStrategy;
import org.springframework.integration.support.management.observation.MessageReceiverObservationConvention;
import org.springframework.integration.support.management.observation.MessageRequestReplyReceiverObservationConvention;
import org.springframework.lang.Nullable;
import org.springframework.messaging.MessageChannel;

Expand Down Expand Up @@ -152,4 +155,16 @@ public S errorMessageStrategy(ErrorMessageStrategy errorMessageStrategy) {
return _this();
}

/**
* Provide a custom {@link MessageReceiverObservationConvention}.
* @param observationConvention the observation convention to use.
* @return the spec.
* @since 6.0.8
* @see MessageProducerSupport#setObservationConvention(MessageReceiverObservationConvention)
*/
public S observationConvention(MessageReceiverObservationConvention observationConvention) {
this.target.setObservationConvention(observationConvention);
return _this();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.springframework.integration.gateway.MessagingGatewaySupport;
import org.springframework.integration.mapping.InboundMessageMapper;
import org.springframework.integration.mapping.OutboundMessageMapper;
import org.springframework.integration.support.management.observation.MessageRequestReplyReceiverObservationConvention;
import org.springframework.lang.Nullable;
import org.springframework.messaging.MessageChannel;

Expand Down Expand Up @@ -205,4 +206,16 @@ public S shouldTrack(boolean shouldTrack) {
return _this();
}

/**
* Provide a custom {@link MessageRequestReplyReceiverObservationConvention}.
* @param observationConvention the observation convention to use.
* @return the spec.
* @since 6.0.8
* @see MessagingGatewaySupport#setObservationConvention(MessageRequestReplyReceiverObservationConvention)
*/
public S observationConvention(MessageRequestReplyReceiverObservationConvention observationConvention) {
this.target.setObservationConvention(observationConvention);
return _this();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,16 @@
import org.springframework.integration.support.converter.SimpleMessageConverter;
import org.springframework.integration.support.management.IntegrationInboundManagement;
import org.springframework.integration.support.management.IntegrationManagedResource;
import org.springframework.integration.support.management.TrackableComponent;
import org.springframework.integration.support.management.metrics.MeterFacade;
import org.springframework.integration.support.management.metrics.MetricsCaptor;
import org.springframework.integration.support.management.metrics.SampleFacade;
import org.springframework.integration.support.management.metrics.TimerFacade;
import org.springframework.integration.support.management.observation.DefaultMessageReceiverObservationConvention;
import org.springframework.integration.support.management.observation.DefaultMessageRequestReplyReceiverObservationConvention;
import org.springframework.integration.support.management.observation.IntegrationObservation;
import org.springframework.integration.support.management.observation.MessageReceiverContext;
import org.springframework.integration.support.management.observation.MessageReceiverObservationConvention;
import org.springframework.integration.support.management.observation.MessageRequestReplyReceiverContext;
import org.springframework.integration.support.management.observation.MessageRequestReplyReceiverObservationConvention;
import org.springframework.lang.Nullable;
Expand Down Expand Up @@ -91,7 +95,7 @@
*/
@IntegrationManagedResource
public abstract class MessagingGatewaySupport extends AbstractEndpoint
implements org.springframework.integration.support.management.TrackableComponent,
implements TrackableComponent,
IntegrationInboundManagement, IntegrationPattern {

protected final ConvertingMessagingTemplate messagingTemplate; // NOSONAR
Expand Down Expand Up @@ -144,6 +148,8 @@ public abstract class MessagingGatewaySupport extends AbstractEndpoint
@Nullable
private MessageRequestReplyReceiverObservationConvention observationConvention;

private MessageReceiverObservationConvention receiverObservationConvention;

private volatile AbstractEndpoint replyMessageCorrelator;

private volatile boolean initialized;
Expand Down Expand Up @@ -384,6 +390,10 @@ public void setObservationConvention(
this.observationConvention = observationConvention;
}

public void setReceiverObservationConvention(MessageReceiverObservationConvention receiverObservationConvention) {
this.receiverObservationConvention = receiverObservationConvention;
}

@Override
protected void onInit() {
Assert.state(!(this.requestChannelName != null && this.requestChannel != null),
Expand Down Expand Up @@ -468,27 +478,62 @@ protected void send(Object object) {
MessageChannel channel = getRequestChannel();
Assert.state(channel != null,
"send is not supported, because no request channel has been configured");
SampleFacade sample = null;
if (this.metricsCaptor != null) {
sample = this.metricsCaptor.start();

Message<?> requestMessage = this.messagingTemplate.doConvert(object, null, this.historyWritingPostProcessor);

if (!ObservationRegistry.NOOP.equals(this.observationRegistry)) {
sendWithObservation(channel, requestMessage);
}
else if (this.metricsCaptor != null) {
sendWithMetrics(channel, requestMessage);
}
else {
doSend(channel, requestMessage);
}
}

private void sendWithObservation(MessageChannel channel, Message<?> message) {
try {
this.messagingTemplate.convertAndSend(channel, object, this.historyWritingPostProcessor);
if (sample != null) {
sample.stop(sendTimer());
}
IntegrationObservation.HANDLER.observation(
this.receiverObservationConvention,
DefaultMessageReceiverObservationConvention.INSTANCE,
() -> new MessageReceiverContext(message, getComponentName()),
this.observationRegistry)
.observe(() -> this.messagingTemplate.send(channel, message));
}
catch (Exception e) {
if (sample != null) {
sample.stop(buildSendTimer(false, e.getClass().getSimpleName()));
}
MessageChannel errorChan = getErrorChannel();
if (errorChan != null) {
this.messagingTemplate.send(errorChan, new ErrorMessage(e));
}
else {
rethrow(e, "failed to send message");
}
catch (Exception ex) {
sendErrorMessage(ex, message);
}
}

private void sendWithMetrics(MessageChannel channel, Message<?> message) {
SampleFacade sample = this.metricsCaptor.start();
try {
this.messagingTemplate.send(channel, message);
sample.stop(sendTimer());
}
catch (Exception ex) {
sample.stop(buildSendTimer(false, ex.getClass().getSimpleName()));
sendErrorMessage(ex, message);
}
}

private void doSend(MessageChannel channel, Message<?> message) {
try {
this.messagingTemplate.send(channel, message);
}
catch (Exception ex) {
sendErrorMessage(ex, message);
}
}

private void sendErrorMessage(Exception exception, Message<?> failedMessage) {
MessageChannel errorChan = getErrorChannel();
if (errorChan != null) {
this.messagingTemplate.send(errorChan, buildErrorMessage(failedMessage, exception));
}
else {
rethrow(exception, "failed to send message");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.util.Map;

import io.micrometer.observation.ObservationRegistry;
import jakarta.jms.DeliveryMode;
import jakarta.jms.Destination;
import jakarta.jms.InvalidDestinationException;
Expand All @@ -38,6 +39,9 @@
import org.springframework.integration.support.DefaultMessageBuilderFactory;
import org.springframework.integration.support.MessageBuilderFactory;
import org.springframework.integration.support.management.TrackableComponent;
import org.springframework.integration.support.management.metrics.MetricsCaptor;
import org.springframework.integration.support.management.observation.MessageReceiverObservationConvention;
import org.springframework.integration.support.management.observation.MessageRequestReplyReceiverObservationConvention;
import org.springframework.integration.support.utils.IntegrationUtils;
import org.springframework.jms.listener.SessionAwareMessageListener;
import org.springframework.jms.support.JmsUtils;
Expand Down Expand Up @@ -323,6 +327,26 @@ public void setExtractReplyPayload(boolean extractReplyPayload) {
this.extractReplyPayload = extractReplyPayload;
}

public void setMetricsCaptor(MetricsCaptor captor) {
this.gatewayDelegate.registerMetricsCaptor(captor);
}

public void setObservationRegistry(ObservationRegistry observationRegistry) {
this.gatewayDelegate.registerObservationRegistry(observationRegistry);
}

public void setRequestReplyObservationConvention(
@Nullable MessageRequestReplyReceiverObservationConvention observationConvention) {

this.gatewayDelegate.setObservationConvention(observationConvention);
}

public void setReceiverObservationConvention(
@Nullable MessageReceiverObservationConvention observationConvention) {

this.gatewayDelegate.setReceiverObservationConvention(observationConvention);
}

@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
this.beanFactory = beanFactory;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2022 the original author or authors.
* Copyright 2016-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,10 +16,14 @@

package org.springframework.integration.jms;

import io.micrometer.observation.ObservationRegistry;

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.integration.context.OrderlyShutdownCapable;
import org.springframework.integration.gateway.MessagingGatewaySupport;
import org.springframework.integration.support.management.metrics.MetricsCaptor;
import org.springframework.integration.support.management.observation.MessageRequestReplyReceiverObservationConvention;
import org.springframework.jms.listener.AbstractMessageListenerContainer;
import org.springframework.messaging.MessageChannel;

Expand Down Expand Up @@ -114,28 +118,38 @@ public void setShutdownContainerOnStop(boolean shutdownContainerOnStop) {
this.endpoint.setShutdownContainerOnStop(shutdownContainerOnStop);
}

@Override
public void registerMetricsCaptor(MetricsCaptor metricsCaptorToRegister) {
super.registerMetricsCaptor(metricsCaptorToRegister);
this.endpoint.registerMetricsCaptor(metricsCaptorToRegister);
}

@Override
public String getComponentType() {
return this.endpoint.getComponentType();
public void registerObservationRegistry(ObservationRegistry observationRegistry) {
super.registerObservationRegistry(observationRegistry);
this.endpoint.registerObservationRegistry(observationRegistry);
}

@Override
public void setComponentName(String componentName) {
super.setComponentName(componentName);
this.endpoint.setComponentName(getComponentName());
public void setObservationConvention(MessageRequestReplyReceiverObservationConvention observationConvention) {
super.setObservationConvention(observationConvention);
this.endpoint.getListener().setRequestReplyObservationConvention(observationConvention);
}

@Override
public String getComponentType() {
return this.endpoint.getComponentType();
}

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
super.setApplicationContext(applicationContext);
this.endpoint.setApplicationContext(applicationContext);
this.endpoint.setBeanFactory(applicationContext);
this.endpoint.getListener().setBeanFactory(applicationContext);
}

@Override
protected void onInit() {
this.endpoint.setComponentName(getComponentName());
this.endpoint.afterPropertiesSet();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,11 +16,15 @@

package org.springframework.integration.jms;

import io.micrometer.observation.ObservationRegistry;

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.integration.context.OrderlyShutdownCapable;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.jms.util.JmsAdapterUtils;
import org.springframework.integration.support.management.metrics.MetricsCaptor;
import org.springframework.integration.support.management.observation.MessageReceiverObservationConvention;
import org.springframework.jms.listener.AbstractMessageListenerContainer;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.messaging.MessageChannel;
Expand Down Expand Up @@ -91,7 +95,7 @@ private JmsMessageDrivenEndpoint(AbstractMessageListenerContainer listenerContai
* container setting even if an external container is provided. Defaults to null
* (won't change container) if an external container is provided or `transacted` when
* the framework creates an implicit {@link DefaultMessageListenerContainer}.
* @param sessionAcknowledgeMode the acknowledge mode.
* @param sessionAcknowledgeMode the acknowledgement mode.
*/
public void setSessionAcknowledgeMode(String sessionAcknowledgeMode) {
this.sessionAcknowledgeMode = sessionAcknowledgeMode;
Expand Down Expand Up @@ -134,9 +138,9 @@ public void setShouldTrack(boolean shouldTrack) {
}

/**
* Set to false to prevent listener container shutdown when the endpoint is stopped.
* Set to {@code false} to prevent listener container shutdown when the endpoint is stopped.
* Then, if so configured, any cached consumer(s) in the container will remain.
* Otherwise the shared connection and will be closed and the listener invokers shut
* Otherwise, the shared connection and will be closed and the listener invokers shut
* down; this behavior is new starting with version 5.1. Default: true.
* @param shutdownContainerOnStop false to not shutdown.
* @since 5.1
Expand All @@ -149,6 +153,24 @@ public ChannelPublishingJmsMessageListener getListener() {
return this.listener;
}

@Override
public void registerMetricsCaptor(MetricsCaptor captor) {
super.registerMetricsCaptor(captor);
this.listener.setMetricsCaptor(captor);
}

@Override
public void registerObservationRegistry(ObservationRegistry observationRegistry) {
super.registerObservationRegistry(observationRegistry);
this.listener.setObservationRegistry(observationRegistry);
}

@Override
public void setObservationConvention(MessageReceiverObservationConvention observationConvention) {
super.setObservationConvention(observationConvention);
this.listener.setReceiverObservationConvention(observationConvention);
}

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
super.setApplicationContext(applicationContext);
Expand Down
Loading

0 comments on commit 96bfd27

Please sign in to comment.