diff --git a/agent/build.gradle b/agent/build.gradle index ed63f72df5..7252cd5128 100644 --- a/agent/build.gradle +++ b/agent/build.gradle @@ -37,8 +37,8 @@ dependencies { javaagentLibs(project(":instrumentation:java11-http-client:java-http-client")) javaagentLibs(project(":instrumentation:jdbc:javaagent")) javaagentLibs(project(":instrumentation:jedis:jedis-1.4:javaagent")) + javaagentLibs(project(":instrumentation:kafka:kafka-clients:kafka-clients-0.11:javaagent")) // TODO: smoke tests fail with these present -// javaagentLibs(project(":instrumentation:kafka:kafka-clients:kafka-clients-0.11:javaagent")) // javaagentLibs(project(":instrumentation:rabbitmq-2.7:javaagent")) javaagentLibs(project(":instrumentation:servlet:servlet-3.0:javaagent")) javaagentLibs(project(":instrumentation:spring:spring-webmvc:spring-webmvc-3.1:javaagent")) diff --git a/bootstrap/src/main/java/io/lumigo/instrumentation/core/LumigoSemanticAttributes.java b/bootstrap/src/main/java/io/lumigo/instrumentation/core/LumigoSemanticAttributes.java index 9d68cdf931..86e8a0ff2d 100644 --- a/bootstrap/src/main/java/io/lumigo/instrumentation/core/LumigoSemanticAttributes.java +++ b/bootstrap/src/main/java/io/lumigo/instrumentation/core/LumigoSemanticAttributes.java @@ -18,6 +18,7 @@ package io.lumigo.instrumentation.core; import io.opentelemetry.api.common.AttributeKey; +import java.util.List; /** Semantic Attribute keys specific to Lumigo. */ public final class LumigoSemanticAttributes { @@ -30,6 +31,9 @@ public final class LumigoSemanticAttributes { public static final AttributeKey MESSAGING_PAYLOAD = AttributeKey.stringKey("messaging.message.payload"); + public static final AttributeKey> MESSAGING_HEADERS = + AttributeKey.stringArrayKey("messaging.message.headers"); + public static final AttributeKey GRPC_REQUEST_BODY = AttributeKey.stringKey("rpc.grpc.request.payload"); diff --git a/build.gradle b/build.gradle index ae7e16d1a9..1bd4e5b7c9 100644 --- a/build.gradle +++ b/build.gradle @@ -28,7 +28,6 @@ subprojects { && !it.path.contains("aws-sdk-2.2") && !it.path.contains("spring-webmvc-3.1") && !it.path.contains("rabbitmq-2.7") - && !it.path.contains("kafka-clients-0.11") && !it.path.contains("lettuce-5.1") ) { apply plugin: "com.diffplug.spotless" diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/README.md b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/README.md index 4bd50fd270..045a6efecb 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/README.md +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/README.md @@ -1,40 +1,23 @@ -# Background on fork +# Background on instrumentation -This module is a fork of the upstream [Kafka client instrumentation](https://github.com/open-telemetry/opentelemetry-java-instrumentation/tree/main/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent). -This fork adds `ConsumerPayloadAttributeExtractor` into the list of extractors used in -`KafkaSingletons.CONSUMER_PROCESS_INSTRUMENTER` to facilitate collection of message payloads on a span. +This module adds kafka messages payloads to the spans generated by the upstream [Kafka client instrumentation](https://github.com/open-telemetry/opentelemetry-java-instrumentation/tree/main/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent). -This fork is necessary as the byte code advice directly accesses a static field on `KafkaSingletons` -containing the instrumenter. There is no mechanism to extend the `InstrumenterBuilder` to replace -aspects of what it provides, which is why this approach has been taken. +## How it works -Every update to OpenTelemetry JAVA SDK base version will require: -- A re-copy of instrumentation changes from the [javaagent module](https://github.com/open-telemetry/opentelemetry-java-instrumentation/tree/main/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent) -- Remove `KafkaClientsInstrumentationModule` from the copied instrumentation -- Re-application of the following changes: +We utilize two instrumentation modules because one needs to run before the OTEL instrumentation and the other after. -## `KafkaSingletons` +### Adding Producer instrumentation payloads -Update the `static` block to use the below for creating the `CONSUMER_PROCESS_INSTRUMENTER`: - -```java -// Add Lumigo custom payload extractor -CONSUMER_PROCESS_INSTRUMENTER = - instrumenterFactory.createConsumerOperationInstrumenter( - MessageOperation.PROCESS, Collections.singletonList(new ConsumerPayloadAttributeExtractor())); -``` +Similar to the Otel producer instrumentation, we instrument the `send` method of `KafkaProducer` class. +Then run after the Otel intrumentation and add the payload to the span attributes. -## `KafkaProducerInstrumentation` +### Adding Consumer instrumentation payloads -Modify `SendAdvice.onMethodEnter()` to include the below before creating producer callback: +The OTEL consumer instrumentation creates consume spans only when the user iterates over the records. +It does this by wrapping the iterator of the ConsumerRecords class. +Our payload instrumentation works similarly by wrapping the OTEL iterator with our own iterator, +and then adding the payloads to the span attributes generated when the user iterates over the records. -```java -if (null != record.value()) { - Java8BytecodeBridge.currentSpan() - .setAttribute(SemanticAttributes.MESSAGING_PAYLOAD, record.value().toString()); -} -``` +## Maintenance -## `io.lumigo.javaagent.instrumentation.kafkaclients.v0_11.KafkaClientInstrumentationModule` - -Ensure `typeInstrumentations()` includes all instrumentations from the upstream module. +No special maintenance is required for this module. diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/build.gradle b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/build.gradle index 51ed199d40..c3fe6b732c 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/build.gradle +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/build.gradle @@ -13,12 +13,13 @@ dependencies { compileOnly project(":bootstrap") compileOnly("org.apache.kafka:kafka-clients:${versions.kafkaclients011}") - compileOnly("io.opentelemetry.javaagent.instrumentation:opentelemetry-javaagent-kafka-clients-0.11-bootstrap:${versions.opentelemetryJavaagentAlpha}") + compileOnly("io.opentelemetry.instrumentation:opentelemetry-kafka-clients-common:${versions.opentelemetryJavaagentAlpha}") + compileOnly("io.opentelemetry.javaagent.instrumentation:opentelemetry-javaagent-kafka-clients-0.11:${versions.opentelemetryJavaagentAlpha}") - implementation("io.opentelemetry.instrumentation:opentelemetry-kafka-clients-common:${versions.opentelemetryJavaagentAlpha}") +// Instrumentation for testing + testInstrumentation("io.opentelemetry.javaagent.instrumentation:opentelemetry-javaagent-kafka-clients-0.11:${versions.opentelemetryJavaagentAlpha}") testImplementation("org.apache.kafka:kafka-clients:${versions.kafkaclients011}") - testImplementation("io.opentelemetry.javaagent.instrumentation:opentelemetry-javaagent-kafka-clients-0.11-bootstrap:${versions.opentelemetryJavaagentAlpha}") testImplementation("org.testcontainers:kafka:${versions.testcontainers}") } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/lumigo/javaagent/instrumentation/kafkaclients/v0_11/ConsumerPayloadAttributeExtractor.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/lumigo/javaagent/instrumentation/kafkaclients/v0_11/ConsumerPayloadAttributeExtractor.java deleted file mode 100644 index 8515f326ee..0000000000 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/lumigo/javaagent/instrumentation/kafkaclients/v0_11/ConsumerPayloadAttributeExtractor.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright 2023 Lumigo LTD - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - */ -package io.lumigo.javaagent.instrumentation.kafkaclients.v0_11; - -import io.lumigo.instrumentation.core.LumigoSemanticAttributes; -import io.opentelemetry.api.common.AttributesBuilder; -import io.opentelemetry.context.Context; -import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; -import io.opentelemetry.instrumentation.kafka.internal.KafkaProcessRequest; -import javax.annotation.Nullable; - -public class ConsumerPayloadAttributeExtractor implements AttributesExtractor { - @Override - public void onStart(AttributesBuilder attributes, Context parentContext, - KafkaProcessRequest kafkaProcessRequest) { - if (null != kafkaProcessRequest.getRecord().value()) { - attributes.put( - LumigoSemanticAttributes.MESSAGING_PAYLOAD, - kafkaProcessRequest.getRecord().value().toString()); - } - } - - @Override - public void onEnd(AttributesBuilder attributes, Context context, - KafkaProcessRequest kafkaProcessRequest, @Nullable Void unused, @Nullable Throwable error) { - // Do nothing - } -} diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/lumigo/javaagent/instrumentation/kafkaclients/v0_11/ConsumerRecordsPayloadInstrumentation.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/lumigo/javaagent/instrumentation/kafkaclients/v0_11/ConsumerRecordsPayloadInstrumentation.java new file mode 100644 index 0000000000..f3b7d7fee9 --- /dev/null +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/lumigo/javaagent/instrumentation/kafkaclients/v0_11/ConsumerRecordsPayloadInstrumentation.java @@ -0,0 +1,94 @@ +/* + * Copyright 2024 Lumigo LTD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package io.lumigo.javaagent.instrumentation.kafkaclients.v0_11; + +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.isPublic; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.returns; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import java.util.Iterator; +import java.util.List; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.kafka.clients.consumer.ConsumerRecord; + +public class ConsumerRecordsPayloadInstrumentation implements TypeInstrumentation { + + @Override + public ElementMatcher typeMatcher() { + return named("org.apache.kafka.clients.consumer.ConsumerRecords"); + } + + @Override + public void transform(TypeTransformer transformer) { + transformer.applyAdviceToMethod( + isMethod() + .and(isPublic()) + .and(named("records")) + .and(takesArgument(0, String.class)) + .and(returns(Iterable.class)), + ConsumerRecordsPayloadInstrumentation.class.getName() + "$IterableAdvice"); + transformer.applyAdviceToMethod( + isMethod() + .and(isPublic()) + .and(named("records")) + .and(takesArgument(0, named("org.apache.kafka.common.TopicPartition"))) + .and(returns(List.class)), + ConsumerRecordsPayloadInstrumentation.class.getName() + "$ListAdvice"); + transformer.applyAdviceToMethod( + isMethod() + .and(isPublic()) + .and(named("iterator")) + .and(takesArguments(0)) + .and(returns(Iterator.class)), + ConsumerRecordsPayloadInstrumentation.class.getName() + "$IteratorAdvice"); + } + + @SuppressWarnings("unused") + public static class IterableAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + public static void wrap( + @Advice.Return(readOnly = false) Iterable> iterable) { + iterable = PayloadTracingIterable.wrap(iterable); + } + } + + @SuppressWarnings("unused") + public static class ListAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + public static void wrap( + @Advice.Return(readOnly = false) List> list) { + list = PayloadTracingList.wrap(list); + } + } + + @SuppressWarnings("unused") + public static class IteratorAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + public static void wrap( + @Advice.Return(readOnly = false) Iterator> iterator) { + iterator = PayloadTracingIterator.wrap(iterator); + } + } +} diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/lumigo/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientConfigCustomizer.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/lumigo/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientConfigCustomizer.java index 2e2f9e81ae..c1e064d651 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/lumigo/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientConfigCustomizer.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/lumigo/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientConfigCustomizer.java @@ -31,18 +31,12 @@ public class KafkaClientConfigCustomizer implements AutoConfigurationCustomizerP private static final String KAFKA_PRODUCER_PROPAGATION_ATTRIBUTE_KEY = "otel.instrumentation.kafka.producer-propagation.enabled"; - private static final String MESSAGING_EXPERIMENTAL_RECEIVE_TELEMETRY_KEY = - "otel.instrumentation.messaging.experimental.receive-telemetry.enabled"; - @Override public void customize(AutoConfigurationCustomizer autoConfiguration) { autoConfiguration.addPropertiesCustomizer( config -> { Map overrides = new HashMap<>(); - // disable OTeL instrumentation for Kafka Clients - overrides.put("otel.instrumentation.kafka-clients.enabled", "false"); - // If not set by user, set to true if (null == config.getBoolean(KAFKA_EXPERIMENTAL_ATTRIBUTES_KEY)) { overrides.put(KAFKA_EXPERIMENTAL_ATTRIBUTES_KEY, "true"); @@ -50,9 +44,6 @@ public void customize(AutoConfigurationCustomizer autoConfiguration) { if (null == config.getBoolean(KAFKA_PRODUCER_PROPAGATION_ATTRIBUTE_KEY)) { overrides.put(KAFKA_PRODUCER_PROPAGATION_ATTRIBUTE_KEY, "true"); } - if (null == config.getBoolean(MESSAGING_EXPERIMENTAL_RECEIVE_TELEMETRY_KEY)) { - overrides.put(MESSAGING_EXPERIMENTAL_RECEIVE_TELEMETRY_KEY, "true"); - } return overrides; }); diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/lumigo/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientConsumerRecordsInstrumentationModule.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/lumigo/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientConsumerRecordsInstrumentationModule.java new file mode 100644 index 0000000000..2671072c7e --- /dev/null +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/lumigo/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientConsumerRecordsInstrumentationModule.java @@ -0,0 +1,45 @@ +/* + * Copyright 2023 Lumigo LTD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package io.lumigo.javaagent.instrumentation.kafkaclients.v0_11; + +import com.google.auto.service.AutoService; +import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule; +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import java.util.Collections; +import java.util.List; + +@AutoService(InstrumentationModule.class) +public class KafkaClientConsumerRecordsInstrumentationModule extends InstrumentationModule { + public KafkaClientConsumerRecordsInstrumentationModule() { + super( + "lumigo-kafka-clients-consumer-payloads", + "lumigo-kafka-clients-0.11", + "lumigo-kafka", + "lumigo-kafka-consumer-payload"); + } + + @Override + public List typeInstrumentations() { + return Collections.singletonList(new ConsumerRecordsPayloadInstrumentation()); + } + + @Override + public boolean isHelperClass(String className) { + return className.startsWith("io.lumigo.javaagent.instrumentation.kafkaclients.v0_11."); + } +} diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/lumigo/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientInstrumentationModule.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/lumigo/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientInstrumentationModule.java index cd1bb0f56d..32f4f92bcb 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/lumigo/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientInstrumentationModule.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/lumigo/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientInstrumentationModule.java @@ -20,36 +20,32 @@ import com.google.auto.service.AutoService; import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule; import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; -import io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11.ConsumerRecordsInstrumentation; -import io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11.KafkaConsumerInstrumentation; -import io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11.KafkaProducerInstrumentation; -import java.util.Arrays; +import java.util.Collections; import java.util.List; @AutoService(InstrumentationModule.class) public class KafkaClientInstrumentationModule extends InstrumentationModule { public KafkaClientInstrumentationModule() { - super("lumigo-kafka-clients", "lumigo-kafka-clients-0.11", "lumigo-kafka"); - } - - @Override - public boolean isIndyModule() { - // OpenTelemetryMetricsReporter is not available in app class loader - return false; + super( + "lumigo-kafka-clients-producer-payloads", + "lumigo-kafka-clients-0.11", + "lumigo-kafka", + "lumigo-kafka-producer-payload"); } @Override public List typeInstrumentations() { - return Arrays.asList( - // Original OTeL instrumentation - new KafkaProducerInstrumentation(), - new KafkaConsumerInstrumentation(), - new ConsumerRecordsInstrumentation() - ); + return Collections.singletonList(new KafkaProducerPayloadInstrumentation()); } @Override public boolean isHelperClass(String className) { return className.startsWith("io.lumigo.javaagent.instrumentation.kafkaclients.v0_11."); } + + @Override + public int order() { + // Run after OTeL kafka Instrumentation + return 1; + } } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/lumigo/javaagent/instrumentation/kafkaclients/v0_11/KafkaProducerPayloadInstrumentation.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/lumigo/javaagent/instrumentation/kafkaclients/v0_11/KafkaProducerPayloadInstrumentation.java new file mode 100644 index 0000000000..7446087be6 --- /dev/null +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/lumigo/javaagent/instrumentation/kafkaclients/v0_11/KafkaProducerPayloadInstrumentation.java @@ -0,0 +1,66 @@ +/* + * Copyright 2024 Lumigo LTD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package io.lumigo.javaagent.instrumentation.kafkaclients.v0_11; + +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.isPublic; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; + +import io.lumigo.instrumentation.core.LumigoSemanticAttributes; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge; +import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; +import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.kafka.clients.producer.ProducerRecord; + +public class KafkaProducerPayloadInstrumentation implements TypeInstrumentation { + @Override + public ElementMatcher typeMatcher() { + return named("org.apache.kafka.clients.producer.KafkaProducer"); + } + + @Override + public void transform(TypeTransformer transformer) { + transformer.applyAdviceToMethod( + isMethod() + .and(isPublic()) + .and(named("send")) + .and(takesArgument(0, named("org.apache.kafka.clients.producer.ProducerRecord"))) + .and(takesArgument(1, named("org.apache.kafka.clients.producer.Callback"))), + KafkaProducerPayloadInstrumentation.class.getName() + "$SendPayloadAdvice"); + } + + @SuppressWarnings("unused") + public static class SendPayloadAdvice { + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void onEnter( + @Advice.Argument(value = 0, readOnly = false) ProducerRecord record) { + if (null != record.value()) { + Span span = Java8BytecodeBridge.currentSpan(); + span.setAttribute(LumigoSemanticAttributes.MESSAGING_PAYLOAD, record.value().toString()); + span.setAttribute( + LumigoSemanticAttributes.MESSAGING_HEADERS, + KafkaUtils.convertHeadersToString(record.headers())); + } + } + } +} diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/lumigo/javaagent/instrumentation/kafkaclients/v0_11/KafkaUtils.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/lumigo/javaagent/instrumentation/kafkaclients/v0_11/KafkaUtils.java new file mode 100644 index 0000000000..d78e41451c --- /dev/null +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/lumigo/javaagent/instrumentation/kafkaclients/v0_11/KafkaUtils.java @@ -0,0 +1,32 @@ +/* + * Copyright 2024 Lumigo LTD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package io.lumigo.javaagent.instrumentation.kafkaclients.v0_11; + +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.kafka.common.header.Headers; + +public class KafkaUtils { + + public static List convertHeadersToString(Headers headers) { + return Arrays.stream(headers.toArray()) + .map(header -> header.key() + "=" + Arrays.toString(header.value())) + .collect(Collectors.toList()); + } +} diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/lumigo/javaagent/instrumentation/kafkaclients/v0_11/PayloadTracingIterable.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/lumigo/javaagent/instrumentation/kafkaclients/v0_11/PayloadTracingIterable.java new file mode 100644 index 0000000000..9e155af90d --- /dev/null +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/lumigo/javaagent/instrumentation/kafkaclients/v0_11/PayloadTracingIterable.java @@ -0,0 +1,55 @@ +/* + * Copyright 2024 Lumigo LTD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package io.lumigo.javaagent.instrumentation.kafkaclients.v0_11; + +import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTracing; +import java.util.Iterator; +import org.apache.kafka.clients.consumer.ConsumerRecord; + +public class PayloadTracingIterable implements Iterable> { + private final Iterable> delegate; + private boolean firstIterator = true; + + protected PayloadTracingIterable(Iterable> delegate) { + this.delegate = delegate; + } + + public static Iterable> wrap( + Iterable> delegate) { + if (KafkaClientsConsumerProcessTracing.wrappingEnabled()) { + return new PayloadTracingIterable<>(delegate); + } + return delegate; + } + + @Override + public Iterator> iterator() { + Iterator> it; + // We should only return one iterator with tracing. + // However, this is not thread-safe, but usually the first (hopefully only) traversal of + // ConsumerRecords is performed in the same thread that called poll() + if (firstIterator) { + it = PayloadTracingIterator.wrap(delegate.iterator()); + firstIterator = false; + } else { + it = delegate.iterator(); + } + + return it; + } +} diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/lumigo/javaagent/instrumentation/kafkaclients/v0_11/PayloadTracingIterator.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/lumigo/javaagent/instrumentation/kafkaclients/v0_11/PayloadTracingIterator.java new file mode 100644 index 0000000000..0522cdcd30 --- /dev/null +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/lumigo/javaagent/instrumentation/kafkaclients/v0_11/PayloadTracingIterator.java @@ -0,0 +1,78 @@ +/* + * Copyright 2024 Lumigo LTD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package io.lumigo.javaagent.instrumentation.kafkaclients.v0_11; + +import io.lumigo.instrumentation.core.LumigoSemanticAttributes; +import io.opentelemetry.api.trace.Span; +import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge; +import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTracing; +import io.opentelemetry.semconv.SemanticAttributes; +import java.nio.charset.StandardCharsets; +import java.util.Iterator; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.header.Header; + +public class PayloadTracingIterator implements Iterator> { + + private static final String LUMIGO_MESSAGE_ID_KEY = "lumigoMessageId"; + + private final Iterator> delegateIterator; + + private PayloadTracingIterator(Iterator> delegateIterator) { + this.delegateIterator = delegateIterator; + } + + public static Iterator> wrap( + Iterator> delegateIterator) { + if (KafkaClientsConsumerProcessTracing.wrappingEnabled()) { + return new PayloadTracingIterator<>(delegateIterator); + } + return delegateIterator; + } + + @Override + public boolean hasNext() { + return delegateIterator.hasNext(); + } + + @Override + public ConsumerRecord next() { + ConsumerRecord next = delegateIterator.next(); + if (next != null && KafkaClientsConsumerProcessTracing.wrappingEnabled()) { + Span span = Java8BytecodeBridge.currentSpan(); + span.setAttribute(LumigoSemanticAttributes.MESSAGING_PAYLOAD, next.value().toString()); + span.setAttribute( + LumigoSemanticAttributes.MESSAGING_HEADERS, + KafkaUtils.convertHeadersToString(next.headers())); + + // Support for lumigo lambda tracer kafka instrumentation + Header messageIdHeader = next.headers().lastHeader(LUMIGO_MESSAGE_ID_KEY); + if (messageIdHeader != null) { + span.setAttribute( + SemanticAttributes.MESSAGING_MESSAGE_ID, + new String(messageIdHeader.value(), StandardCharsets.UTF_8)); + } + } + return next; + } + + @Override + public void remove() { + delegateIterator.remove(); + } +} diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/TracingList.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/lumigo/javaagent/instrumentation/kafkaclients/v0_11/PayloadTracingList.java similarity index 68% rename from instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/TracingList.java rename to instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/lumigo/javaagent/instrumentation/kafkaclients/v0_11/PayloadTracingList.java index b7cc979da0..4f26553d06 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/TracingList.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/lumigo/javaagent/instrumentation/kafkaclients/v0_11/PayloadTracingList.java @@ -1,30 +1,40 @@ /* - * Copyright The OpenTelemetry Authors + * Copyright 2024 Lumigo LTD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * * SPDX-License-Identifier: Apache-2.0 */ +package io.lumigo.javaagent.instrumentation.kafkaclients.v0_11; -package io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11; - -import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerContext; import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTracing; -import org.apache.kafka.clients.consumer.ConsumerRecord; - import java.util.Collection; import java.util.List; import java.util.ListIterator; +import org.apache.kafka.clients.consumer.ConsumerRecord; -public class TracingList extends TracingIterable implements List> { +public class PayloadTracingList extends PayloadTracingIterable + implements List> { private final List> delegate; - private TracingList(List> delegate, KafkaConsumerContext consumerContext) { - super(delegate, consumerContext); + private PayloadTracingList(List> delegate) { + super(delegate); this.delegate = delegate; } - public static List> wrap( - List> delegate, KafkaConsumerContext consumerContext) { + public static List> wrap(List> delegate) { if (KafkaClientsConsumerProcessTracing.wrappingEnabled()) { - return new TracingList<>(delegate, consumerContext); + return new PayloadTracingList<>(delegate); } return delegate; } @@ -106,7 +116,6 @@ public void clear() { @Override public ConsumerRecord get(int index) { - // TODO: should this be instrumented as well? return delegate.get(index); } @@ -127,24 +136,16 @@ public int lastIndexOf(Object o) { @Override public ListIterator> listIterator() { - // TODO: the API for ListIterator is not really good to instrument it in context of Kafka - // Consumer so we will not do that for now return delegate.listIterator(); } @Override public ListIterator> listIterator(int index) { - // TODO: the API for ListIterator is not really good to instrument it in context of Kafka - // Consumer so we will not do that for now return delegate.listIterator(index); } @Override public List> subList(int fromIndex, int toIndex) { - // TODO: the API for subList is not really good to instrument it in context of Kafka - // Consumer so we will not do that for now - // Kafka is essentially a sequential commit log. We should only enable tracing when traversing - // sequentially with an iterator return delegate.subList(fromIndex, toIndex); } } diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/ConsumerRecordsInstrumentation.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/ConsumerRecordsInstrumentation.java deleted file mode 100644 index 5ba44989be..0000000000 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/ConsumerRecordsInstrumentation.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11; - -import static net.bytebuddy.matcher.ElementMatchers.isMethod; -import static net.bytebuddy.matcher.ElementMatchers.isPublic; -import static net.bytebuddy.matcher.ElementMatchers.named; -import static net.bytebuddy.matcher.ElementMatchers.returns; -import static net.bytebuddy.matcher.ElementMatchers.takesArgument; -import static net.bytebuddy.matcher.ElementMatchers.takesArguments; - -import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerContext; -import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerContextUtil; -import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; -import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; -import java.util.Iterator; -import java.util.List; -import net.bytebuddy.asm.Advice; -import net.bytebuddy.description.type.TypeDescription; -import net.bytebuddy.matcher.ElementMatcher; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; - -public class ConsumerRecordsInstrumentation implements TypeInstrumentation { - - @Override - public ElementMatcher typeMatcher() { - return named("org.apache.kafka.clients.consumer.ConsumerRecords"); - } - - @Override - public void transform(TypeTransformer transformer) { - transformer.applyAdviceToMethod( - isMethod() - .and(isPublic()) - .and(named("records")) - .and(takesArgument(0, String.class)) - .and(returns(Iterable.class)), - ConsumerRecordsInstrumentation.class.getName() + "$IterableAdvice"); - transformer.applyAdviceToMethod( - isMethod() - .and(isPublic()) - .and(named("records")) - .and(takesArgument(0, named("org.apache.kafka.common.TopicPartition"))) - .and(returns(List.class)), - ConsumerRecordsInstrumentation.class.getName() + "$ListAdvice"); - transformer.applyAdviceToMethod( - isMethod() - .and(isPublic()) - .and(named("iterator")) - .and(takesArguments(0)) - .and(returns(Iterator.class)), - ConsumerRecordsInstrumentation.class.getName() + "$IteratorAdvice"); - } - - @SuppressWarnings("unused") - public static class IterableAdvice { - - @SuppressWarnings("unchecked") - @Advice.OnMethodExit(suppress = Throwable.class) - public static void wrap( - @Advice.This ConsumerRecords records, - @Advice.Return(readOnly = false) Iterable> iterable) { - - // it's important not to suppress consumer span creation here because this instrumentation can - // leak the context and so there may be a leaked consumer span in the context, in which - // case it's important to overwrite the leaked span instead of suppressing the correct span - // (https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/1947) - KafkaConsumerContext consumerContext = KafkaConsumerContextUtil.get(records); - iterable = TracingIterable.wrap(iterable, consumerContext); - } - } - - @SuppressWarnings("unused") - public static class ListAdvice { - - @SuppressWarnings("unchecked") - @Advice.OnMethodExit(suppress = Throwable.class) - public static void wrap( - @Advice.This ConsumerRecords records, - @Advice.Return(readOnly = false) List> list) { - - // it's important not to suppress consumer span creation here because this instrumentation can - // leak the context and so there may be a leaked consumer span in the context, in which - // case it's important to overwrite the leaked span instead of suppressing the correct span - // (https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/1947) - KafkaConsumerContext consumerContext = KafkaConsumerContextUtil.get(records); - list = TracingList.wrap(list, consumerContext); - } - } - - @SuppressWarnings("unused") - public static class IteratorAdvice { - - @SuppressWarnings("unchecked") - @Advice.OnMethodExit(suppress = Throwable.class) - public static void wrap( - @Advice.This ConsumerRecords records, - @Advice.Return(readOnly = false) Iterator> iterator) { - - // it's important not to suppress consumer span creation here because this instrumentation can - // leak the context and so there may be a leaked consumer span in the context, in which - // case it's important to overwrite the leaked span instead of suppressing the correct span - // (https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/1947) - KafkaConsumerContext consumerContext = KafkaConsumerContextUtil.get(records); - iterator = TracingIterator.wrap(iterator, consumerContext); - } - } -} diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaConsumerInstrumentation.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaConsumerInstrumentation.java deleted file mode 100644 index 9a9cee2c8f..0000000000 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaConsumerInstrumentation.java +++ /dev/null @@ -1,137 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11; - -import static io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge.currentContext; -import static io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11.KafkaSingletons.consumerReceiveInstrumenter; -import static io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11.KafkaSingletons.enhanceConfig; -import static net.bytebuddy.matcher.ElementMatchers.isConstructor; -import static net.bytebuddy.matcher.ElementMatchers.isPublic; -import static net.bytebuddy.matcher.ElementMatchers.named; -import static net.bytebuddy.matcher.ElementMatchers.returns; -import static net.bytebuddy.matcher.ElementMatchers.takesArgument; -import static net.bytebuddy.matcher.ElementMatchers.takesArguments; - -import io.opentelemetry.context.Context; -import io.opentelemetry.instrumentation.api.internal.InstrumenterUtil; -import io.opentelemetry.instrumentation.api.internal.Timer; -import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerContextUtil; -import io.opentelemetry.instrumentation.kafka.internal.KafkaReceiveRequest; -import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTracing; -import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; -import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; -import java.time.Duration; -import java.util.HashMap; -import java.util.Map; -import java.util.Properties; -import net.bytebuddy.asm.Advice; -import net.bytebuddy.description.type.TypeDescription; -import net.bytebuddy.matcher.ElementMatcher; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; - -public class KafkaConsumerInstrumentation implements TypeInstrumentation { - - @Override - public ElementMatcher typeMatcher() { - return named("org.apache.kafka.clients.consumer.KafkaConsumer"); - } - - @Override - public void transform(TypeTransformer transformer) { - transformer.applyAdviceToMethod( - isConstructor().and(takesArgument(0, Map.class)), - this.getClass().getName() + "$ConstructorMapAdvice"); - transformer.applyAdviceToMethod( - isConstructor().and(takesArgument(0, Properties.class)), - this.getClass().getName() + "$ConstructorPropertiesAdvice"); - transformer.applyAdviceToMethod( - named("poll") - .and(isPublic()) - .and(takesArguments(1)) - .and(takesArgument(0, long.class).or(takesArgument(0, Duration.class))) - .and(returns(named("org.apache.kafka.clients.consumer.ConsumerRecords"))), - this.getClass().getName() + "$PollAdvice"); - } - - @SuppressWarnings("unused") - public static class ConstructorMapAdvice { - - @Advice.OnMethodEnter(suppress = Throwable.class) - public static void onEnter(@Advice.Argument(value = 0, readOnly = false) Map config) { - // ensure config is a mutable map - if (config.getClass() != HashMap.class) { - config = new HashMap<>(config); - } - enhanceConfig(config); - } - } - - @SuppressWarnings("unused") - public static class ConstructorPropertiesAdvice { - - @Advice.OnMethodEnter(suppress = Throwable.class) - public static void onEnter(@Advice.Argument(0) Properties config) { - enhanceConfig(config); - } - } - - @SuppressWarnings("unused") - public static class PollAdvice { - @Advice.OnMethodEnter(suppress = Throwable.class) - public static Timer onEnter() { - return Timer.start(); - } - - @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class) - public static void onExit( - @Advice.Enter Timer timer, - @Advice.This Consumer consumer, - @Advice.Return ConsumerRecords records, - @Advice.Thrown Throwable error) { - - // don't create spans when no records were received - if (records == null || records.isEmpty()) { - return; - } - - Context parentContext = currentContext(); - KafkaReceiveRequest request = KafkaReceiveRequest.create(records, consumer); - - // disable process tracing and store the receive span for each individual record too - boolean previousValue = KafkaClientsConsumerProcessTracing.setEnabled(false); - try { - Context context = null; - if (consumerReceiveInstrumenter().shouldStart(parentContext, request)) { - context = - InstrumenterUtil.startAndEnd( - consumerReceiveInstrumenter(), - parentContext, - request, - null, - error, - timer.startTime(), - timer.now()); - } - - // we're storing the context of the receive span so that process spans can use it as - // parent context even though the span has ended - // this is the suggested behavior according to the spec batch receive scenario: - // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md#batch-receiving - // we're attaching the consumer to the records to be able to retrieve things like consumer - // group or clientId later - KafkaConsumerContextUtil.set(records, context, consumer); - - for (ConsumerRecord record : records) { - KafkaConsumerContextUtil.set(record, context, consumer); - } - } finally { - KafkaClientsConsumerProcessTracing.setEnabled(previousValue); - } - } - } -} diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaProducerInstrumentation.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaProducerInstrumentation.java deleted file mode 100644 index 5db1bcac39..0000000000 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaProducerInstrumentation.java +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11; - -import static io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11.KafkaSingletons.enhanceConfig; -import static io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11.KafkaSingletons.producerInstrumenter; -import static net.bytebuddy.matcher.ElementMatchers.isConstructor; -import static net.bytebuddy.matcher.ElementMatchers.isMethod; -import static net.bytebuddy.matcher.ElementMatchers.isPublic; -import static net.bytebuddy.matcher.ElementMatchers.named; -import static net.bytebuddy.matcher.ElementMatchers.takesArgument; - -import io.lumigo.instrumentation.core.LumigoSemanticAttributes; -import io.opentelemetry.context.Context; -import io.opentelemetry.context.Scope; -import io.opentelemetry.instrumentation.kafka.internal.KafkaProducerRequest; -import io.opentelemetry.instrumentation.kafka.internal.KafkaPropagation; -import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge; -import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation; -import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer; -import java.util.HashMap; -import java.util.Map; -import java.util.Properties; -import net.bytebuddy.asm.Advice; -import net.bytebuddy.description.type.TypeDescription; -import net.bytebuddy.matcher.ElementMatcher; -import org.apache.kafka.clients.ApiVersions; -import org.apache.kafka.clients.producer.Callback; -import org.apache.kafka.clients.producer.ProducerRecord; - -public class KafkaProducerInstrumentation implements TypeInstrumentation { - - @Override - public ElementMatcher typeMatcher() { - return named("org.apache.kafka.clients.producer.KafkaProducer"); - } - - @Override - public void transform(TypeTransformer transformer) { - transformer.applyAdviceToMethod( - isConstructor().and(takesArgument(0, Map.class)), - this.getClass().getName() + "$ConstructorMapAdvice"); - transformer.applyAdviceToMethod( - isConstructor().and(takesArgument(0, Properties.class)), - this.getClass().getName() + "$ConstructorPropertiesAdvice"); - transformer.applyAdviceToMethod( - isMethod() - .and(isPublic()) - .and(named("send")) - .and(takesArgument(0, named("org.apache.kafka.clients.producer.ProducerRecord"))) - .and(takesArgument(1, named("org.apache.kafka.clients.producer.Callback"))), - KafkaProducerInstrumentation.class.getName() + "$SendAdvice"); - } - - @SuppressWarnings("unused") - public static class ConstructorMapAdvice { - - @Advice.OnMethodEnter(suppress = Throwable.class) - public static void onEnter(@Advice.Argument(value = 0, readOnly = false) Map config) { - // ensure config is a mutable map - if (config.getClass() != HashMap.class) { - config = new HashMap<>(config); - } - enhanceConfig(config); - } - } - - @SuppressWarnings("unused") - public static class ConstructorPropertiesAdvice { - - @Advice.OnMethodEnter(suppress = Throwable.class) - public static void onEnter(@Advice.Argument(0) Properties config) { - enhanceConfig(config); - } - } - - @SuppressWarnings("unused") - public static class SendAdvice { - - @Advice.OnMethodEnter(suppress = Throwable.class) - public static KafkaProducerRequest onEnter( - @Advice.FieldValue("apiVersions") ApiVersions apiVersions, - @Advice.FieldValue("clientId") String clientId, - @Advice.Argument(value = 0, readOnly = false) ProducerRecord record, - @Advice.Argument(value = 1, readOnly = false) Callback callback, - @Advice.Local("otelContext") Context context, - @Advice.Local("otelScope") Scope scope) { - - KafkaProducerRequest request = KafkaProducerRequest.create(record, clientId); - Context parentContext = Java8BytecodeBridge.currentContext(); - if (!producerInstrumenter().shouldStart(parentContext, request)) { - return null; - } - - context = producerInstrumenter().start(parentContext, request); - scope = context.makeCurrent(); - - if (KafkaSingletons.isProducerPropagationEnabled() - && KafkaPropagation.shouldPropagate(apiVersions)) { - record = KafkaPropagation.propagateContext(context, record); - } - - // Lumigo custom setting payload as span attribute - if (null != record.value()) { - Java8BytecodeBridge.currentSpan() - .setAttribute(LumigoSemanticAttributes.MESSAGING_PAYLOAD, record.value().toString()); - } - - callback = new ProducerCallback(callback, parentContext, context, request); - return request; - } - - @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) - public static void stopSpan( - @Advice.Enter KafkaProducerRequest request, - @Advice.Thrown Throwable throwable, - @Advice.Local("otelContext") Context context, - @Advice.Local("otelScope") Scope scope) { - if (scope == null) { - return; - } - scope.close(); - - if (throwable != null) { - producerInstrumenter().end(context, request, null, throwable); - } - // span finished by ProducerCallback - } - } -} diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaSingletons.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaSingletons.java deleted file mode 100644 index 49a8a16fdb..0000000000 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/KafkaSingletons.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11; - -import io.lumigo.javaagent.instrumentation.kafkaclients.v0_11.ConsumerPayloadAttributeExtractor; -import io.opentelemetry.api.GlobalOpenTelemetry; -import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; -import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation; -import io.opentelemetry.instrumentation.kafka.internal.KafkaInstrumenterFactory; -import io.opentelemetry.instrumentation.kafka.internal.KafkaProcessRequest; -import io.opentelemetry.instrumentation.kafka.internal.KafkaProducerRequest; -import io.opentelemetry.instrumentation.kafka.internal.KafkaReceiveRequest; -import io.opentelemetry.instrumentation.kafka.internal.OpenTelemetryMetricsReporter; -import io.opentelemetry.instrumentation.kafka.internal.OpenTelemetrySupplier; -import io.opentelemetry.javaagent.bootstrap.internal.DeprecatedConfigProperties; -import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig; -import io.opentelemetry.javaagent.bootstrap.internal.InstrumentationConfig; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.regex.Pattern; -import org.apache.kafka.clients.CommonClientConfigs; -import org.apache.kafka.clients.producer.RecordMetadata; - -public final class KafkaSingletons { - private static final String INSTRUMENTATION_NAME = "io.opentelemetry.kafka-clients-0.11"; - - private static final boolean PRODUCER_PROPAGATION_ENABLED = - DeprecatedConfigProperties.getBoolean( - InstrumentationConfig.get(), - "otel.instrumentation.kafka.client-propagation.enabled", - "otel.instrumentation.kafka.producer-propagation.enabled", - true); - private static final boolean METRICS_ENABLED = - InstrumentationConfig.get() - .getBoolean("otel.instrumentation.kafka.metric-reporter.enabled", true); - - private static final Instrumenter PRODUCER_INSTRUMENTER; - private static final Instrumenter CONSUMER_RECEIVE_INSTRUMENTER; - private static final Instrumenter CONSUMER_PROCESS_INSTRUMENTER; - - static { - KafkaInstrumenterFactory instrumenterFactory = - new KafkaInstrumenterFactory(GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME) - .setCapturedHeaders(ExperimentalConfig.get().getMessagingHeaders()) - .setCaptureExperimentalSpanAttributes( - InstrumentationConfig.get() - .getBoolean("otel.instrumentation.kafka.experimental-span-attributes", false)) - .setMessagingReceiveInstrumentationEnabled( - ExperimentalConfig.get().messagingReceiveInstrumentationEnabled()); - PRODUCER_INSTRUMENTER = instrumenterFactory.createProducerInstrumenter(); - CONSUMER_RECEIVE_INSTRUMENTER = instrumenterFactory.createConsumerReceiveInstrumenter(); - // Add Lumigo custom payload extractor - CONSUMER_PROCESS_INSTRUMENTER = - instrumenterFactory.createConsumerOperationInstrumenter( - MessageOperation.PROCESS, Collections.singletonList(new ConsumerPayloadAttributeExtractor())); - } - - public static boolean isProducerPropagationEnabled() { - return PRODUCER_PROPAGATION_ENABLED; - } - - public static Instrumenter producerInstrumenter() { - return PRODUCER_INSTRUMENTER; - } - - public static Instrumenter consumerReceiveInstrumenter() { - return CONSUMER_RECEIVE_INSTRUMENTER; - } - - public static Instrumenter consumerProcessInstrumenter() { - return CONSUMER_PROCESS_INSTRUMENTER; - } - - @SuppressWarnings("unchecked") - public static void enhanceConfig(Map config) { - // skip enhancing configuration when metrics are disabled or when we have already enhanced it - if (!METRICS_ENABLED - || config.get(OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTRUMENTATION_NAME) - != null) { - return; - } - config.merge( - CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, - OpenTelemetryMetricsReporter.class.getName(), - (class1, class2) -> { - // class1 is either a class name or List of class names or classes - if (class1 instanceof List) { - List result = new ArrayList<>(); - result.addAll((List) class1); - result.add(class2); - return result; - } else if (class1 instanceof String) { - String className1 = (String) class1; - if (className1.isEmpty()) { - return class2; - } - } - return class1 + "," + class2; - }); - config.put( - OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_SUPPLIER, - new OpenTelemetrySupplier(GlobalOpenTelemetry.get())); - config.put( - OpenTelemetryMetricsReporter.CONFIG_KEY_OPENTELEMETRY_INSTRUMENTATION_NAME, - INSTRUMENTATION_NAME); - } - - private KafkaSingletons() {} -} diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/ProducerCallback.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/ProducerCallback.java deleted file mode 100644 index ced5e99e1c..0000000000 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/ProducerCallback.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11; - -import io.opentelemetry.context.Context; -import io.opentelemetry.context.Scope; -import io.opentelemetry.instrumentation.kafka.internal.KafkaProducerRequest; -import org.apache.kafka.clients.producer.Callback; -import org.apache.kafka.clients.producer.RecordMetadata; - -import static io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11.KafkaSingletons.producerInstrumenter; - -public class ProducerCallback implements Callback { - private final Callback callback; - private final Context parentContext; - private final Context context; - private final KafkaProducerRequest request; - - public ProducerCallback( - Callback callback, Context parentContext, Context context, KafkaProducerRequest request) { - this.callback = callback; - this.parentContext = parentContext; - this.context = context; - this.request = request; - } - - @Override - public void onCompletion(RecordMetadata metadata, Exception exception) { - producerInstrumenter().end(context, request, metadata, exception); - - if (callback != null) { - try (Scope ignored = parentContext.makeCurrent()) { - callback.onCompletion(metadata, exception); - } - } - } -} diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/TracingIterable.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/TracingIterable.java deleted file mode 100644 index 425a9e2d18..0000000000 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/TracingIterable.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11; - -import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerContext; -import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTracing; -import java.util.Iterator; -import org.apache.kafka.clients.consumer.ConsumerRecord; - -public class TracingIterable implements Iterable> { - private final Iterable> delegate; - private final KafkaConsumerContext consumerContext; - private boolean firstIterator = true; - - protected TracingIterable( - Iterable> delegate, KafkaConsumerContext consumerContext) { - this.delegate = delegate; - this.consumerContext = consumerContext; - } - - public static Iterable> wrap( - Iterable> delegate, KafkaConsumerContext consumerContext) { - if (KafkaClientsConsumerProcessTracing.wrappingEnabled()) { - return new TracingIterable<>(delegate, consumerContext); - } - return delegate; - } - - @Override - public Iterator> iterator() { - Iterator> it; - // We should only return one iterator with tracing. - // However, this is not thread-safe, but usually the first (hopefully only) traversal of - // ConsumerRecords is performed in the same thread that called poll() - if (firstIterator) { - it = TracingIterator.wrap(delegate.iterator(), consumerContext); - firstIterator = false; - } else { - it = delegate.iterator(); - } - - return it; - } -} diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/TracingIterator.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/TracingIterator.java deleted file mode 100644 index 957439a4e7..0000000000 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/kafkaclients/v0_11/TracingIterator.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11; - -import static io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11.KafkaSingletons.consumerProcessInstrumenter; - -import io.opentelemetry.context.Context; -import io.opentelemetry.context.Scope; -import io.opentelemetry.instrumentation.kafka.internal.KafkaConsumerContext; -import io.opentelemetry.instrumentation.kafka.internal.KafkaProcessRequest; -import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTracing; -import java.util.Iterator; -import javax.annotation.Nullable; -import org.apache.kafka.clients.consumer.ConsumerRecord; - -public class TracingIterator implements Iterator> { - - private final Iterator> delegateIterator; - private final Context parentContext; - private final KafkaConsumerContext consumerContext; - - /* - * Note: this may potentially create problems if this iterator is used from different threads. But - * at the moment we cannot do much about this. - */ - @Nullable private KafkaProcessRequest currentRequest; - @Nullable private Context currentContext; - @Nullable private Scope currentScope; - - private TracingIterator( - Iterator> delegateIterator, KafkaConsumerContext consumerContext) { - this.delegateIterator = delegateIterator; - - Context receiveContext = consumerContext.getContext(); - // use the receive CONSUMER as parent if it's available - this.parentContext = receiveContext != null ? receiveContext : Context.current(); - this.consumerContext = consumerContext; - } - - public static Iterator> wrap( - Iterator> delegateIterator, KafkaConsumerContext consumerContext) { - if (KafkaClientsConsumerProcessTracing.wrappingEnabled()) { - return new TracingIterator<>(delegateIterator, consumerContext); - } - return delegateIterator; - } - - @Override - public boolean hasNext() { - closeScopeAndEndSpan(); - return delegateIterator.hasNext(); - } - - @Override - public ConsumerRecord next() { - // in case they didn't call hasNext()... - closeScopeAndEndSpan(); - - // it's important not to suppress consumer span creation here using Instrumenter.shouldStart() - // because this instrumentation can leak the context and so there may be a leaked consumer span - // in the context, in which case it's important to overwrite the leaked span instead of - // suppressing the correct span - // (https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/1947) - ConsumerRecord next = delegateIterator.next(); - if (next != null && KafkaClientsConsumerProcessTracing.wrappingEnabled()) { - currentRequest = KafkaProcessRequest.create(consumerContext, next); - currentContext = consumerProcessInstrumenter().start(parentContext, currentRequest); - currentScope = currentContext.makeCurrent(); - } - return next; - } - - private void closeScopeAndEndSpan() { - if (currentScope != null) { - currentScope.close(); - consumerProcessInstrumenter().end(currentContext, currentRequest, null, null); - currentScope = null; - currentRequest = null; - currentContext = null; - } - } - - @Override - public void remove() { - delegateIterator.remove(); - } -} diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/java/io/lumigo/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/java/io/lumigo/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientTest.java index 8fa5ded3ff..8207ec92ee 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/java/io/lumigo/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientTest.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-0.11/javaagent/src/test/java/io/lumigo/javaagent/instrumentation/kafkaclients/v0_11/KafkaClientTest.java @@ -25,7 +25,6 @@ import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; import io.opentelemetry.sdk.testing.assertj.AttributeAssertion; import io.opentelemetry.sdk.testing.assertj.TracesAssert; -import io.opentelemetry.sdk.trace.data.LinkData; import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.semconv.SemanticAttributes; import java.io.IOException; @@ -33,6 +32,7 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -40,8 +40,8 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicReference; import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -53,6 +53,7 @@ import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; +import org.assertj.core.api.AbstractListAssert; import org.assertj.core.api.AbstractLongAssert; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.AfterAll; @@ -86,7 +87,7 @@ public class KafkaClientTest { @BeforeAll static void setUp() throws InterruptedException, IOException { kafka = - new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.10")) + new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.6.1")) .withEnv("KAFKA_HEAP_OPTS", "-Xmx256M") .withLogConsumer(new Slf4jLogConsumer(LOGGER)) .waitingFor(Wait.forLogMessage(".*started \\(kafka.server.KafkaServer\\).*", 1)) @@ -110,15 +111,13 @@ static void setUp() throws InterruptedException, IOException { consumer = new KafkaConsumer<>(consumerProps()); consumer.subscribe( - java.util.Collections.singletonList(TOPIC), - new org.apache.kafka.clients.consumer.ConsumerRebalanceListener() { + Collections.singletonList(TOPIC), + new ConsumerRebalanceListener() { @Override - public void onPartitionsRevoked( - java.util.Collection partitions) {} + public void onPartitionsRevoked(Collection partitions) {} @Override - public void onPartitionsAssigned( - java.util.Collection partitions) { + public void onPartitionsAssigned(Collection partitions) { consumerReady.countDown(); } }); @@ -181,8 +180,7 @@ private static List commonAttributes(String clientPrefix) { equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"), equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, TOPIC), satisfies( - SemanticAttributes.MESSAGING_CLIENT_ID, - (value) -> value.startsWith(clientPrefix))); + SemanticAttributes.MESSAGING_CLIENT_ID, (value) -> value.startsWith(clientPrefix))); } private static List sendAttributes(String messageKey, String messageValue) { @@ -202,20 +200,16 @@ private static List sendAttributes(String messageKey, String assertions.add(equalTo(SemanticAttributes.MESSAGING_KAFKA_MESSAGE_KEY, messageKey)); } if (null != messageValue) { + assertions.add( + satisfies( + AttributeKey.stringArrayKey("messaging.message.headers"), + AbstractListAssert::isNotEmpty)); assertions.add(equalTo(AttributeKey.stringKey("messaging.message.payload"), JSON_BODY)); } return assertions; } - private static List receiveAttributes() { - List assertions = new ArrayList<>(commonAttributes("consumer")); - - assertions.add(equalTo(SemanticAttributes.MESSAGING_OPERATION, "receive")); - - return assertions; - } - private static List processAttributes( String messageKey, String messageValue) { List assertions = new ArrayList<>(commonAttributes("consumer")); @@ -233,6 +227,10 @@ private static List processAttributes( assertions.add(equalTo(SemanticAttributes.MESSAGING_KAFKA_MESSAGE_KEY, messageKey)); } if (null != messageValue) { + assertions.add( + satisfies( + AttributeKey.stringArrayKey("messaging.message.headers"), + AbstractListAssert::isNotEmpty)); assertions.add(equalTo(AttributeKey.stringKey("messaging.message.payload"), JSON_BODY)); assertions.add( equalTo( @@ -275,38 +273,47 @@ void testProducerAndConsumerSpan() throws Exception { }); } - AtomicReference producerSpanRef = new AtomicReference<>(); - TracesAssert.assertThat(instrumentation.waitForTraces(2)) - .hasSize(2) + List> traces = instrumentation.waitForTraces(1); + String parentSpanId = ""; + String producerSpanId = ""; + String consumerSpanId = ""; + for (List trace : traces) { + for (SpanData span : trace) { + if (span.getName().equals("parent")) { + parentSpanId = span.getSpanId(); + } else if (span.getName().equals(TOPIC + " publish")) { + producerSpanId = span.getSpanId(); + } else if (span.getName().equals(TOPIC + " process")) { + consumerSpanId = span.getSpanId(); + } + } + } + + String finalParentSpanId = parentSpanId; + String finalProducerSpanId = producerSpanId; + String finalConsumerSpanId = consumerSpanId; + TracesAssert.assertThat(traces) + .hasSize(1) .hasTracesSatisfyingExactly( trace -> { - trace.hasSpansSatisfyingExactly( + trace.hasSpansSatisfyingExactlyInAnyOrder( span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(), span -> span.hasName(TOPIC + " publish") .hasKind(SpanKind.PRODUCER) - .hasParent(trace.getSpan(0)) + .hasParentSpanId(finalParentSpanId) .hasAttributesSatisfying(sendAttributes("1", JSON_BODY)), span -> span.hasName("producer callback") .hasKind(SpanKind.INTERNAL) - .hasParent(trace.getSpan(0))); - producerSpanRef.set(trace.getSpan(1)); - }, - trace -> - trace.hasSpansSatisfyingExactly( - span -> - span.hasName(TOPIC + " receive") - .hasKind(SpanKind.CONSUMER) - .hasNoParent() - .hasAttributesSatisfying(receiveAttributes()), - span -> - span.hasName(TOPIC + " process") - .hasKind(SpanKind.CONSUMER) - .hasLinks(LinkData.create(producerSpanRef.get().getSpanContext())) - .hasParent(trace.getSpan(0)) - .hasAttributesSatisfying(processAttributes("1", JSON_BODY)), - span -> span.hasName("processing").hasParent(trace.getSpan(1)))); + .hasParentSpanId(finalParentSpanId), + span -> + span.hasName(TOPIC + " process") + .hasKind(SpanKind.CONSUMER) + .hasParentSpanId(finalProducerSpanId) + .hasAttributesSatisfying(processAttributes("1", JSON_BODY)), + span -> span.hasName("processing").hasParentSpanId(finalConsumerSpanId)); + }); } @Test @@ -328,9 +335,19 @@ void testRecordsWithTopicPartitionKafkaConsume() Assertions.assertThat(record.value()).isEqualTo(JSON_BODY); } - AtomicReference producerSpanRef = new AtomicReference<>(); - TracesAssert.assertThat(instrumentation.waitForTraces(2)) - .hasSize(2) + List> traces = instrumentation.waitForTraces(1); + String producerSpanId = ""; + for (List trace : traces) { + for (SpanData span : trace) { + if (span.getName().equals(TOPIC + " publish")) { + producerSpanId = span.getSpanId(); + } + } + } + String finalProducerSpanId = producerSpanId; + + TracesAssert.assertThat(traces) + .hasSize(1) .hasTracesSatisfyingExactly( trace -> { trace.hasSpansSatisfyingExactly( @@ -338,21 +355,12 @@ void testRecordsWithTopicPartitionKafkaConsume() span.hasName(TOPIC + " publish") .hasKind(SpanKind.PRODUCER) .hasNoParent() - .hasAttributesSatisfying(sendAttributes(null, JSON_BODY))); - producerSpanRef.set(trace.getSpan(0)); - }, - trace -> - trace.hasSpansSatisfyingExactly( - span -> - span.hasName(TOPIC + " receive") - .hasKind(SpanKind.CONSUMER) - .hasNoParent() - .hasAttributesSatisfying(receiveAttributes()), - span -> - span.hasName(TOPIC + " process") - .hasKind(SpanKind.CONSUMER) - .hasLinks(LinkData.create(producerSpanRef.get().getSpanContext())) - .hasParent(trace.getSpan(0)) - .hasAttributesSatisfying(processAttributes(null, JSON_BODY)))); + .hasAttributesSatisfying(sendAttributes(null, JSON_BODY)), + span -> + span.hasName(TOPIC + " process") + .hasKind(SpanKind.CONSUMER) + .hasParentSpanId(finalProducerSpanId) + .hasAttributesSatisfying(processAttributes(null, null))); + }); } } diff --git a/instrumentation/lettuce/lettuce-5.1/javaagent/src/main/java/io/lumigo/javaagent/instrumentation/lettuce/v5_1/LettuceConfigCustomizer.java b/instrumentation/lettuce/lettuce-5.1/javaagent/src/main/java/io/lumigo/javaagent/instrumentation/lettuce/v5_1/LettuceConfigCustomizer.java index 3975d8cb5f..ed61e45d4c 100644 --- a/instrumentation/lettuce/lettuce-5.1/javaagent/src/main/java/io/lumigo/javaagent/instrumentation/lettuce/v5_1/LettuceConfigCustomizer.java +++ b/instrumentation/lettuce/lettuce-5.1/javaagent/src/main/java/io/lumigo/javaagent/instrumentation/lettuce/v5_1/LettuceConfigCustomizer.java @@ -1,3 +1,20 @@ +/* + * Copyright 2024 Lumigo LTD + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ package io.lumigo.javaagent.instrumentation.lettuce.v5_1; import com.google.auto.service.AutoService;