Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add option to make routing-key part of RabbitMQ transaction/span names #3636

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ Use subheadings with the "=====" level for adding notes for unreleased changes:
===== Bug fixes
* Restore compatibility with Java 7 - {pull}3657[#3657]

[float]
===== Features
* Added option to make routing-key part of RabbitMQ transaction/span names - {pull}3636[#3636]

[[release-notes-1.50.0]]
==== 1.50.0 - 2024/05/28

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import co.elastic.apm.agent.sdk.ElasticApmInstrumentation;
import co.elastic.apm.agent.tracer.ElasticContext;
import co.elastic.apm.agent.tracer.Span;
import co.elastic.apm.agent.tracer.configuration.MessagingConfiguration;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
Expand Down Expand Up @@ -137,10 +138,9 @@ public static Object[] onBasicPublish(@Advice.This Channel channel,
if (exitSpan != null) {

exchange = normalizeExchangeName(exchange);

String transactionNameSuffix = normalizeExchangeName(resolveTransactionNameSuffix(exchange, routingKey));
exitSpan.withAction("send")
.withName("RabbitMQ SEND to ").appendToName(exchange);

.withName("RabbitMQ SEND to ").appendToName(transactionNameSuffix);
}

properties = propagateTraceContext(tracer.currentContext(), properties);
Expand Down Expand Up @@ -174,6 +174,15 @@ private static AMQP.BasicProperties propagateTraceContext(ElasticContext<?> toPr
return properties.builder().headers(headersWithContext).build();
}

private static String resolveTransactionNameSuffix(String exchange, String routingKey) {

if (MessagingConfiguration.RabbitMQNamingMode.ROUTING_KEY == AbstractBaseInstrumentation.getRabbitMQNamingMode()) {
return routingKey;
} else {
return exchange;
}
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class, inline = false)
public static void afterBasicPublish(@Advice.Enter @Nullable Object[] enterArray,
@Advice.Thrown @Nullable Throwable throwable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import co.elastic.apm.agent.tracer.Transaction;
import co.elastic.apm.agent.rabbitmq.header.RabbitMQTextHeaderGetter;
import co.elastic.apm.agent.tracer.configuration.MessagingConfiguration.RabbitMQNamingMode;
import co.elastic.apm.agent.tracer.metadata.Message;
import co.elastic.apm.agent.sdk.internal.util.PrivilegedActionUtils;
import com.rabbitmq.client.AMQP;
Expand Down Expand Up @@ -106,8 +107,9 @@ public static Object onHandleDelivery(@Advice.Origin Class<?> originClazz,
return null;
}

String transactionNameSuffix = getExchangeOrRoutingKey(envelope);
transaction.withType("messaging")
.withName("RabbitMQ RECEIVE from ").appendToName(normalizeExchangeName(exchange));
.withName("RabbitMQ RECEIVE from ").appendToName(normalizeExchangeName(transactionNameSuffix));
JonasKunz marked this conversation as resolved.
Show resolved Hide resolved

transaction.setFrameworkName("RabbitMQ");

Expand All @@ -129,5 +131,17 @@ public static void afterHandleDelivery(@Advice.Enter @Nullable final Object tran
.end();
}
}

private static String getExchangeOrRoutingKey(Envelope envelope) {
if (null == envelope) {
return null;
}

if (RabbitMQNamingMode.ROUTING_KEY == AbstractBaseInstrumentation.getRabbitMQNamingMode()) {
return envelope.getRoutingKey();
} else {
return envelope.getExchange();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import co.elastic.apm.agent.impl.transaction.Transaction;
import co.elastic.apm.agent.common.util.WildcardMatcher;
import co.elastic.apm.agent.testutils.TestContainersUtils;
import co.elastic.apm.agent.tracer.configuration.MessagingConfiguration.RabbitMQNamingMode;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
Expand Down Expand Up @@ -98,7 +99,7 @@ public class RabbitMQIT extends AbstractInstrumentationTest {
private static final String IMAGE = "rabbitmq:3.7-management-alpine";
private static final RabbitMQContainer container = new RabbitMQContainer(IMAGE);

private static final String ROUTING_KEY = "test.key";
private static final String TEST_ROUTING_KEY = "test.key";

private static final byte[] MSG = "Testing APM!".getBytes();

Expand Down Expand Up @@ -203,7 +204,8 @@ void headersCaptureSanitize() throws IOException, InterruptedException {
), true);
}

private void testHeadersCapture(Map<String, String> headersMap, Map<String, String> expectedHeaders, boolean expectTracingHeaders) throws IOException, InterruptedException {
private void testHeadersCapture(Map<String, String> headersMap, Map<String, String> expectedHeaders,
boolean expectTracingHeaders) throws IOException, InterruptedException {
performTest(
propertiesMap(headersMap),
false,
Expand All @@ -228,6 +230,15 @@ void ignoreExchangeName() throws IOException, InterruptedException {
});
}

@Test
void routingKeyInTransactionName() throws IOException, InterruptedException {
MessagingConfiguration messagingConfiguration = config.getConfig(MessagingConfiguration.class);
doReturn(RabbitMQNamingMode.ROUTING_KEY).when(messagingConfiguration).getRabbitMQNamingMode();

performTest(emptyProperties(), false, randString("exchange"), "different-routing-key",
(mt, ms) -> {});
}

private void performTest(@Nullable AMQP.BasicProperties properties) throws IOException, InterruptedException {
performTest(properties, false, randString("exchange"), (mt, ms) -> {
});
Expand All @@ -238,17 +249,27 @@ private void performTest(@Nullable AMQP.BasicProperties properties,
String channelName,
BiConsumer<Message, Message> messageCheck) throws IOException, InterruptedException {

performTest(properties, shouldIgnore, channelName, TEST_ROUTING_KEY, messageCheck);
}

private void performTest(@Nullable AMQP.BasicProperties properties,
boolean shouldIgnore,
String channelName,
String routingKey,
BiConsumer<Message, Message> messageCheck) throws IOException, InterruptedException {

Channel channel = connection.createChannel();
String exchange = createExchange(channel, channelName);
String queue = createQueue(channel, exchange);
String queue = createQueue(channel, exchange, routingKey);

CountDownLatch messageReceived = new CountDownLatch(1);

channel.basicConsume(queue, new DefaultConsumer(channel) {
// using an anonymous class to ensure class matching is properly applied

@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
assertThat(properties).isNotNull();
Map<String, Object> headers = properties.getHeaders();

Expand All @@ -264,7 +285,7 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp

Transaction rootTransaction = startTestRootTransaction("Rabbit-Test Root Transaction");

channel.basicPublish(exchange, ROUTING_KEY, properties, MSG);
channel.basicPublish(exchange, routingKey, properties, MSG);

endRootTransaction(rootTransaction);

Expand All @@ -285,17 +306,17 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp
return;
}


// 2 transactions, 1 span expected
getReporter().awaitTransactionCount(2);
getReporter().awaitSpanCount(1);

Transaction childTransaction = getNonRootTransaction(rootTransaction, getReporter().getTransactions());

checkTransaction(childTransaction, exchange);
String transactionNameSuffix = !routingKey.equals(TEST_ROUTING_KEY) ? routingKey: exchange;
checkTransaction(childTransaction, exchange, transactionNameSuffix, "RabbitMQ");

Span span = getReporter().getSpans().get(0);
checkSendSpan(span, exchange);
checkSendSpan(span, exchange, transactionNameSuffix);

// span should be child of the first transaction
checkParentChild(rootTransaction, span);
Expand All @@ -306,12 +327,11 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp
Message spanMessage = span.getContext().getMessage();
Message transactionMessage = childTransaction.getContext().getMessage();


// test-specific assertions on captured message
messageCheck.accept(transactionMessage, spanMessage);

}


@Test
void testPollingWithinTransactionNoMessage() throws IOException {
Channel channel = connection.createChannel();
Expand Down Expand Up @@ -395,7 +415,7 @@ void testPollingIgnoreExchangeName() throws IOException {
private String declareAndBindQueue(String queue, String exchange, Channel channel) {
try {
channel.queueDeclare(queue, false, false, false, null);
channel.queueBind(queue, exchange, ROUTING_KEY);
channel.queueBind(queue, exchange, TEST_ROUTING_KEY);
return queue;
} catch (IOException e) {
throw new IllegalStateException(e);
Expand All @@ -413,7 +433,7 @@ private void pollingTest(boolean withinTransaction, boolean withResult, Supplier
}

if (withResult) {
channel.basicPublish(exchange, ROUTING_KEY, emptyProperties(), MSG);
channel.basicPublish(exchange, TEST_ROUTING_KEY, emptyProperties(), MSG);
}
channel.basicGet(queue, true);

Expand Down Expand Up @@ -573,9 +593,9 @@ static Transaction getNonRootTransaction(Transaction rootTransaction, List<Trans
return childTransaction;
}

private String createQueue(Channel channel, String exchange) throws IOException {
private String createQueue(Channel channel, String exchange, String routingKey) throws IOException {
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, exchange, ROUTING_KEY);
channel.queueBind(queueName, exchange, routingKey);
return queueName;
}

Expand Down Expand Up @@ -618,9 +638,13 @@ private static void checkTransaction(Transaction transaction, String exchange) {
}

static void checkTransaction(Transaction transaction, String exchange, String frameworkName) {
checkTransaction(transaction, exchange, exchange, frameworkName);
}

static void checkTransaction(Transaction transaction, String exchange, String transactionNameSuffix, String frameworkName) {
assertThat(transaction.getType()).isEqualTo("messaging");
assertThat(transaction.getNameAsString())
.isEqualTo("RabbitMQ RECEIVE from %s", exchange.isEmpty() ? "<default>" : exchange);
.isEqualTo("RabbitMQ RECEIVE from %s", transactionNameSuffix.isEmpty() ? "<default>" : transactionNameSuffix);
assertThat(transaction.getFrameworkName()).isEqualTo(frameworkName);

assertThat(transaction.getOutcome()).isEqualTo(Outcome.SUCCESS);
Expand Down Expand Up @@ -682,14 +706,23 @@ private static HashMap<String, String> getHeadersMap(Message message) {
}

private static void checkSendSpan(Span span, String exchange) {
checkSendSpan(span, exchange, connection.getAddress().getHostAddress(), connection.getPort());
checkSendSpan(span, exchange, exchange, connection.getAddress().getHostAddress(), connection.getPort());
}

private static void checkSendSpan(Span span, String exchange, String spanNameSuffix) {
checkSendSpan(span, exchange, spanNameSuffix, connection.getAddress().getHostAddress(), connection.getPort());
}

static void checkSendSpan(Span span, String exchange, String host, int port) {
checkSendSpan(span, exchange, exchange, host, port);
}

static void checkSendSpan(Span span, String exchange, String spanNameSuffix, String host, int port) {
String exchangeName = exchange.isEmpty() ? "<default>" : exchange;
String spanName = spanNameSuffix.isEmpty() ? "<default>" : spanNameSuffix;
checkSpanCommon(span,
"send",
String.format("RabbitMQ SEND to %s", exchangeName),
String.format("RabbitMQ SEND to %s", spanName),
exchangeName,
true
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ public class MessagingConfiguration extends ConfigurationOptionProvider {
private final ConfigurationOption<RabbitMQNamingMode> rabbitMQNamingMode = ConfigurationOption.enumOption(RabbitMQNamingMode.class)
.key("rabbitmq_naming_mode")
.configurationCategory(MESSAGING_CATEGORY)
.description("Defines whether the agent should use the exchanges or the queue for the naming of RabbitMQ Transactions. Valid options are `QUEUE` and `EXCHANGE`.\n" +
"Note that `QUEUE` only works when using RabbitMQ via spring-amqp."
.description("Defines whether the agent should use the exchanges, the routing key or the queue for the naming of RabbitMQ Transactions. Valid options are `QUEUE`, `ROUTING_KEY` and `EXCHANGE`.\n" +
"Note that `QUEUE` only works when using RabbitMQ via spring-amqp and `ROUTING_KEY` only works for the non spring-client."
)
.dynamic(true)
.tags("added[1.46.0]")
Expand Down Expand Up @@ -187,5 +187,9 @@ public enum RabbitMQNamingMode {
* Use queue in transaction names
*/
QUEUE,
/**
* Use routing key in transaction names
*/
ROUTING_KEY
}
}
12 changes: 6 additions & 6 deletions docs/configuration.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -2513,12 +2513,12 @@ Starting from version 1.43.0, the classes that are part of the 'application_pack
[[config-rabbitmq-naming-mode]]
==== `rabbitmq_naming_mode` (added[1.46.0])

Defines whether the agent should use the exchanges or the queue for the naming of RabbitMQ Transactions. Valid options are `QUEUE` and `EXCHANGE`.
Note that `QUEUE` only works when using RabbitMQ via spring-amqp.
Defines whether the agent should use the exchanges, the routing key or the queue for the naming of RabbitMQ Transactions. Valid options are `QUEUE`, `ROUTING_KEY` and `EXCHANGE`.
Note that `QUEUE` only works when using RabbitMQ via spring-amqp and `ROUTING_KEY` only works for the non spring-client.

<<configuration-dynamic, image:./images/dynamic-config.svg[] >>

Valid options: `EXCHANGE`, `QUEUE`
Valid options: `EXCHANGE`, `QUEUE`, `ROUTING_KEY`

[options="header"]
|============
Expand Down Expand Up @@ -4703,10 +4703,10 @@ Example: `5ms`.
#
# jms_listener_packages=

# Defines whether the agent should use the exchanges or the queue for the naming of RabbitMQ Transactions. Valid options are `QUEUE` and `EXCHANGE`.
# Note that `QUEUE` only works when using RabbitMQ via spring-amqp.
# Defines whether the agent should use the exchanges, the routing key or the queue for the naming of RabbitMQ Transactions. Valid options are `QUEUE`, `ROUTING_KEY` and `EXCHANGE`.
# Note that `QUEUE` only works when using RabbitMQ via spring-amqp and `ROUTING_KEY` only works for the non spring-client.
#
# Valid options: EXCHANGE, QUEUE
# Valid options: EXCHANGE, QUEUE, ROUTING_KEY
# This setting can be changed at runtime
# Type: RabbitMQNamingMode
# Default value: EXCHANGE
Expand Down
Loading