diff --git a/README.md b/README.md
index b7e8ad1..e423577 100644
--- a/README.md
+++ b/README.md
@@ -107,4 +107,5 @@ Add the environment variable `JAVA_TOOL_OPTIONS` to your Lambda functions and se
- Aws SDK V1
- Aws SDK V2
-- Apache HTTP Client
\ No newline at end of file
+- Apache HTTP Client
+- Apache Kafka
\ No newline at end of file
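
Since Kafka is instrumented by the agent itself, no handler changes are needed: once `JAVA_TOOL_OPTIONS` points at the tracer, producer and consumer calls are traced automatically. A minimal sketch, assuming a hypothetical handler and an application-provided producer (topic and names are illustrative):

```java
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import java.util.Map;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical handler: with the agent attached, the send() below is traced
// automatically. ProducerFactory.create() stands in for app-specific setup.
public class OrderHandler implements RequestHandler<Map<String, String>, Void> {
    private final KafkaProducer<String, String> producer = ProducerFactory.create();

    @Override
    public Void handleRequest(Map<String, String> input, Context context) {
        producer.send(new ProducerRecord<>("orders", input.get("id"), "created"));
        return null;
    }
}
```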
diff --git a/findbugs/findbugs-exclude.xml b/findbugs/findbugs-exclude.xml
index 4536193..3a62d71 100644
--- a/findbugs/findbugs-exclude.xml
+++ b/findbugs/findbugs-exclude.xml
@@ -2,4 +2,13 @@
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 8e4ee0c..d3c29bd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -68,7 +68,7 @@
<groupId>com.amazonaws</groupId>
<artifactId>aws-lambda-java-events</artifactId>
- <version>2.2.6</version>
+ <version>3.11.5</version>
<groupId>com.amazonaws</groupId>
@@ -133,6 +133,13 @@
<version>2.25.45</version>
+
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>3.1.0</version>
+ </dependency>
+
<groupId>com.fasterxml.jackson.core</groupId>
diff --git a/src/main/java/io/lumigo/core/SpansContainer.java b/src/main/java/io/lumigo/core/SpansContainer.java
index 50a12ef..07813ef 100644
--- a/src/main/java/io/lumigo/core/SpansContainer.java
+++ b/src/main/java/io/lumigo/core/SpansContainer.java
@@ -13,17 +13,25 @@
import io.lumigo.core.utils.JsonUtils;
import io.lumigo.core.utils.SecretScrubber;
import io.lumigo.core.utils.StringUtils;
+import io.lumigo.models.*;
import io.lumigo.models.HttpSpan;
-import io.lumigo.models.Reportable;
import io.lumigo.models.Span;
import java.io.*;
import java.util.*;
import java.util.concurrent.Callable;
+import lombok.Getter;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.clients.producer.internals.ProducerMetadata;
+import org.apache.kafka.common.serialization.Serializer;
import org.pmw.tinylog.Logger;
import software.amazon.awssdk.awscore.AwsResponse;
import software.amazon.awssdk.core.SdkResponse;
@@ -41,14 +49,16 @@ public class SpansContainer {
private static final String AMZN_TRACE_ID = "_X_AMZN_TRACE_ID";
private static final String FUNCTION_SPAN_TYPE = "function";
private static final String HTTP_SPAN_TYPE = "http";
- private static final SecretScrubber secretScrubber = new SecretScrubber(new EnvUtil().getEnv());
+ public static final String KAFKA_SPAN_TYPE = "kafka";
private Span baseSpan;
- private Span startFunctionSpan;
+ @Getter private Span startFunctionSpan;
private Long rttDuration;
private Span endFunctionSpan;
private Reporter reporter;
- private List<HttpSpan> httpSpans = new LinkedList<>();
+ private SecretScrubber secretScrubber = new SecretScrubber(new EnvUtil().getEnv());
+ @Getter private List<BaseSpan> spans = new LinkedList<>();
+
private static final SpansContainer ourInstance = new SpansContainer();
public static SpansContainer getInstance() {
@@ -63,7 +73,7 @@ public void clear() {
rttDuration = null;
endFunctionSpan = null;
reporter = null;
- httpSpans = new LinkedList<>();
+ spans = new LinkedList<>();
}
private SpansContainer() {}
@@ -71,6 +81,7 @@ private SpansContainer() {}
public void init(Map<String, String> env, Reporter reporter, Context context, Object event) {
this.clear();
this.reporter = reporter;
+ this.secretScrubber = new SecretScrubber(new EnvUtil().getEnv());
int javaVersion = AwsUtils.parseJavaVersion(System.getProperty("java.version"));
if (javaVersion > 11) {
@@ -81,6 +92,7 @@ public void init(Map<String, String> env, Reporter reporter, Context context, Ob
Logger.debug("awsTracerId {}", awsTracerId);
AwsUtils.TriggeredBy triggeredBy = AwsUtils.extractTriggeredByFromEvent(event);
+
long startTime = System.currentTimeMillis();
this.baseSpan =
Span.builder()
@@ -166,8 +178,7 @@ public void start() {
.build();
try {
- rttDuration =
- reporter.reportSpans(prepareToSend(startFunctionSpan, false), MAX_REQUEST_SIZE);
+ rttDuration = reporter.reportSpans(prepareToSend(startFunctionSpan), MAX_REQUEST_SIZE);
} catch (Throwable e) {
Logger.error(e, "Failed to send start span");
}
@@ -214,14 +225,10 @@ private void end(Span endFunctionSpan) throws IOException {
MAX_REQUEST_SIZE);
}
- public Span getStartFunctionSpan() {
- return startFunctionSpan;
- }
-
- public List<Reportable> getAllCollectedSpans() {
- List<Reportable> spans = new LinkedList<>();
+ public List<BaseSpan> getAllCollectedSpans() {
+ List<BaseSpan> spans = new LinkedList<>();
spans.add(endFunctionSpan);
- spans.addAll(httpSpans);
+ spans.addAll(this.spans);
return spans;
}
@@ -229,10 +236,6 @@ public Span getEndSpan() {
return endFunctionSpan;
}
- public List<HttpSpan> getHttpSpans() {
- return httpSpans;
- }
-
private String getStackTrace(Throwable throwable) {
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw, true);
@@ -307,7 +310,7 @@ public void addHttpSpan(Long startTime, HttpUriRequest request, HttpResponse res
response.getStatusLine().getStatusCode())
.build())
.build());
- httpSpans.add(httpSpan);
+ this.spans.add(httpSpan);
}
public void addHttpSpan(Long startTime, Request<?> request, Response<?> response) {
@@ -366,7 +369,7 @@ public void addHttpSpan(Long startTime, Request<?> request, Response<?> response
.build());
AwsSdkV1ParserFactory.getParser(request.getServiceName())
.safeParse(httpSpan, request, response);
- httpSpans.add(httpSpan);
+ this.spans.add(httpSpan);
}
public void addHttpSpan(
@@ -435,7 +438,37 @@ public void addHttpSpan(
executionAttributes.getAttribute(SdkExecutionAttribute.SERVICE_NAME))
.safeParse(httpSpan, context);
- httpSpans.add(httpSpan);
+ this.spans.add(httpSpan);
+ }
+
+ public <K, V> void addKafkaProduceSpan(
+ Long startTime,
+ Serializer<K> keySerializer,
+ Serializer<V> valueSerializer,
+ ProducerMetadata producerMetadata,
+ ProducerRecord<K, V> record,
+ RecordMetadata recordMetadata,
+ Exception exception) {
+ this.spans.add(
+ KafkaSpanFactory.createProduce(
+ this.baseSpan,
+ startTime,
+ keySerializer,
+ valueSerializer,
+ producerMetadata,
+ record,
+ recordMetadata,
+ exception));
+ }
+
+ public void addKafkaConsumeSpan(
+ Long startTime,
+ KafkaConsumer<?, ?> consumer,
+ ConsumerMetadata consumerMetadata,
+ ConsumerRecords<?, ?> consumerRecords) {
+ this.spans.add(
+ KafkaSpanFactory.createConsume(
+ this.baseSpan, startTime, consumer, consumerMetadata, consumerRecords));
}
private static String extractHeaders(Map<String, String> headers) {
@@ -522,18 +555,18 @@ protected static <T> T callIfVerbose(Callable<T> method) {
}
}
- private Reportable prepareToSend(Reportable span, boolean hasError) {
- return reduceSpanSize(span.scrub(secretScrubber), hasError);
+ private BaseSpan prepareToSend(BaseSpan span) {
+ return reduceSpanSize(span.scrub(secretScrubber), false);
}
- private List<Reportable> prepareToSend(List<Reportable> spans, boolean hasError) {
- for (Reportable span : spans) {
+ private List<BaseSpan> prepareToSend(List<BaseSpan> spans, boolean hasError) {
+ for (BaseSpan span : spans) {
reduceSpanSize(span.scrub(secretScrubber), hasError);
}
return spans;
}
- public Reportable reduceSpanSize(Reportable span, boolean hasError) {
+ public BaseSpan reduceSpanSize(BaseSpan span, boolean hasError) {
int maxFieldSize =
hasError
? Configuration.getInstance().maxSpanFieldSizeWhenError()
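
Taken together, the SpansContainer changes replace the HTTP-only `httpSpans` list with a single `spans` list typed to the new `BaseSpan` interface, so HTTP and Kafka spans share one collection, one scrub pass, and one size-reduction path. The `BaseSpan` source itself is outside this excerpt; a minimal sketch of the contract implied by the `prepareToSend` call sites, assuming nothing beyond what the diff shows:

```java
import io.lumigo.core.utils.SecretScrubber;

// Inferred sketch only; the real io.lumigo.models.BaseSpan is not shown in this diff.
// scrub() is the one method the call sites above require (span.scrub(secretScrubber)).
public interface BaseSpan {
    BaseSpan scrub(SecretScrubber scrubber); // mask secret values before reporting
}
```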
diff --git a/src/main/java/io/lumigo/core/configuration/Configuration.java b/src/main/java/io/lumigo/core/configuration/Configuration.java
index ed1c3ae..9a4f1dd 100644
--- a/src/main/java/io/lumigo/core/configuration/Configuration.java
+++ b/src/main/java/io/lumigo/core/configuration/Configuration.java
@@ -28,6 +28,7 @@ public class Configuration {
public static final String LUMIGO_MAX_SIZE_FOR_REQUEST = "LUMIGO_MAX_SIZE_FOR_REQUEST";
public static final String LUMIGO_INSTRUMENTATION = "LUMIGO_INSTRUMENTATION";
public static final String LUMIGO_SECRET_MASKING_REGEX = "LUMIGO_SECRET_MASKING_REGEX";
+ public static final String LUMIGO_MAX_BATCH_MESSAGE_IDS = "LUMIGO_MAX_BATCH_MESSAGE_IDS";
private static Configuration instance;
private LumigoConfiguration inlineConf;
@@ -137,4 +138,12 @@ public int maxRequestSize() {
LUMIGO_MAX_SIZE_FOR_REQUEST,
envUtil.getIntegerEnv(LUMIGO_MAX_RESPONSE_SIZE, 1024 * 500));
}
+
+ public int maxBatchMessageIds() {
+ int value = envUtil.getIntegerEnv(LUMIGO_MAX_BATCH_MESSAGE_IDS, 20);
+ if (value == 0) {
+ value = 20;
+ }
+ return value;
+ }
}
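
`maxBatchMessageIds` caps how many Kafka message ids get attached to a trigger (see the `KafkaEvent` handling in `AwsUtils` further down); unset or `0` falls back to 20, while negative values pass through unchanged. A short usage sketch (override value is illustrative):

```java
// Resolution per maxBatchMessageIds(): unset or 0 -> default 20; other values win.
// $ export LUMIGO_MAX_BATCH_MESSAGE_IDS=50        (illustrative override)
int cap = Configuration.getInstance().maxBatchMessageIds();
List<String> reported = messageIds.subList(0, Math.min(messageIds.size(), cap));
```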
diff --git a/src/main/java/io/lumigo/core/instrumentation/agent/Loader.java b/src/main/java/io/lumigo/core/instrumentation/agent/Loader.java
index bfb17d8..77be428 100644
--- a/src/main/java/io/lumigo/core/instrumentation/agent/Loader.java
+++ b/src/main/java/io/lumigo/core/instrumentation/agent/Loader.java
@@ -3,9 +3,7 @@
import static net.bytebuddy.matcher.ElementMatchers.nameStartsWith;
import static net.bytebuddy.matcher.ElementMatchers.not;
-import io.lumigo.core.instrumentation.impl.AmazonHttpClientInstrumentation;
-import io.lumigo.core.instrumentation.impl.AmazonHttpClientV2Instrumentation;
-import io.lumigo.core.instrumentation.impl.ApacheHttpInstrumentation;
+import io.lumigo.core.instrumentation.impl.*;
import net.bytebuddy.agent.builder.AgentBuilder;
import org.pmw.tinylog.Logger;
@@ -17,6 +15,10 @@ public static void instrument(java.lang.instrument.Instrumentation inst) {
new AmazonHttpClientInstrumentation();
AmazonHttpClientV2Instrumentation amazonHttpClientV2Instrumentation =
new AmazonHttpClientV2Instrumentation();
+ ApacheKafkaProducerInstrumentation apacheKafkaInstrumentation =
+ new ApacheKafkaProducerInstrumentation();
+ ApacheKafkaConsumerInstrumentation apacheKafkaConsumerInstrumentation =
+ new ApacheKafkaConsumerInstrumentation();
AgentBuilder builder =
new AgentBuilder.Default()
.disableClassFormatChanges()
@@ -27,13 +29,28 @@ public static void instrument(java.lang.instrument.Instrumentation inst) {
.and(
not(
nameStartsWith(
- "software.amazon.awssdk.core.client.builder.SdkDefaultClientBuilder"))))
+ AmazonHttpClientV2Instrumentation
+ .INSTRUMENTATION_PACKAGE_PREFIX)))
+ .and(
+ not(
+ nameStartsWith(
+ ApacheKafkaProducerInstrumentation
+ .INSTRUMENTATION_PACKAGE_PREFIX)))
+ .and(
+ not(
+ nameStartsWith(
+ ApacheKafkaConsumerInstrumentation
+ .INSTRUMENTATION_PACKAGE_PREFIX))))
.type(apacheHttpInstrumentation.getTypeMatcher())
.transform(apacheHttpInstrumentation.getTransformer())
.type(amazonHttpClientInstrumentation.getTypeMatcher())
.transform(amazonHttpClientInstrumentation.getTransformer())
.type(amazonHttpClientV2Instrumentation.getTypeMatcher())
- .transform(amazonHttpClientV2Instrumentation.getTransformer());
+ .transform(amazonHttpClientV2Instrumentation.getTransformer())
+ .type(apacheKafkaInstrumentation.getTypeMatcher())
+ .transform(apacheKafkaInstrumentation.getTransformer())
+ .type(apacheKafkaConsumerInstrumentation.getTypeMatcher())
+ .transform(apacheKafkaConsumerInstrumentation.getTransformer());
builder.installOn(inst);
Logger.debug("Finish Instrumentation");
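
Every instrumentation follows the same two-step registration: carve its target package out of the agent's `ignore` matcher (ByteBuddy skips ignored types entirely), then chain its type matcher and transformer. Sketched below with a hypothetical `MyInstrumentation`:

```java
// Hypothetical addition following the pattern above: un-ignore the target
// package, then chain the matcher and transformer before installOn(inst).
MyInstrumentation my = new MyInstrumentation();
builder =
        builder
                // in the ignore() chain: .and(not(nameStartsWith(
                //         MyInstrumentation.INSTRUMENTATION_PACKAGE_PREFIX)))
                .type(my.getTypeMatcher())
                .transform(my.getTransformer());
builder.installOn(inst);
```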
diff --git a/src/main/java/io/lumigo/core/instrumentation/impl/AmazonHttpClientV2Instrumentation.java b/src/main/java/io/lumigo/core/instrumentation/impl/AmazonHttpClientV2Instrumentation.java
index e816487..03e2e14 100644
--- a/src/main/java/io/lumigo/core/instrumentation/impl/AmazonHttpClientV2Instrumentation.java
+++ b/src/main/java/io/lumigo/core/instrumentation/impl/AmazonHttpClientV2Instrumentation.java
@@ -18,9 +18,13 @@
import software.amazon.awssdk.http.SdkHttpRequest;
public class AmazonHttpClientV2Instrumentation implements LumigoInstrumentationApi {
+
+ public static final String INSTRUMENTATION_PACKAGE_PREFIX =
+ "software.amazon.awssdk.core.client.builder.SdkDefaultClientBuilder";
+
@Override
public ElementMatcher<TypeDescription> getTypeMatcher() {
- return named("software.amazon.awssdk.core.client.builder.SdkDefaultClientBuilder");
+ return named(INSTRUMENTATION_PACKAGE_PREFIX);
}
@Override
diff --git a/src/main/java/io/lumigo/core/instrumentation/impl/ApacheKafkaConsumerInstrumentation.java b/src/main/java/io/lumigo/core/instrumentation/impl/ApacheKafkaConsumerInstrumentation.java
new file mode 100644
index 0000000..e5345ef
--- /dev/null
+++ b/src/main/java/io/lumigo/core/instrumentation/impl/ApacheKafkaConsumerInstrumentation.java
@@ -0,0 +1,71 @@
+package io.lumigo.core.instrumentation.impl;
+
+import static net.bytebuddy.matcher.ElementMatchers.*;
+
+import io.lumigo.core.SpansContainer;
+import io.lumigo.core.instrumentation.LumigoInstrumentationApi;
+import io.lumigo.core.instrumentation.agent.Loader;
+import io.lumigo.core.utils.LRUCache;
+import net.bytebuddy.agent.builder.AgentBuilder;
+import net.bytebuddy.asm.Advice;
+import net.bytebuddy.description.type.TypeDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
+import org.pmw.tinylog.Logger;
+
+public class ApacheKafkaConsumerInstrumentation implements LumigoInstrumentationApi {
+
+ public static final String INSTRUMENTATION_PACKAGE_PREFIX = "org.apache.kafka.clients.consumer";
+
+ @Override
+ public ElementMatcher<TypeDescription> getTypeMatcher() {
+ return named("org.apache.kafka.clients.consumer.KafkaConsumer");
+ }
+
+ @Override
+ public AgentBuilder.Transformer.ForAdvice getTransformer() {
+ return new AgentBuilder.Transformer.ForAdvice()
+ .include(Loader.class.getClassLoader())
+ .advice(
+ isMethod()
+ .and(isPublic())
+ .and(named("poll"))
+ .and(takesArguments(1))
+ .and(
+ returns(
+ named(
+ "org.apache.kafka.clients.consumer.ConsumerRecords"))),
+ ApacheKafkaConsumerAdvice.class.getName());
+ }
+
+ public static class ApacheKafkaConsumerAdvice {
+ public static final SpansContainer spansContainer = SpansContainer.getInstance();
+ public static final LRUCache<String, Long> startTimeMap = new LRUCache<>(1000);
+
+ @Advice.OnMethodEnter(suppress = Throwable.class)
+ public static void methodEnter(@Advice.FieldValue("clientId") String clientId) {
+ try {
+ startTimeMap.put(clientId, System.currentTimeMillis());
+ } catch (Exception e) {
+ Logger.error(e);
+ }
+ }
+
+ @Advice.OnMethodExit(suppress = Throwable.class)
+ public static void methodExit(
+ @Advice.This KafkaConsumer<?, ?> consumer,
+ @Advice.FieldValue("metadata") ConsumerMetadata metadata,
+ @Advice.FieldValue("clientId") String clientId,
+ @Advice.Return ConsumerRecords<?, ?> consumerRecords) {
+ try {
+ Logger.info("Handling kafka request {}", consumerRecords.hashCode());
+ spansContainer.addKafkaConsumeSpan(
+ startTimeMap.get(clientId), consumer, metadata, consumerRecords);
+ } catch (Throwable error) {
+ Logger.error(error, "Failed to add kafka span");
+ }
+ }
+ }
+}
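
The advice keys poll-entry timestamps by the consumer's `clientId` (via an LRU cache, so stale entries age out) and emits a consume span when `poll` returns. Application code is untouched; a plain poll loop like the sketch below is traced as-is (broker, topic, and group are illustrative):

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

// Illustrative consumer; poll() below matches the advice (public, one argument,
// returns ConsumerRecords), so entry/exit are recorded around it.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");          // illustrative
props.put("group.id", "orders-group");                     // illustrative
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Collections.singletonList("orders"));
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    // methodExit above runs here and adds a consume span for this poll.
    records.forEach(r -> System.out.println(r.value()));
}
```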
diff --git a/src/main/java/io/lumigo/core/instrumentation/impl/ApacheKafkaProducerInstrumentation.java b/src/main/java/io/lumigo/core/instrumentation/impl/ApacheKafkaProducerInstrumentation.java
new file mode 100644
index 0000000..c6b3b37
--- /dev/null
+++ b/src/main/java/io/lumigo/core/instrumentation/impl/ApacheKafkaProducerInstrumentation.java
@@ -0,0 +1,116 @@
+package io.lumigo.core.instrumentation.impl;
+
+import static net.bytebuddy.matcher.ElementMatchers.*;
+
+import io.lumigo.core.SpansContainer;
+import io.lumigo.core.instrumentation.LumigoInstrumentationApi;
+import io.lumigo.core.instrumentation.agent.Loader;
+import io.lumigo.models.KafkaSpan;
+import java.nio.charset.StandardCharsets;
+import java.util.UUID;
+import lombok.AllArgsConstructor;
+import net.bytebuddy.agent.builder.AgentBuilder;
+import net.bytebuddy.asm.Advice;
+import net.bytebuddy.description.type.TypeDescription;
+import net.bytebuddy.matcher.ElementMatcher;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.clients.producer.internals.ProducerMetadata;
+import org.apache.kafka.common.serialization.Serializer;
+import org.pmw.tinylog.Logger;
+
+public class ApacheKafkaProducerInstrumentation implements LumigoInstrumentationApi {
+
+ public static final String INSTRUMENTATION_PACKAGE_PREFIX = "org.apache.kafka.clients.producer";
+
+ @Override
+ public ElementMatcher<TypeDescription> getTypeMatcher() {
+ return named("org.apache.kafka.clients.producer.KafkaProducer");
+ }
+
+ @Override
+ public AgentBuilder.Transformer.ForAdvice getTransformer() {
+ return new AgentBuilder.Transformer.ForAdvice()
+ .include(Loader.class.getClassLoader())
+ .advice(
+ isMethod()
+ .and(isPublic())
+ .and(named("send"))
+ .and(
+ takesArgument(
+ 0,
+ named(
+ "org.apache.kafka.clients.producer.ProducerRecord"))
+ .and(
+ takesArgument(
+ 1,
+ named(
+ "org.apache.kafka.clients.producer.Callback")))),
+ ApacheKafkaProducerAdvice.class.getName());
+ }
+
+ public static class ApacheKafkaProducerAdvice {
+ public static final SpansContainer spansContainer = SpansContainer.getInstance();
+
+ @Advice.OnMethodEnter
+ public static <K, V> void methodEnter(
+ @Advice.FieldValue("metadata") ProducerMetadata metadata,
+ @Advice.FieldValue("keySerializer") Serializer<K> keySerializer,
+ @Advice.FieldValue("valueSerializer") Serializer<V> valueSerializer,
+ @Advice.Argument(value = 0, readOnly = false) ProducerRecord<K, V> record,
+ @Advice.Argument(value = 1, readOnly = false) Callback callback) {
+ try {
+ callback =
+ new KafkaProducerCallback<>(
+ callback,
+ keySerializer,
+ valueSerializer,
+ metadata,
+ record,
+ System.currentTimeMillis());
+
+ // Try to inject correlation id to the kafka record headers
+ record.headers()
+ .add(
+ KafkaSpan.LUMIGO_MESSAGE_ID_KEY,
+ UUID.randomUUID()
+ .toString()
+ .substring(0, 10)
+ .getBytes(StandardCharsets.UTF_8));
+ } catch (Exception e) {
+ Logger.error(e);
+ }
+ }
+
+ @AllArgsConstructor
+ public static class KafkaProducerCallback<K, V> implements Callback {
+ private final Callback callback;
+ private final Serializer<K> keySerializer;
+ private final Serializer<V> valueSerializer;
+ private final ProducerMetadata producerMetadata;
+ private final ProducerRecord<K, V> record;
+ private final long startTime;
+
+ @Override
+ public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
+ try {
+ if (callback != null) {
+ callback.onCompletion(recordMetadata, exception);
+ }
+ Logger.info("Handling kafka request {}", record.hashCode());
+ spansContainer.addKafkaProduceSpan(
+ startTime,
+ keySerializer,
+ valueSerializer,
+ producerMetadata,
+ record,
+ recordMetadata,
+ exception);
+ } catch (Throwable error) {
+ Logger.error(error, "Failed to add kafka span");
+ }
+ }
+ }
+ }
+}
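
The enter advice does two things: it wraps the caller's `Callback` so the span is emitted from `onCompletion` with broker-assigned metadata, and it stamps a short random correlation id onto the record's headers, which the consumer-side `KafkaEvent` parsing in `AwsUtils` reads back. A sketch of an ordinary send that this advice intercepts (connection details are illustrative):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

// Illustrative producer; send(record, callback) below matches the advised overload.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");          // illustrative
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());

try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    ProducerRecord<String, String> record = new ProducerRecord<>("orders", "k1", "created");
    // The advice injects the lumigo message-id header into record.headers() and
    // swaps this callback for KafkaProducerCallback, which emits the span.
    producer.send(record, (metadata, exception) -> { /* app callback still runs */ });
}
```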
diff --git a/src/main/java/io/lumigo/core/network/Reporter.java b/src/main/java/io/lumigo/core/network/Reporter.java
index 325efda..def3ed1 100644
--- a/src/main/java/io/lumigo/core/network/Reporter.java
+++ b/src/main/java/io/lumigo/core/network/Reporter.java
@@ -3,7 +3,7 @@
import io.lumigo.core.configuration.Configuration;
import io.lumigo.core.utils.JsonUtils;
import io.lumigo.core.utils.StringUtils;
-import io.lumigo.models.Reportable;
+import io.lumigo.models.BaseSpan;
import java.io.IOException;
import java.util.Collections;
import java.util.LinkedList;
@@ -13,7 +13,7 @@
public class Reporter {
- private OkHttpClient client;
+ private final OkHttpClient client;
public Reporter() {
client =
@@ -22,11 +22,11 @@ public Reporter() {
.build();
}
- public long reportSpans(Reportable span, int maxSize) throws IOException {
+ public long reportSpans(BaseSpan span, int maxSize) throws IOException {
return reportSpans(Collections.singletonList(span), maxSize);
}
- public long reportSpans(List<Reportable> spans, int maxSize) throws IOException {
+ public long reportSpans(List<BaseSpan> spans, int maxSize) throws IOException {
long time = System.currentTimeMillis();
List<String> spansAsStringList = new LinkedList<>();
int sizeCount = 0;
@@ -44,7 +44,7 @@ public long reportSpans(List<Reportable> spans, int maxSize) throws IOException
handledSpans++;
}
- if (Configuration.getInstance().isAwsEnvironment() && spansAsStringList.size() > 0) {
+ if (Configuration.getInstance().isAwsEnvironment() && !spansAsStringList.isEmpty()) {
String spansAsString = "[" + String.join(",", spansAsStringList) + "]";
Logger.debug("Reporting the spans: {}", spansAsString);
RequestBody body =
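
`reportSpans` serializes spans individually, stops accumulating once the batch would exceed `maxSize`, and performs the HTTP post only when running in a real AWS environment with a non-empty batch. A usage sketch mirroring the `SpansContainer` call sites above:

```java
// Mirrors SpansContainer: one span at start, the collected batch at the end.
Reporter reporter = new Reporter();
long rtt = reporter.reportSpans(startFunctionSpan, Configuration.getInstance().maxRequestSize());
reporter.reportSpans(
        SpansContainer.getInstance().getAllCollectedSpans(),
        Configuration.getInstance().maxRequestSize());
```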
diff --git a/src/main/java/io/lumigo/core/utils/AwsSdkV2Utils.java b/src/main/java/io/lumigo/core/utils/AwsSdkV2Utils.java
index 7432232..0fabacf 100644
--- a/src/main/java/io/lumigo/core/utils/AwsSdkV2Utils.java
+++ b/src/main/java/io/lumigo/core/utils/AwsSdkV2Utils.java
@@ -10,8 +10,7 @@
@UtilityClass
public class AwsSdkV2Utils {
- public String calculateItemHash(
- Map<String, AttributeValue> item) {
+ public String calculateItemHash(Map<String, AttributeValue> item) {
Map<String, Object> simpleMap = AwsSdkV2Utils.convertAttributeMapToSimpleMap(item);
return StringUtils.buildMd5Hash(JsonUtils.getObjectAsJsonString(simpleMap));
}
diff --git a/src/main/java/io/lumigo/core/utils/AwsUtils.java b/src/main/java/io/lumigo/core/utils/AwsUtils.java
index 440a619..905b443 100644
--- a/src/main/java/io/lumigo/core/utils/AwsUtils.java
+++ b/src/main/java/io/lumigo/core/utils/AwsUtils.java
@@ -1,13 +1,13 @@
package io.lumigo.core.utils;
-import static io.lumigo.core.utils.StringUtils.dynamodbItemToHash;
-
import com.amazonaws.services.lambda.runtime.events.*;
+import com.amazonaws.services.lambda.runtime.events.models.dynamodb.AttributeValue;
import com.fasterxml.jackson.annotation.JsonInclude;
+import io.lumigo.core.configuration.Configuration;
+import io.lumigo.models.KafkaSpan;
import io.lumigo.models.Span;
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
+import java.nio.charset.StandardCharsets;
+import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -155,6 +155,37 @@ public static TriggeredBy extractTriggeredByFromEvent(Object event) {
triggeredBy.setTriggeredBy("lex");
} else if (event instanceof CognitoEvent) {
triggeredBy.setTriggeredBy("cognito");
+ } else if (event instanceof KafkaEvent) {
+ triggeredBy.setTriggeredBy("kafka");
+ triggeredBy.setArn(((KafkaEvent) event).getEventSourceArn());
+ String topic = null;
+ List messageIds = new ArrayList<>();
+ if (((KafkaEvent) event).getRecords() != null) {
+ for (Map.Entry<String, List<KafkaEvent.KafkaEventRecord>> entry :
+ ((KafkaEvent) event).getRecords().entrySet()) {
+ for (KafkaEvent.KafkaEventRecord record : entry.getValue()) {
+ if (topic == null) {
+ topic = record.getTopic();
+ }
+ for (Map<String, byte[]> headers : record.getHeaders()) {
+ if (headers.containsKey(KafkaSpan.LUMIGO_MESSAGE_ID_KEY)) {
+ messageIds.add(
+ new String(
+ headers.get(KafkaSpan.LUMIGO_MESSAGE_ID_KEY),
+ StandardCharsets.UTF_8));
+ break;
+ }
+ }
+ }
+ }
+ }
+ triggeredBy.setResource(topic);
+ triggeredBy.setMessageIds(
+ messageIds.subList(
+ 0,
+ Math.min(
+ messageIds.size(),
+ Configuration.getInstance().maxBatchMessageIds())));
} else {
Logger.info(
"Failed to found relevant triggered by found for event {} ",
@@ -282,10 +313,44 @@ private static String extractMessageIdFromDynamodbRecord(
DynamodbEvent.DynamodbStreamRecord record) {
if (record.getEventName() == null) return null;
if (record.getEventName().equals("INSERT")) {
- return dynamodbItemToHash(record.getDynamodb().getNewImage());
+ return calculateItemHash(record.getDynamodb().getNewImage());
} else if (record.getEventName().equals("MODIFY")
|| record.getEventName().equals("REMOVE")) {
- return dynamodbItemToHash(record.getDynamodb().getKeys());
+ return calculateItemHash(record.getDynamodb().getKeys());
+ }
+ return null;
+ }
+
+ private static String calculateItemHash(Map<String, AttributeValue> item) {
+ Map<String, Object> simpleMap = convertAttributeMapToSimpleMap(item);
+ return StringUtils.buildMd5Hash(JsonUtils.getObjectAsJsonString(simpleMap));
+ }
+
+ private static Map<String, Object> convertAttributeMapToSimpleMap(
+ Map<String, AttributeValue> attributeValueMap) {
+ Map<String, Object> simpleMap = new HashMap<>();
+ attributeValueMap.forEach(
+ (key, value) -> simpleMap.put(key, attributeValueToObject(value)));
+ return simpleMap;
+ }
+
+ private static Object attributeValueToObject(AttributeValue value) {
+ if (value == null) {
+ return null;
+ } else if (value.getS() != null) {
+ return value.getS();
+ } else if (value.getN() != null) {
+ return value.getN();
+ } else if (value.getBOOL() != null) {
+ return value.getBOOL();
+ } else if (value.getL() != null && !value.getL().isEmpty()) {
+ List<Object> list = new ArrayList<>();
+ value.getL().forEach(v -> list.add(attributeValueToObject(v)));
+ return list;
+ }
+ return null;
+ }
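
End to end, the ids written by `KafkaProducerCallback` above are read back out of the Lambda trigger by the `KafkaEvent` branch in `extractTriggeredByFromEvent`. Restated as a self-contained helper for clarity (class and method names are hypothetical; the logic mirrors the branch, including the message-id cap):

```java
import com.amazonaws.services.lambda.runtime.events.KafkaEvent;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper; logic mirrors the KafkaEvent branch above.
final class KafkaTriggerMessageIds {
    static List<String> extract(KafkaEvent event, String headerKey, int cap) {
        List<String> ids = new ArrayList<>();
        if (event.getRecords() == null) return ids;
        for (List<KafkaEvent.KafkaEventRecord> records : event.getRecords().values()) {
            for (KafkaEvent.KafkaEventRecord record : records) {
                for (Map<String, byte[]> headers : record.getHeaders()) {
                    byte[] id = headers.get(headerKey);
                    if (id != null) {                     // tracer-injected id found
                        ids.add(new String(id, StandardCharsets.UTF_8));
                        break;                            // one id per record
                    }
                }
            }
        }
        return ids.subList(0, Math.min(ids.size(), cap)); // cap = maxBatchMessageIds()
    }
}
```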