From a8acc97c575144b0439d64c66e43f38da227207a Mon Sep 17 00:00:00 2001 From: Artur Ciocanu Date: Sun, 3 Nov 2024 18:21:34 +0200 Subject: [PATCH] Ensure trace is properly sent using OTEL Signed-off-by: Artur Ciocanu --- .../messaging/DaprMessagingTemplate.java | 22 ++++++++++++++- dapr-spring/pom.xml | 10 +++++++ .../io/dapr/testcontainers/DaprContainer.java | 28 ++++++++++++++++++- 3 files changed, 58 insertions(+), 2 deletions(-) diff --git a/dapr-spring/dapr-spring-messaging/src/main/java/io/dapr/spring/messaging/DaprMessagingTemplate.java b/dapr-spring/dapr-spring-messaging/src/main/java/io/dapr/spring/messaging/DaprMessagingTemplate.java index 39c2129dd..6e4140936 100644 --- a/dapr-spring/dapr-spring-messaging/src/main/java/io/dapr/spring/messaging/DaprMessagingTemplate.java +++ b/dapr-spring/dapr-spring-messaging/src/main/java/io/dapr/spring/messaging/DaprMessagingTemplate.java @@ -20,6 +20,8 @@ import io.dapr.spring.messaging.observation.DaprMessagingSenderContext; import io.micrometer.observation.Observation; import io.micrometer.observation.ObservationRegistry; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.context.propagation.TextMapSetter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.BeanNameAware; @@ -27,9 +29,11 @@ import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import reactor.core.publisher.Mono; +import reactor.util.context.Context; import javax.annotation.Nullable; +import java.util.HashMap; import java.util.Map; /** @@ -55,6 +59,9 @@ public class DaprMessagingTemplate implements DaprMessagingOperations, App @Nullable private String beanName; + @Nullable + private OpenTelemetry openTelemetry; + @Nullable private ObservationRegistry observationRegistry; @@ -102,6 +109,8 @@ public void afterSingletonsInstantiated() { observationRegistry = applicationContext.getBeanProvider(ObservationRegistry.class) .getIfUnique(() -> observationRegistry); + this.openTelemetry = this.applicationContext.getBeanProvider(OpenTelemetry.class) + .getIfUnique(() -> this.openTelemetry); observationConvention = applicationContext.getBeanProvider(DaprMessagingObservationConvention.class) .getIfUnique(() -> observationConvention); } @@ -133,7 +142,7 @@ private Mono doSendAsync(String topic, T message) { private boolean canUseObservation() { return observationEnabled && observationRegistry != null - && observationConvention != null + && openTelemetry != null && beanName != null; } @@ -147,6 +156,7 @@ private Mono publishEventWithObservation(String pubsubName, String topic, return observation.observe(() -> publishEvent(pubsubName, topic, message) + .contextWrite(getReactorContext()) .doOnError(err -> { LOGGER.error("Failed to send msg to '{}' topic", topic, err); @@ -161,6 +171,16 @@ private Mono publishEventWithObservation(String pubsubName, String topic, ); } + private Context getReactorContext() { + Map map = new HashMap<>(); + TextMapSetter> setter = (carrier, key, value) -> map.put(key, value); + io.opentelemetry.context.Context otelContext = io.opentelemetry.context.Context.current(); + + openTelemetry.getPropagators().getTextMapPropagator().inject(otelContext, map, setter); + + return Context.of(map); + } + private Observation createObservation(DaprMessagingSenderContext senderContext) { return DaprMessagingObservationDocumentation.TEMPLATE_OBSERVATION.observation( observationConvention, diff --git a/dapr-spring/pom.xml b/dapr-spring/pom.xml index a7c9474f3..9a67c459c 100644 --- a/dapr-spring/pom.xml +++ b/dapr-spring/pom.xml @@ -75,6 +75,16 @@ true + + + io.opentelemetry + opentelemetry-api + + + io.opentelemetry + opentelemetry-context + + org.springframework.boot diff --git a/testcontainers-dapr/src/main/java/io/dapr/testcontainers/DaprContainer.java b/testcontainers-dapr/src/main/java/io/dapr/testcontainers/DaprContainer.java index d96cc2552..9fce30934 100644 --- a/testcontainers-dapr/src/main/java/io/dapr/testcontainers/DaprContainer.java +++ b/testcontainers-dapr/src/main/java/io/dapr/testcontainers/DaprContainer.java @@ -18,6 +18,8 @@ import io.dapr.testcontainers.converter.SubscriptionYamlConverter; import io.dapr.testcontainers.converter.YamlConverter; import io.dapr.testcontainers.converter.YamlMapperFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.Network; import org.testcontainers.containers.wait.strategy.Wait; @@ -30,6 +32,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -37,6 +40,7 @@ import java.util.Set; public class DaprContainer extends GenericContainer { + private static final Logger LOGGER = LoggerFactory.getLogger(DaprContainer.class); private static final int DAPRD_DEFAULT_HTTP_PORT = 3500; private static final int DAPRD_DEFAULT_GRPC_PORT = 50001; private static final DaprProtocol DAPR_PROTOCOL = DaprProtocol.HTTP; @@ -235,14 +239,28 @@ protected void configure() { cmds.add(Integer.toString(appPort)); } + if (configuration != null) { + cmds.add("--config"); + cmds.add("/dapr-resources/" + configuration.getName() + ".yaml"); + } + cmds.add("--log-level"); cmds.add(daprLogLevel.toString()); cmds.add("--resources-path"); cmds.add("/dapr-resources"); - withCommand(cmds.toArray(new String[]{})); + + String[] cmdArray = cmds.toArray(new String[]{}); + LOGGER.info("> `daprd` Command: \n"); + LOGGER.info("\t" + Arrays.toString(cmdArray) + "\n"); + + withCommand(cmdArray); if (configuration != null) { String configurationYaml = CONFIGURATION_CONVERTER.convert(configuration); + + LOGGER.info("> Configuration YAML: \n"); + LOGGER.info("\t\n" + configurationYaml + "\n"); + withCopyToContainer(Transferable.of(configurationYaml), "/dapr-resources/" + configuration.getName() + ".yaml"); } @@ -257,11 +275,19 @@ protected void configure() { for (Component component : components) { String componentYaml = COMPONENT_CONVERTER.convert(component); + + LOGGER.info("> Component YAML: \n"); + LOGGER.info("\t\n" + componentYaml + "\n"); + withCopyToContainer(Transferable.of(componentYaml), "/dapr-resources/" + component.getName() + ".yaml"); } for (Subscription subscription : subscriptions) { String subscriptionYaml = SUBSCRIPTION_CONVERTER.convert(subscription); + + LOGGER.info("> Subscription YAML: \n"); + LOGGER.info("\t\n" + subscriptionYaml + "\n"); + withCopyToContainer(Transferable.of(subscriptionYaml), "/dapr-resources/" + subscription.getName() + ".yaml"); }