diff --git a/dapr-spring/dapr-spring-boot-autoconfigure/pom.xml b/dapr-spring/dapr-spring-boot-autoconfigure/pom.xml
index 5cbcba6ae..3bc4bc18f 100644
--- a/dapr-spring/dapr-spring-boot-autoconfigure/pom.xml
+++ b/dapr-spring/dapr-spring-boot-autoconfigure/pom.xml
@@ -28,8 +28,8 @@
true
- org.springframework.boot
- spring-boot-starter
+ org.springframework.boot
+ spring-boot-starter
org.springframework.boot
diff --git a/dapr-spring/dapr-spring-boot-autoconfigure/src/main/java/io/dapr/spring/boot/autoconfigure/pubsub/DaprPubSubProperties.java b/dapr-spring/dapr-spring-boot-autoconfigure/src/main/java/io/dapr/spring/boot/autoconfigure/pubsub/DaprPubSubProperties.java
index 9cd038538..d598b9c99 100644
--- a/dapr-spring/dapr-spring-boot-autoconfigure/src/main/java/io/dapr/spring/boot/autoconfigure/pubsub/DaprPubSubProperties.java
+++ b/dapr-spring/dapr-spring-boot-autoconfigure/src/main/java/io/dapr/spring/boot/autoconfigure/pubsub/DaprPubSubProperties.java
@@ -25,6 +25,7 @@ public class DaprPubSubProperties {
* Name of the PubSub Dapr component.
*/
private String name;
+ private boolean observationEnabled;
public String getName() {
return name;
@@ -34,4 +35,11 @@ public void setName(String name) {
this.name = name;
}
+ public boolean isObservationEnabled() {
+ return observationEnabled;
+ }
+
+ public void setObservationEnabled(boolean observationEnabled) {
+ this.observationEnabled = observationEnabled;
+ }
}
diff --git a/dapr-spring/dapr-spring-messaging/pom.xml b/dapr-spring/dapr-spring-messaging/pom.xml
index 135e904db..c9b280a47 100644
--- a/dapr-spring/dapr-spring-messaging/pom.xml
+++ b/dapr-spring/dapr-spring-messaging/pom.xml
@@ -12,6 +12,6 @@
dapr-spring-messaging
dapr-spring-messaging
Dapr Spring Messaging
- jar
+ jar
diff --git a/dapr-spring/dapr-spring-messaging/src/main/java/io/dapr/spring/messaging/DaprMessagingTemplate.java b/dapr-spring/dapr-spring-messaging/src/main/java/io/dapr/spring/messaging/DaprMessagingTemplate.java
index 584d91fa5..6e4140936 100644
--- a/dapr-spring/dapr-spring-messaging/src/main/java/io/dapr/spring/messaging/DaprMessagingTemplate.java
+++ b/dapr-spring/dapr-spring-messaging/src/main/java/io/dapr/spring/messaging/DaprMessagingTemplate.java
@@ -15,20 +15,104 @@
import io.dapr.client.DaprClient;
import io.dapr.client.domain.Metadata;
+import io.dapr.spring.messaging.observation.DaprMessagingObservationConvention;
+import io.dapr.spring.messaging.observation.DaprMessagingObservationDocumentation;
+import io.dapr.spring.messaging.observation.DaprMessagingSenderContext;
+import io.micrometer.observation.Observation;
+import io.micrometer.observation.ObservationRegistry;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.context.propagation.TextMapSetter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.BeanNameAware;
+import org.springframework.beans.factory.SmartInitializingSingleton;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
import reactor.core.publisher.Mono;
+import reactor.util.context.Context;
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
import java.util.Map;
-public class DaprMessagingTemplate implements DaprMessagingOperations {
+/**
+ * Create a new DaprMessagingTemplate.
+ * @param templated message type
+ */
+public class DaprMessagingTemplate implements DaprMessagingOperations, ApplicationContextAware, BeanNameAware,
+ SmartInitializingSingleton {
+ private static final Logger LOGGER = LoggerFactory.getLogger(DaprMessagingTemplate.class);
private static final String MESSAGE_TTL_IN_SECONDS = "10";
+ private static final DaprMessagingObservationConvention DEFAULT_OBSERVATION_CONVENTION =
+ DaprMessagingObservationConvention.getDefault();
private final DaprClient daprClient;
private final String pubsubName;
+ private final Map metadata;
+ private final boolean observationEnabled;
+
+ @Nullable
+ private ApplicationContext applicationContext;
+
+ @Nullable
+ private String beanName;
+
+ @Nullable
+ private OpenTelemetry openTelemetry;
+
+ @Nullable
+ private ObservationRegistry observationRegistry;
- public DaprMessagingTemplate(DaprClient daprClient, String pubsubName) {
+ @Nullable
+ private DaprMessagingObservationConvention observationConvention;
+
+ /**
+ * Constructs a new DaprMessagingTemplate.
+ * @param daprClient Dapr client
+ * @param pubsubName pubsub name
+ * @param observationEnabled whether to enable observations
+ */
+ public DaprMessagingTemplate(DaprClient daprClient, String pubsubName, boolean observationEnabled) {
this.daprClient = daprClient;
this.pubsubName = pubsubName;
+ this.metadata = Map.of(Metadata.TTL_IN_SECONDS, MESSAGE_TTL_IN_SECONDS);
+ this.observationEnabled = observationEnabled;
+ }
+
+ @Override
+ public void setApplicationContext(ApplicationContext applicationContext) {
+ this.applicationContext = applicationContext;
+ }
+
+ @Override
+ public void setBeanName(String beanName) {
+ this.beanName = beanName;
+ }
+
+ /**
+ * If observations are enabled, attempt to obtain the Observation registry and
+ * convention.
+ */
+ @Override
+ public void afterSingletonsInstantiated() {
+ if (!observationEnabled) {
+ LOGGER.debug("Observations are not enabled - not recording");
+ return;
+ }
+
+ if (applicationContext == null) {
+ LOGGER.warn("Observations enabled but application context null - not recording");
+ return;
+ }
+
+ observationRegistry = applicationContext.getBeanProvider(ObservationRegistry.class)
+ .getIfUnique(() -> observationRegistry);
+ this.openTelemetry = this.applicationContext.getBeanProvider(OpenTelemetry.class)
+ .getIfUnique(() -> this.openTelemetry);
+ observationConvention = applicationContext.getBeanProvider(DaprMessagingObservationConvention.class)
+ .getIfUnique(() -> observationConvention);
}
@Override
@@ -38,7 +122,7 @@ public void send(String topic, T message) {
@Override
public SendMessageBuilder newMessage(T message) {
- return new SendMessageBuilderImpl<>(this, message);
+ return new DefaultSendMessageBuilder<>(this, message);
}
private void doSend(String topic, T message) {
@@ -46,13 +130,67 @@ private void doSend(String topic, T message) {
}
private Mono doSendAsync(String topic, T message) {
- return daprClient.publishEvent(pubsubName,
- topic,
- message,
- Map.of(Metadata.TTL_IN_SECONDS, MESSAGE_TTL_IN_SECONDS));
+ LOGGER.trace("Sending message to '{}' topic", topic);
+
+ if (canUseObservation()) {
+ return publishEventWithObservation(pubsubName, topic, message);
+ }
+
+ return publishEvent(pubsubName, topic, message);
+ }
+
+ private boolean canUseObservation() {
+ return observationEnabled
+ && observationRegistry != null
+ && openTelemetry != null
+ && beanName != null;
+ }
+
+ private Mono publishEvent(String pubsubName, String topic, T message) {
+ return daprClient.publishEvent(pubsubName, topic, message, metadata);
+ }
+
+ private Mono publishEventWithObservation(String pubsubName, String topic, T message) {
+ DaprMessagingSenderContext senderContext = DaprMessagingSenderContext.newContext(topic, this.beanName);
+ Observation observation = createObservation(senderContext);
+
+ return observation.observe(() ->
+ publishEvent(pubsubName, topic, message)
+ .contextWrite(getReactorContext())
+ .doOnError(err -> {
+ LOGGER.error("Failed to send msg to '{}' topic", topic, err);
+
+ observation.error(err);
+ observation.stop();
+ })
+ .doOnSuccess(ignore -> {
+ LOGGER.trace("Sent msg to '{}' topic", topic);
+
+ observation.stop();
+ })
+ );
+ }
+
+ private Context getReactorContext() {
+ Map map = new HashMap<>();
+ TextMapSetter
+
+
+ io.opentelemetry
+ opentelemetry-api
+
+
+ io.opentelemetry
+ opentelemetry-context
+
+
org.springframework.boot
diff --git a/sdk-tests/src/test/java/io/dapr/it/spring/messaging/TestApplication.java b/sdk-tests/src/test/java/io/dapr/it/spring/messaging/TestApplication.java
index b9c34c136..44c832dc4 100644
--- a/sdk-tests/src/test/java/io/dapr/it/spring/messaging/TestApplication.java
+++ b/sdk-tests/src/test/java/io/dapr/it/spring/messaging/TestApplication.java
@@ -35,7 +35,9 @@ static class DaprSpringMessagingConfiguration {
@Bean
public DaprMessagingTemplate messagingTemplate(DaprClient daprClient,
DaprPubSubProperties daprPubSubProperties) {
- return new DaprMessagingTemplate<>(daprClient, daprPubSubProperties.getName());
+ String pubsubName = daprPubSubProperties.getName();
+ boolean observationEnabled = daprPubSubProperties.isObservationEnabled();
+ return new DaprMessagingTemplate<>(daprClient, pubsubName, observationEnabled);
}
}
diff --git a/testcontainers-dapr/src/main/java/io/dapr/testcontainers/DaprContainer.java b/testcontainers-dapr/src/main/java/io/dapr/testcontainers/DaprContainer.java
index d96cc2552..9fce30934 100644
--- a/testcontainers-dapr/src/main/java/io/dapr/testcontainers/DaprContainer.java
+++ b/testcontainers-dapr/src/main/java/io/dapr/testcontainers/DaprContainer.java
@@ -18,6 +18,8 @@
import io.dapr.testcontainers.converter.SubscriptionYamlConverter;
import io.dapr.testcontainers.converter.YamlConverter;
import io.dapr.testcontainers.converter.YamlMapperFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.wait.strategy.Wait;
@@ -30,6 +32,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@@ -37,6 +40,7 @@
import java.util.Set;
public class DaprContainer extends GenericContainer {
+ private static final Logger LOGGER = LoggerFactory.getLogger(DaprContainer.class);
private static final int DAPRD_DEFAULT_HTTP_PORT = 3500;
private static final int DAPRD_DEFAULT_GRPC_PORT = 50001;
private static final DaprProtocol DAPR_PROTOCOL = DaprProtocol.HTTP;
@@ -235,14 +239,28 @@ protected void configure() {
cmds.add(Integer.toString(appPort));
}
+ if (configuration != null) {
+ cmds.add("--config");
+ cmds.add("/dapr-resources/" + configuration.getName() + ".yaml");
+ }
+
cmds.add("--log-level");
cmds.add(daprLogLevel.toString());
cmds.add("--resources-path");
cmds.add("/dapr-resources");
- withCommand(cmds.toArray(new String[]{}));
+
+ String[] cmdArray = cmds.toArray(new String[]{});
+ LOGGER.info("> `daprd` Command: \n");
+ LOGGER.info("\t" + Arrays.toString(cmdArray) + "\n");
+
+ withCommand(cmdArray);
if (configuration != null) {
String configurationYaml = CONFIGURATION_CONVERTER.convert(configuration);
+
+ LOGGER.info("> Configuration YAML: \n");
+ LOGGER.info("\t\n" + configurationYaml + "\n");
+
withCopyToContainer(Transferable.of(configurationYaml), "/dapr-resources/" + configuration.getName() + ".yaml");
}
@@ -257,11 +275,19 @@ protected void configure() {
for (Component component : components) {
String componentYaml = COMPONENT_CONVERTER.convert(component);
+
+ LOGGER.info("> Component YAML: \n");
+ LOGGER.info("\t\n" + componentYaml + "\n");
+
withCopyToContainer(Transferable.of(componentYaml), "/dapr-resources/" + component.getName() + ".yaml");
}
for (Subscription subscription : subscriptions) {
String subscriptionYaml = SUBSCRIPTION_CONVERTER.convert(subscription);
+
+ LOGGER.info("> Subscription YAML: \n");
+ LOGGER.info("\t\n" + subscriptionYaml + "\n");
+
withCopyToContainer(Transferable.of(subscriptionYaml), "/dapr-resources/" + subscription.getName() + ".yaml");
}