Commit 1df638b: unit tests

nadav3396 committed May 20, 2024
1 parent 1b1bba6
Showing 6 changed files with 468 additions and 191 deletions.
5 changes: 3 additions & 2 deletions src/main/java/io/lumigo/core/SpansContainer.java
@@ -13,6 +13,7 @@
 import io.lumigo.core.utils.StringUtils;
 import io.lumigo.models.HttpSpan;
 import io.lumigo.models.KafkaSpan;
+import io.lumigo.models.KafkaSpanFactory;
 import io.lumigo.models.Span;
 import java.io.*;
 import java.util.*;
@@ -455,7 +456,7 @@ public <K, V> void addKafkaProduceSpan(
             RecordMetadata recordMetadata,
             Exception exception) {
         this.kafkaSpans.add(
-                KafkaSpan.createProduce(
+                KafkaSpanFactory.createProduce(
                         this.baseSpan,
                         startTime,
                         keySerializer,
@@ -472,7 +473,7 @@ public void addKafkaConsumeSpan(
             ConsumerMetadata consumerMetadata,
             ConsumerRecords<?, ?> consumerRecords) {
         this.kafkaSpans.add(
-                KafkaSpan.createConsume(
+                KafkaSpanFactory.createConsume(
                         this.baseSpan, startTime, consumer, consumerMetadata, consumerRecords));
     }

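KafkaSpanFactory, newly imported above, is one of the six changed files but is not rendered in this view. Its signatures can be read off the call sites above and the static builders this commit removes from KafkaSpan further down. A minimal hypothetical sketch of its shape, assuming it only hosts the two moved builders, with the delegation bodies elided:

    package io.lumigo.models;

    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.clients.producer.internals.ProducerMetadata;
    import org.apache.kafka.common.serialization.Serializer;

    // Hypothetical sketch only: assumes the factory hosts the two static
    // builders that this commit deletes from KafkaSpan (see the
    // KafkaSpan.java diff below), with the bodies moved over unchanged.
    public class KafkaSpanFactory {

        // Signature taken from the createProduce(...) removed from KafkaSpan.
        public static <K, V> KafkaSpan createProduce(
                Span baseSpan,
                Long startTime,
                Serializer<K> keySerializer,
                Serializer<V> valueSerializer,
                ProducerMetadata producerMetadata,
                ProducerRecord<K, V> record,
                RecordMetadata recordMetadata,
                Exception exception) {
            throw new UnsupportedOperationException("sketch; body moved from KafkaSpan");
        }

        // Signature taken from the createConsume(...) removed from KafkaSpan.
        public static KafkaSpan createConsume(
                Span baseSpan,
                Long startTime,
                KafkaConsumer<?, ?> consumer,
                ConsumerMetadata consumerMetadata,
                ConsumerRecords<?, ?> consumerRecords) {
            throw new UnsupportedOperationException("sketch; body moved from KafkaSpan");
        }
    }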
ApacheKafkaConsumerInstrumentation.java
@@ -21,13 +21,11 @@ public class ApacheKafkaConsumerInstrumentation implements LumigoInstrumentation
 
     @Override
     public ElementMatcher<TypeDescription> getTypeMatcher() {
-        System.out.println("Inside ApacheKafkaConsumerInstrumentation.getTypeMatcher()");
         return named("org.apache.kafka.clients.consumer.KafkaConsumer");
     }
 
     @Override
     public AgentBuilder.Transformer.ForAdvice getTransformer() {
-        System.out.println("Inside ApacheKafkaConsumerInstrumentation.getTransformer()");
         return new AgentBuilder.Transformer.ForAdvice()
                 .include(Loader.class.getClassLoader())
                 .advice(
@@ -44,15 +42,12 @@ public AgentBuilder.Transformer.ForAdvice getTransformer() {
 
     public static class ApacheKafkaConsumerAdvice {
         public static final SpansContainer spansContainer = SpansContainer.getInstance();
-        public static final LRUCache<Integer, Boolean> handled = new LRUCache<>(1000);
-        public static final LRUCache<Integer, Long> startTimeMap = new LRUCache<>(1000);
+        public static final LRUCache<String, Long> startTimeMap = new LRUCache<>(1000);
 
         @Advice.OnMethodEnter(suppress = Throwable.class)
-        public static void methodEnter() {
+        public static void methodEnter(@Advice.FieldValue("clientId") String clientId) {
             try {
-                System.out.println("Inside ApacheKafkaConsumerAdvice.methodEnter()");
-                // TODO fix start time
-                // startTimeMap.put(record.hashCode(), System.currentTimeMillis());
+                startTimeMap.put(clientId, System.currentTimeMillis());
             } catch (Exception e) {
                 Logger.error(e);
             }
@@ -62,13 +57,12 @@ public static void methodEnter() {
         public static void methodExit(
                 @Advice.This KafkaConsumer<?, ?> consumer,
                 @Advice.FieldValue("metadata") ConsumerMetadata metadata,
+                @Advice.FieldValue("clientId") String clientId,
                 @Advice.Return ConsumerRecords<?, ?> consumerRecords) {
             try {
-                System.out.println("Inside ApacheKafkaConsumerAdvice.methodExit()");
                 Logger.info("Handling kafka request {}", consumerRecords.hashCode());
                 spansContainer.addKafkaConsumeSpan(
-                        System.currentTimeMillis(), consumer, metadata, consumerRecords);
-                handled.put(consumerRecords.hashCode(), true);
+                        startTimeMap.get(clientId), consumer, metadata, consumerRecords);
             } catch (Throwable error) {
                 Logger.error(error, "Failed to add kafka span");
             }
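The enter/exit advice now correlates a poll() call through the consumer's stable clientId field instead of a per-call hashCode, so the exit handler can recover the start time recorded on entry. A self-contained illustration of that pairing, using an access-ordered LinkedHashMap as a stand-in for the tracer's LRUCache helper (the class and the onPollEnter/onPollExit names here are hypothetical):

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class ConsumerTimingSketch {
        // Stand-in for LRUCache<String, Long>: an access-ordered
        // LinkedHashMap capped at 1000 entries, mirroring new LRUCache<>(1000).
        private static final int MAX_ENTRIES = 1000;

        private static final Map<String, Long> startTimeMap =
                new LinkedHashMap<String, Long>(16, 0.75f, true) {
                    @Override
                    protected boolean removeEldestEntry(Map.Entry<String, Long> eldest) {
                        return size() > MAX_ENTRIES;
                    }
                };

        // Mirrors methodEnter: remember when this client's poll() began.
        static void onPollEnter(String clientId) {
            startTimeMap.put(clientId, System.currentTimeMillis());
        }

        // Mirrors methodExit: the span's start time is looked up by the same key.
        static Long onPollExit(String clientId) {
            return startTimeMap.get(clientId);
        }

        public static void main(String[] args) throws InterruptedException {
            onPollEnter("consumer-1");
            Thread.sleep(5); // simulated poll work
            System.out.println("poll for consumer-1 started at " + onPollExit("consumer-1"));
        }
    }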
ApacheKafkaProducerInstrumentation.java
@@ -25,13 +25,11 @@ public class ApacheKafkaProducerInstrumentation implements LumigoInstrumentation
 
     @Override
     public ElementMatcher<TypeDescription> getTypeMatcher() {
-        System.out.println("Inside ApacheKafkaProducerInstrumentation.getTypeMatcher()");
         return named("org.apache.kafka.clients.producer.KafkaProducer");
     }
 
     @Override
     public AgentBuilder.Transformer.ForAdvice getTransformer() {
-        System.out.println("Inside ApacheKafkaProducerInstrumentation.getTransformer()");
         return new AgentBuilder.Transformer.ForAdvice()
                 .include(Loader.class.getClassLoader())
                 .advice(
@@ -63,7 +61,6 @@ public static <K, V> void methodEnter(
                 @Advice.Argument(value = 0, readOnly = false) ProducerRecord<K, V> record,
                 @Advice.Argument(value = 1, readOnly = false) Callback callback) {
             try {
-                System.out.println("Inside ApacheKafkaProducerAdvice.methodEnter()");
                 callback =
                         new KafkaProducerCallback<>(
                                 callback,
@@ -98,9 +95,7 @@ public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
             if (callback != null) {
                 callback.onCompletion(recordMetadata, exception);
             }
-            System.out.println("Inside KafkaProducerCallback.onCompletion()");
-
-            Logger.info("Handling kafka request {} from host {}", record.hashCode());
+            Logger.info("Handling kafka request {}", record.hashCode());
             spansContainer.addKafkaProduceSpan(
                     startTime,
                     keySerializer,
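KafkaProducerCallback decorates the application's own Callback so the span can be recorded once the broker acknowledges the send. A minimal standalone sketch of that decorator pattern; the class name below is hypothetical and the println stands in for the real spansContainer.addKafkaProduceSpan(...) call:

    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.RecordMetadata;

    // Sketch of the wrapping pattern: run the user's callback first,
    // then report timing, whether the send succeeded or failed.
    public class CallbackDecoratorSketch implements Callback {
        private final Callback delegate; // null when the app passed no callback
        private final long startTime;

        public CallbackDecoratorSketch(Callback delegate, long startTime) {
            this.delegate = delegate;
            this.startTime = startTime;
        }

        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
            if (delegate != null) {
                delegate.onCompletion(recordMetadata, exception);
            }
            // The real advice emits a produce span here.
            System.out.println("produce took " + (System.currentTimeMillis() - startTime) + " ms");
        }
    }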
185 changes: 13 additions & 172 deletions src/main/java/io/lumigo/models/KafkaSpan.java
@@ -1,24 +1,13 @@
 package io.lumigo.models;
 
-import static io.lumigo.core.SpansContainer.KAFKA_SPAN_TYPE;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 import java.util.*;
-import java.util.stream.Collectors;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
-import lombok.Data;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.clients.producer.internals.ProducerMetadata;
-import org.apache.kafka.common.header.Header;
-import org.apache.kafka.common.header.Headers;
-import org.apache.kafka.common.serialization.Serializer;
+import lombok.Getter;
 
-@Data
+@Getter
 @Builder(toBuilder = true)
 @AllArgsConstructor
 public class KafkaSpan {
@@ -37,7 +26,7 @@
     private Info info;
 
     @Builder(toBuilder = true)
-    @Data(staticConstructor = "of")
+    @Getter
     public static class Info {
         private KafkaSpan.Tracer tracer;
         private KafkaSpan.TraceId traceId;
@@ -50,15 +39,15 @@
 
     @AllArgsConstructor
     @Builder(toBuilder = true)
-    @Data(staticConstructor = "of")
+    @Getter
     public static class TraceId {
         @JsonProperty("Root")
         private String root;
     }
 
     @AllArgsConstructor
     @Builder(toBuilder = true)
-    @Data(staticConstructor = "of")
+    @Getter
     public static class Tracer {
         private String version;
     }
@@ -67,9 +56,9 @@ public interface KafkaInfo {}
 
     @AllArgsConstructor
     @Builder(toBuilder = true)
-    @Data(staticConstructor = "of")
+    @Getter
     public static class KafkaProducerInfo implements KafkaInfo {
-        private final String kafkaInfoType = KAFKA_PRODUCER_TYPE;
+        private static final String kafkaInfoType = KAFKA_PRODUCER_TYPE;
         private List<String> bootstrapServers;
         private String topic;
         private KafkaSpan.KafkaProducerRecord record;
@@ -78,7 +67,7 @@ public static class KafkaProducerInfo implements KafkaInfo {
 
     @AllArgsConstructor
     @Builder(toBuilder = true)
-    @Data(staticConstructor = "of")
+    @Getter
     public static class KafkaProducerRecord {
         private byte[] key;
         private byte[] value;
@@ -89,24 +78,24 @@ public interface KafkaProducerResponse {}
 
     @AllArgsConstructor
     @Builder(toBuilder = true)
-    @Data(staticConstructor = "of")
+    @Getter
     public static class KafkaProducerSuccessResponse implements KafkaProducerResponse {
         private Integer partition;
         private Long offset;
     }
 
     @AllArgsConstructor
     @Builder(toBuilder = true)
-    @Data(staticConstructor = "of")
+    @Getter
     public static class KafkaProducerErrorResponse implements KafkaProducerResponse {
         private String errorMessage;
     }
 
     @AllArgsConstructor
     @Builder(toBuilder = true)
-    @Data(staticConstructor = "of")
+    @Getter
     public static class KafkaConsumerInfo implements KafkaInfo {
-        private final String kafkaInfoType = KAFKA_CONSUMER_TYPE;
+        private static final String kafkaInfoType = KAFKA_CONSUMER_TYPE;
         private List<String> bootstrapServers;
         private String consumerGroupId;
         private Integer recordsCount;
@@ -116,7 +105,7 @@ public static class KafkaConsumerInfo implements KafkaInfo {
 
     @AllArgsConstructor
     @Builder(toBuilder = true)
-    @Data(staticConstructor = "of")
+    @Getter
     public static class KafkaConsumerRecord {
         private String topic;
         private Integer partition;
@@ -125,152 +114,4 @@ public static class KafkaConsumerRecord {
         private String value;
         private Map<String, byte[]> headers;
     }
-
-    public static <K, V> KafkaSpan createProduce(
-            Span baseSpan,
-            Long startTime,
-            Serializer<K> keySerializer,
-            Serializer<V> valueSerializer,
-            ProducerMetadata producerMetadata,
-            ProducerRecord<K, V> record,
-            RecordMetadata recordMetadata,
-            Exception exception) {
-        List<String> bootstrapServers =
-                producerMetadata.fetch().nodes().stream()
-                        .map(node -> node.host() + ":" + node.port())
-                        .collect(Collectors.toList());
-        String topic = record.topic();
-        KafkaProducerRecord producerRecord =
-                KafkaProducerRecord.builder()
-                        .key(
-                                keySerializer.serialize(
-                                        record.topic(), record.headers(), record.key()))
-                        .value(
-                                valueSerializer.serialize(
-                                        record.topic(), record.headers(), record.value()))
-                        .headers(extractHeaders(record.headers()))
-                        .build();
-
-        KafkaInfo info;
-        if (exception == null) {
-            info =
-                    KafkaSpan.KafkaProducerInfo.builder()
-                            .bootstrapServers(bootstrapServers)
-                            .topic(topic)
-                            .record(producerRecord)
-                            .response(
-                                    KafkaProducerSuccessResponse.builder()
-                                            .partition(recordMetadata.partition())
-                                            .offset(recordMetadata.offset())
-                                            .build())
-                            .build();
-        } else {
-            info =
-                    KafkaProducerInfo.builder()
-                            .bootstrapServers(bootstrapServers)
-                            .topic(topic)
-                            .record(producerRecord)
-                            .response(
-                                    KafkaProducerErrorResponse.builder()
-                                            .errorMessage(exception.getMessage())
-                                            .build())
-                            .build();
-        }
-
-        return new KafkaSpanBuilder()
-                .id(UUID.randomUUID().toString())
-                .started(startTime)
-                .ended(System.currentTimeMillis())
-                .type(KAFKA_SPAN_TYPE)
-                .transactionId(baseSpan.getTransactionId())
-                .account(baseSpan.getAccount())
-                .region(baseSpan.getRegion())
-                .token(baseSpan.getToken())
-                .parentId(baseSpan.getId())
-                .info(
-                        KafkaSpan.Info.builder()
-                                .tracer(
-                                        KafkaSpan.Tracer.builder()
-                                                .version(
-                                                        baseSpan.getInfo().getTracer().getVersion())
-                                                .build())
-                                .traceId(
-                                        KafkaSpan.TraceId.builder()
-                                                .root(baseSpan.getInfo().getTraceId().getRoot())
-                                                .build())
-                                .messageId(
-                                        new String(
-                                                record.headers()
-                                                        .lastHeader("lumigoMessageId")
-                                                        .value()))
-                                .kafkaInfo(info)
-                                .build())
-                .build();
-    }
-
-    public static KafkaSpan createConsume(
-            Span baseSpan,
-            Long startTime,
-            KafkaConsumer<?, ?> consumer,
-            ConsumerMetadata consumerMetadata,
-            ConsumerRecords<?, ?> consumerRecords) {
-        List<String> messageIds = new ArrayList<>();
-        List<String> bootstrapServers =
-                consumerMetadata.fetch().nodes().stream()
-                        .map(node -> node.host() + ":" + node.port())
-                        .collect(Collectors.toList());
-        List<String> topics = new ArrayList<>(consumer.subscription());
-        List<KafkaConsumerRecord> records = new ArrayList<>();
-        consumerRecords.forEach(
-                record -> {
-                    messageIds.add(
-                            new String(record.headers().lastHeader("lumigoMessageId").value()));
-                    records.add(
-                            KafkaConsumerRecord.builder()
-                                    .topic(record.topic())
-                                    .partition(record.partition())
-                                    .offset(record.offset())
-                                    .key(record.key().toString())
-                                    .value(record.value().toString())
-                                    .headers(extractHeaders(record.headers()))
-                                    .build());
-                });
-        return KafkaSpan.builder()
-                .id(UUID.randomUUID().toString())
-                .started(startTime)
-                .ended(System.currentTimeMillis())
-                .type(KAFKA_SPAN_TYPE)
-                .transactionId(baseSpan.getTransactionId())
-                .account(baseSpan.getAccount())
-                .region(baseSpan.getRegion())
-                .token(baseSpan.getToken())
-                .parentId(baseSpan.getId())
-                .info(
-                        Info.builder()
-                                .tracer(
-                                        KafkaSpan.Tracer.builder()
-                                                .version(
-                                                        baseSpan.getInfo().getTracer().getVersion())
-                                                .build())
-                                .traceId(
-                                        KafkaSpan.TraceId.builder()
-                                                .root(baseSpan.getInfo().getTraceId().getRoot())
-                                                .build())
-                                .messageIds(messageIds)
-                                .kafkaInfo(
-                                        KafkaSpan.KafkaConsumerInfo.builder()
-                                                .bootstrapServers(bootstrapServers)
-                                                .consumerGroupId(consumer.groupMetadata().groupId())
-                                                .topics(topics)
-                                                .recordsCount(consumerRecords.count())
-                                                .records(records)
-                                                .build())
-                                .build())
-                .build();
-    }
-
-    private static Map<String, byte[]> extractHeaders(Headers headers) {
-        return Arrays.stream(headers.toArray())
-                .collect(Collectors.toMap(Header::key, Header::value));
-    }
 }
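Throughout this file the model classes drop Lombok's @Data (which here also generated setters, equals/hashCode/toString, and a static of(...) factory) in favor of read-only @Getter, while the @Builder API stays unchanged. A small hypothetical usage sketch with made-up values, showing that construction and reads still work after the swap:

    import io.lumigo.models.KafkaSpan;
    import java.util.Collections;

    public class KafkaSpanModelSketch {
        public static void main(String[] args) {
            // Builder API is unaffected by the @Data -> @Getter change;
            // only the generated setters and value-object methods go away.
            KafkaSpan.KafkaConsumerRecord rec =
                    KafkaSpan.KafkaConsumerRecord.builder()
                            .topic("orders") // hypothetical topic
                            .partition(0)
                            .offset(42L)
                            .key("k")
                            .value("v")
                            .headers(Collections.emptyMap())
                            .build();
            System.out.println(rec.getTopic()); // getters are still generated
        }
    }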
(The remaining 2 of the 6 changed files are not rendered in this view.)
