diff --git a/backend/controller/dal/pubsub.go b/backend/controller/dal/pubsub.go
index 8f086934c0..c2f2a3ab57 100644
--- a/backend/controller/dal/pubsub.go
+++ b/backend/controller/dal/pubsub.go
@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"time"
 
+	"github.com/TBD54566975/ftl/backend/controller/observability"
 	"github.com/TBD54566975/ftl/backend/controller/sql"
 	dalerrs "github.com/TBD54566975/ftl/backend/dal"
 	"github.com/TBD54566975/ftl/backend/schema"
@@ -23,6 +24,7 @@ func (d *DAL) PublishEventForTopic(ctx context.Context, module, topic string, pa
 	if err != nil {
 		return dalerrs.TranslatePGError(err)
 	}
+	observability.PubSub.Published(ctx, module, topic)
 	return nil
 }
 
@@ -100,6 +102,8 @@ func (d *DAL) ProgressSubscriptions(ctx context.Context, eventConsumptionDelay t
 		if err != nil {
 			return 0, fmt.Errorf("failed to schedule async task for subscription: %w", dalerrs.TranslatePGError(err))
 		}
+
+		observability.PubSub.SubscriberCalled(ctx, subscription.Topic.Payload.Name, schema.RefKey{Module: subscription.Key.Payload.Module, Name: subscription.Name}, subscriber.Sink)
 		successful++
 	}
 	return successful, nil
diff --git a/backend/controller/observability/observability.go b/backend/controller/observability/observability.go
index 176e61af49..da8165527b 100644
--- a/backend/controller/observability/observability.go
+++ b/backend/controller/observability/observability.go
@@ -1,19 +1,26 @@
 package observability
 
 import (
+	"errors"
 	"fmt"
 )
 
 var (
-	FSM *FSMMetrics
+	FSM    *FSMMetrics
+	PubSub *PubSubMetrics
 )
 
 func init() {
+	var errs error
 	var err error
 
 	FSM, err = initFSMMetrics()
+	errs = errors.Join(errs, err)
+	PubSub, err = initPubSubMetrics()
+	errs = errors.Join(errs, err)
 
-	if err != nil {
-		panic(fmt.Errorf("could not initialize controller metrics: %w", err))
+	// Test the joined errors rather than the last err, so a failure from any initialiser is reported.
+	if errs != nil {
+		panic(fmt.Errorf("could not initialize controller metrics: %w", errs))
 	}
 }
diff --git a/backend/controller/observability/pubsub.go b/backend/controller/observability/pubsub.go
new file mode 100644
index 0000000000..242bf32912
--- /dev/null
+++ b/backend/controller/observability/pubsub.go
@@ -0,0 +1,85 @@
+package observability
+
+import (
+	"context"
+	"errors"
+	"fmt"
+
+	"go.opentelemetry.io/otel"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/metric"
+	"go.opentelemetry.io/otel/metric/noop"
+
+	"github.com/TBD54566975/ftl/backend/schema"
+	"github.com/TBD54566975/ftl/internal/observability"
+)
+
+const (
+	pubsubMeterName                = "ftl.pubsub"
+	pubsubTopicNameAttribute       = "ftl.pubsub.topic.name"
+	pubsubSubscriptionRefAttribute = "ftl.pubsub.subscription.ref"
+	pubsubSubscriberRefAttribute   = "ftl.pubsub.subscriber.sink.ref"
+)
+
+// PubSubMetrics holds the meter and the counters used to record pubsub activity.
+type PubSubMetrics struct {
+	meter            metric.Meter
+	published        metric.Int64Counter
+	subscriberCalled metric.Int64Counter
+}
+
+// initPubSubMetrics creates the pubsub meter and its counters. A counter that
+// fails to initialise is replaced with a noop counter and the failure is joined
+// into the returned error; the returned *PubSubMetrics is never nil.
+func initPubSubMetrics() (*PubSubMetrics, error) {
+	result := &PubSubMetrics{}
+	var errs error
+	var err error
+
+	result.meter = otel.Meter(pubsubMeterName)
+
+	counterName := fmt.Sprintf("%s.published", pubsubMeterName)
+	if result.published, err = result.meter.Int64Counter(
+		counterName,
+		metric.WithUnit("1"),
+		metric.WithDescription("the number of times that an event is published to a topic")); err != nil {
+		errs = handleInitCounterError(errs, err, counterName)
+		result.published = noop.Int64Counter{}
+	}
+
+	counterName = fmt.Sprintf("%s.subscriber.called", pubsubMeterName)
+	if result.subscriberCalled, err = result.meter.Int64Counter(
+		counterName,
+		metric.WithUnit("1"),
+		metric.WithDescription("the number of times that a pubsub event has been enqueued to asynchronously send to a subscriber")); err != nil {
+		errs = handleInitCounterError(errs, err, counterName)
+		result.subscriberCalled = noop.Int64Counter{}
+	}
+
+	return result, errs
+}
+
+// handleInitCounterError joins onto errs a wrapped copy of err that names the
+// counter which failed to initialise, and returns the combined error.
+func handleInitCounterError(errs error, err error, counterName string) error {
+	return errors.Join(errs, fmt.Errorf("%q counter init failed; falling back to noop: %w", counterName, err))
+}
+
+// Published increments the published counter, tagged with the given module and topic.
+func (m *PubSubMetrics) Published(ctx context.Context, module, topic string) {
+	m.published.Add(ctx, 1, metric.WithAttributes(
+		attribute.String(observability.ModuleNameAttribute, module),
+		attribute.String(pubsubTopicNameAttribute, topic),
+	))
+}
+
+// SubscriberCalled increments the subscriber-called counter, tagged with the
+// topic, the subscription ref and the sink ref; the module attribute is the sink's module.
+func (m *PubSubMetrics) SubscriberCalled(ctx context.Context, topic string, subscription, sink schema.RefKey) {
+	m.subscriberCalled.Add(ctx, 1, metric.WithAttributes(
+		attribute.String(observability.ModuleNameAttribute, sink.Module),
+		attribute.String(pubsubTopicNameAttribute, topic),
+		attribute.String(pubsubSubscriptionRefAttribute, subscription.String()),
+		attribute.String(pubsubSubscriberRefAttribute, sink.String()),
+	))
+}