Skip to content

Commit

Permalink
feat: support kafka payloads (#419)
Browse files Browse the repository at this point in the history
feat: Rd 12974 java distro support kafka payloads (#419)
  • Loading branch information
nadav3396 authored Jun 23, 2024
1 parent 1fdfe69 commit 28e8209
Show file tree
Hide file tree
Showing 24 changed files with 513 additions and 859 deletions.
2 changes: 1 addition & 1 deletion agent/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ dependencies {
javaagentLibs(project(":instrumentation:java11-http-client:java-http-client"))
javaagentLibs(project(":instrumentation:jdbc:javaagent"))
javaagentLibs(project(":instrumentation:jedis:jedis-1.4:javaagent"))
javaagentLibs(project(":instrumentation:kafka:kafka-clients:kafka-clients-0.11:javaagent"))
// TODO: smoke tests fail with these present
// javaagentLibs(project(":instrumentation:kafka:kafka-clients:kafka-clients-0.11:javaagent"))
// javaagentLibs(project(":instrumentation:rabbitmq-2.7:javaagent"))
javaagentLibs(project(":instrumentation:servlet:servlet-3.0:javaagent"))
javaagentLibs(project(":instrumentation:spring:spring-webmvc:spring-webmvc-3.1:javaagent"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package io.lumigo.instrumentation.core;

import io.opentelemetry.api.common.AttributeKey;
import java.util.List;

/** Semantic Attribute keys specific to Lumigo. */
public final class LumigoSemanticAttributes {
Expand All @@ -30,6 +31,9 @@ public final class LumigoSemanticAttributes {
public static final AttributeKey<String> MESSAGING_PAYLOAD =
AttributeKey.stringKey("messaging.message.payload");

public static final AttributeKey<List<String>> MESSAGING_HEADERS =
AttributeKey.stringArrayKey("messaging.message.headers");

public static final AttributeKey<String> GRPC_REQUEST_BODY =
AttributeKey.stringKey("rpc.grpc.request.payload");

Expand Down
1 change: 0 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ subprojects {
&& !it.path.contains("aws-sdk-2.2")
&& !it.path.contains("spring-webmvc-3.1")
&& !it.path.contains("rabbitmq-2.7")
&& !it.path.contains("kafka-clients-0.11")
&& !it.path.contains("lettuce-5.1")
) {
apply plugin: "com.diffplug.spotless"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,40 +1,23 @@
# Background on fork
# Background on instrumentation

This module is a fork of the upstream [Kafka client instrumentation](https://github.com/open-telemetry/opentelemetry-java-instrumentation/tree/main/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent).
This fork adds `ConsumerPayloadAttributeExtractor` into the list of extractors used in
`KafkaSingletons.CONSUMER_PROCESS_INSTRUMENTER` to facilitate collection of message payloads on a span.
This module adds kafka messages payloads to the spans generated by the upstream [Kafka client instrumentation](https://github.com/open-telemetry/opentelemetry-java-instrumentation/tree/main/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent).

This fork is necessary as the byte code advice directly accesses a static field on `KafkaSingletons`
containing the instrumenter. There is no mechanism to extend the `InstrumenterBuilder` to replace
aspects of what it provides, which is why this approach has been taken.
## How it works

Every update to OpenTelemetry JAVA SDK base version will require:
- A re-copy of instrumentation changes from the [javaagent module](https://github.com/open-telemetry/opentelemetry-java-instrumentation/tree/main/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent)
- Remove `KafkaClientsInstrumentationModule` from the copied instrumentation
- Re-application of the following changes:
We utilize two instrumentation modules because one needs to run before the OTEL instrumentation and the other after.

## `KafkaSingletons`
### Adding Producer instrumentation payloads

Update the `static` block to use the below for creating the `CONSUMER_PROCESS_INSTRUMENTER`:

```java
// Add Lumigo custom payload extractor
CONSUMER_PROCESS_INSTRUMENTER =
instrumenterFactory.createConsumerOperationInstrumenter(
MessageOperation.PROCESS, Collections.singletonList(new ConsumerPayloadAttributeExtractor()));
```
Similar to the Otel producer instrumentation, we instrument the `send` method of `KafkaProducer` class.
Then run after the Otel intrumentation and add the payload to the span attributes.

## `KafkaProducerInstrumentation`
### Adding Consumer instrumentation payloads

Modify `SendAdvice.onMethodEnter()` to include the below before creating producer callback:
The OTEL consumer instrumentation creates consume spans only when the user iterates over the records.
It does this by wrapping the iterator of the ConsumerRecords class.
Our payload instrumentation works similarly by wrapping the OTEL iterator with our own iterator,
and then adding the payloads to the span attributes generated when the user iterates over the records.

```java
if (null != record.value()) {
Java8BytecodeBridge.currentSpan()
.setAttribute(SemanticAttributes.MESSAGING_PAYLOAD, record.value().toString());
}
```
## Maintenance

## `io.lumigo.javaagent.instrumentation.kafkaclients.v0_11.KafkaClientInstrumentationModule`

Ensure `typeInstrumentations()` includes all instrumentations from the upstream module.
No special maintenance is required for this module.
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@ dependencies {
compileOnly project(":bootstrap")

compileOnly("org.apache.kafka:kafka-clients:${versions.kafkaclients011}")

compileOnly("io.opentelemetry.javaagent.instrumentation:opentelemetry-javaagent-kafka-clients-0.11-bootstrap:${versions.opentelemetryJavaagentAlpha}")
compileOnly("io.opentelemetry.instrumentation:opentelemetry-kafka-clients-common:${versions.opentelemetryJavaagentAlpha}")
compileOnly("io.opentelemetry.javaagent.instrumentation:opentelemetry-javaagent-kafka-clients-0.11:${versions.opentelemetryJavaagentAlpha}")

implementation("io.opentelemetry.instrumentation:opentelemetry-kafka-clients-common:${versions.opentelemetryJavaagentAlpha}")
// Instrumentation for testing
testInstrumentation("io.opentelemetry.javaagent.instrumentation:opentelemetry-javaagent-kafka-clients-0.11:${versions.opentelemetryJavaagentAlpha}")

testImplementation("org.apache.kafka:kafka-clients:${versions.kafkaclients011}")
testImplementation("io.opentelemetry.javaagent.instrumentation:opentelemetry-javaagent-kafka-clients-0.11-bootstrap:${versions.opentelemetryJavaagentAlpha}")
testImplementation("org.testcontainers:kafka:${versions.testcontainers}")
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Copyright 2024 Lumigo LTD
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package io.lumigo.javaagent.instrumentation.kafkaclients.v0_11;

import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.returns;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import java.util.Iterator;
import java.util.List;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class ConsumerRecordsPayloadInstrumentation implements TypeInstrumentation {

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("org.apache.kafka.clients.consumer.ConsumerRecords");
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isMethod()
.and(isPublic())
.and(named("records"))
.and(takesArgument(0, String.class))
.and(returns(Iterable.class)),
ConsumerRecordsPayloadInstrumentation.class.getName() + "$IterableAdvice");
transformer.applyAdviceToMethod(
isMethod()
.and(isPublic())
.and(named("records"))
.and(takesArgument(0, named("org.apache.kafka.common.TopicPartition")))
.and(returns(List.class)),
ConsumerRecordsPayloadInstrumentation.class.getName() + "$ListAdvice");
transformer.applyAdviceToMethod(
isMethod()
.and(isPublic())
.and(named("iterator"))
.and(takesArguments(0))
.and(returns(Iterator.class)),
ConsumerRecordsPayloadInstrumentation.class.getName() + "$IteratorAdvice");
}

@SuppressWarnings("unused")
public static class IterableAdvice {
@Advice.OnMethodExit(suppress = Throwable.class)
public static <K, V> void wrap(
@Advice.Return(readOnly = false) Iterable<ConsumerRecord<K, V>> iterable) {
iterable = PayloadTracingIterable.wrap(iterable);
}
}

@SuppressWarnings("unused")
public static class ListAdvice {
@Advice.OnMethodExit(suppress = Throwable.class)
public static <K, V> void wrap(
@Advice.Return(readOnly = false) List<ConsumerRecord<K, V>> list) {
list = PayloadTracingList.wrap(list);
}
}

@SuppressWarnings("unused")
public static class IteratorAdvice {
@Advice.OnMethodExit(suppress = Throwable.class)
public static <K, V> void wrap(
@Advice.Return(readOnly = false) Iterator<ConsumerRecord<K, V>> iterator) {
iterator = PayloadTracingIterator.wrap(iterator);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,28 +31,19 @@ public class KafkaClientConfigCustomizer implements AutoConfigurationCustomizerP
private static final String KAFKA_PRODUCER_PROPAGATION_ATTRIBUTE_KEY =
"otel.instrumentation.kafka.producer-propagation.enabled";

private static final String MESSAGING_EXPERIMENTAL_RECEIVE_TELEMETRY_KEY =
"otel.instrumentation.messaging.experimental.receive-telemetry.enabled";

@Override
public void customize(AutoConfigurationCustomizer autoConfiguration) {
autoConfiguration.addPropertiesCustomizer(
config -> {
Map<String, String> overrides = new HashMap<>();

// disable OTeL instrumentation for Kafka Clients
overrides.put("otel.instrumentation.kafka-clients.enabled", "false");

// If not set by user, set to true
if (null == config.getBoolean(KAFKA_EXPERIMENTAL_ATTRIBUTES_KEY)) {
overrides.put(KAFKA_EXPERIMENTAL_ATTRIBUTES_KEY, "true");
}
if (null == config.getBoolean(KAFKA_PRODUCER_PROPAGATION_ATTRIBUTE_KEY)) {
overrides.put(KAFKA_PRODUCER_PROPAGATION_ATTRIBUTE_KEY, "true");
}
if (null == config.getBoolean(MESSAGING_EXPERIMENTAL_RECEIVE_TELEMETRY_KEY)) {
overrides.put(MESSAGING_EXPERIMENTAL_RECEIVE_TELEMETRY_KEY, "true");
}

return overrides;
});
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2023 Lumigo LTD
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package io.lumigo.javaagent.instrumentation.kafkaclients.v0_11;

import com.google.auto.service.AutoService;
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import java.util.Collections;
import java.util.List;

@AutoService(InstrumentationModule.class)
public class KafkaClientConsumerRecordsInstrumentationModule extends InstrumentationModule {
public KafkaClientConsumerRecordsInstrumentationModule() {
super(
"lumigo-kafka-clients-consumer-payloads",
"lumigo-kafka-clients-0.11",
"lumigo-kafka",
"lumigo-kafka-consumer-payload");
}

@Override
public List<TypeInstrumentation> typeInstrumentations() {
return Collections.singletonList(new ConsumerRecordsPayloadInstrumentation());
}

@Override
public boolean isHelperClass(String className) {
return className.startsWith("io.lumigo.javaagent.instrumentation.kafkaclients.v0_11.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,36 +20,32 @@
import com.google.auto.service.AutoService;
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11.ConsumerRecordsInstrumentation;
import io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11.KafkaConsumerInstrumentation;
import io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11.KafkaProducerInstrumentation;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

@AutoService(InstrumentationModule.class)
public class KafkaClientInstrumentationModule extends InstrumentationModule {
public KafkaClientInstrumentationModule() {
super("lumigo-kafka-clients", "lumigo-kafka-clients-0.11", "lumigo-kafka");
}

@Override
public boolean isIndyModule() {
// OpenTelemetryMetricsReporter is not available in app class loader
return false;
super(
"lumigo-kafka-clients-producer-payloads",
"lumigo-kafka-clients-0.11",
"lumigo-kafka",
"lumigo-kafka-producer-payload");
}

@Override
public List<TypeInstrumentation> typeInstrumentations() {
return Arrays.asList(
// Original OTeL instrumentation
new KafkaProducerInstrumentation(),
new KafkaConsumerInstrumentation(),
new ConsumerRecordsInstrumentation()
);
return Collections.singletonList(new KafkaProducerPayloadInstrumentation());
}

@Override
public boolean isHelperClass(String className) {
return className.startsWith("io.lumigo.javaagent.instrumentation.kafkaclients.v0_11.");
}

@Override
public int order() {
// Run after OTeL kafka Instrumentation
return 1;
}
}
Loading

0 comments on commit 28e8209

Please sign in to comment.