Skip to content

Commit

Permalink
Ensure trace is properly sent using OTEL
Browse files Browse the repository at this point in the history
Signed-off-by: Artur Ciocanu <[email protected]>
  • Loading branch information
Artur Ciocanu committed Nov 3, 2024
1 parent b5ae050 commit a8acc97
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,20 @@
import io.dapr.spring.messaging.observation.DaprMessagingSenderContext;
import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationRegistry;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.context.propagation.TextMapSetter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;

import javax.annotation.Nullable;

import java.util.HashMap;
import java.util.Map;

/**
Expand All @@ -55,6 +59,9 @@ public class DaprMessagingTemplate<T> implements DaprMessagingOperations<T>, App
@Nullable
private String beanName;

@Nullable
private OpenTelemetry openTelemetry;

@Nullable
private ObservationRegistry observationRegistry;

Expand Down Expand Up @@ -102,6 +109,8 @@ public void afterSingletonsInstantiated() {

observationRegistry = applicationContext.getBeanProvider(ObservationRegistry.class)
.getIfUnique(() -> observationRegistry);
this.openTelemetry = this.applicationContext.getBeanProvider(OpenTelemetry.class)
.getIfUnique(() -> this.openTelemetry);
observationConvention = applicationContext.getBeanProvider(DaprMessagingObservationConvention.class)
.getIfUnique(() -> observationConvention);
}
Expand Down Expand Up @@ -133,7 +142,7 @@ private Mono<Void> doSendAsync(String topic, T message) {
private boolean canUseObservation() {
return observationEnabled
&& observationRegistry != null
&& observationConvention != null
&& openTelemetry != null
&& beanName != null;
}

Expand All @@ -147,6 +156,7 @@ private Mono<Void> publishEventWithObservation(String pubsubName, String topic,

return observation.observe(() ->
publishEvent(pubsubName, topic, message)
.contextWrite(getReactorContext())
.doOnError(err -> {
LOGGER.error("Failed to send msg to '{}' topic", topic, err);

Expand All @@ -161,6 +171,16 @@ private Mono<Void> publishEventWithObservation(String pubsubName, String topic,
);
}

private Context getReactorContext() {
Map<String, String> map = new HashMap<>();
TextMapSetter<Map<String, String>> setter = (carrier, key, value) -> map.put(key, value);
io.opentelemetry.context.Context otelContext = io.opentelemetry.context.Context.current();

openTelemetry.getPropagators().getTextMapPropagator().inject(otelContext, map, setter);

return Context.of(map);
}

private Observation createObservation(DaprMessagingSenderContext senderContext) {
return DaprMessagingObservationDocumentation.TEMPLATE_OBSERVATION.observation(
observationConvention,
Expand Down
10 changes: 10 additions & 0 deletions dapr-spring/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,16 @@
<optional>true</optional>
</dependency>

<!-- OTEL dependencies -->
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-context</artifactId>
</dependency>

<!-- Test dependencies -->
<dependency>
<groupId>org.springframework.boot</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import io.dapr.testcontainers.converter.SubscriptionYamlConverter;
import io.dapr.testcontainers.converter.YamlConverter;
import io.dapr.testcontainers.converter.YamlMapperFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.wait.strategy.Wait;
Expand All @@ -30,13 +32,15 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class DaprContainer extends GenericContainer<DaprContainer> {
private static final Logger LOGGER = LoggerFactory.getLogger(DaprContainer.class);
private static final int DAPRD_DEFAULT_HTTP_PORT = 3500;
private static final int DAPRD_DEFAULT_GRPC_PORT = 50001;
private static final DaprProtocol DAPR_PROTOCOL = DaprProtocol.HTTP;
Expand Down Expand Up @@ -235,14 +239,28 @@ protected void configure() {
cmds.add(Integer.toString(appPort));
}

if (configuration != null) {
cmds.add("--config");
cmds.add("/dapr-resources/" + configuration.getName() + ".yaml");
}

cmds.add("--log-level");
cmds.add(daprLogLevel.toString());
cmds.add("--resources-path");
cmds.add("/dapr-resources");
withCommand(cmds.toArray(new String[]{}));

String[] cmdArray = cmds.toArray(new String[]{});
LOGGER.info("> `daprd` Command: \n");
LOGGER.info("\t" + Arrays.toString(cmdArray) + "\n");

withCommand(cmdArray);

if (configuration != null) {
String configurationYaml = CONFIGURATION_CONVERTER.convert(configuration);

LOGGER.info("> Configuration YAML: \n");
LOGGER.info("\t\n" + configurationYaml + "\n");

withCopyToContainer(Transferable.of(configurationYaml), "/dapr-resources/" + configuration.getName() + ".yaml");
}

Expand All @@ -257,11 +275,19 @@ protected void configure() {

for (Component component : components) {
String componentYaml = COMPONENT_CONVERTER.convert(component);

LOGGER.info("> Component YAML: \n");
LOGGER.info("\t\n" + componentYaml + "\n");

withCopyToContainer(Transferable.of(componentYaml), "/dapr-resources/" + component.getName() + ".yaml");
}

for (Subscription subscription : subscriptions) {
String subscriptionYaml = SUBSCRIPTION_CONVERTER.convert(subscription);

LOGGER.info("> Subscription YAML: \n");
LOGGER.info("\t\n" + subscriptionYaml + "\n");

withCopyToContainer(Transferable.of(subscriptionYaml), "/dapr-resources/" + subscription.getName() + ".yaml");
}

Expand Down

0 comments on commit a8acc97

Please sign in to comment.