diff --git a/backend/controller/dal/pubsub.go b/backend/controller/dal/pubsub.go
index c2f2a3ab57..fab5b0acb1 100644
--- a/backend/controller/dal/pubsub.go
+++ b/backend/controller/dal/pubsub.go
@@ -5,6 +5,8 @@ import (
 	"fmt"
 	"time"
 
+	"github.com/alecthomas/types/optional"
+
 	"github.com/TBD54566975/ftl/backend/controller/observability"
 	"github.com/TBD54566975/ftl/backend/controller/sql"
 	dalerrs "github.com/TBD54566975/ftl/backend/dal"
@@ -21,10 +23,10 @@ func (d *DAL) PublishEventForTopic(ctx context.Context, module, topic string, pa
 		Topic:   topic,
 		Payload: payload,
 	})
+	observability.PubSub.Published(ctx, module, topic, err)
 	if err != nil {
 		return dalerrs.TranslatePGError(err)
 	}
-	observability.PubSub.Published(ctx, module, topic)
 	return nil
 }
 
@@ -63,10 +65,12 @@ func (d *DAL) ProgressSubscriptions(ctx context.Context, eventConsumptionDelay t
 	for _, subscription := range subs {
 		nextCursor, err := tx.db.GetNextEventForSubscription(ctx, eventConsumptionDelay, subscription.Topic, subscription.Cursor)
 		if err != nil {
+			observability.PubSub.PropagationFailed(ctx, "GetNextEventForSubscription", subscription.Topic.Payload.Name, subscriptionRef(subscription), optional.None[schema.RefKey]())
 			return 0, fmt.Errorf("failed to get next cursor: %w", dalerrs.TranslatePGError(err))
 		}
 		nextCursorKey, ok := nextCursor.Event.Get()
 		if !ok {
-			return 0, fmt.Errorf("could not find event to progress subscription: %w", dalerrs.TranslatePGError(err))
+			observability.PubSub.PropagationFailed(ctx, "GetNextEventForSubscription-->Event.Get", subscription.Topic.Payload.Name, subscriptionRef(subscription), optional.None[schema.RefKey]())
+			return 0, fmt.Errorf("could not find event to progress subscription")
 		}
 		if !nextCursor.Ready {
@@ -82,6 +86,7 @@ func (d *DAL) ProgressSubscriptions(ctx context.Context, eventConsumptionDelay t
 
 		err = tx.db.BeginConsumingTopicEvent(ctx, subscription.Key, nextCursorKey)
 		if err != nil {
+			observability.PubSub.PropagationFailed(ctx, "BeginConsumingTopicEvent", subscription.Topic.Payload.Name, subscriptionRef(subscription), optional.Some(subscriber.Sink))
 			return 0, fmt.Errorf("failed to progress subscription: %w", dalerrs.TranslatePGError(err))
 		}
 
@@ -100,15 +105,22 @@ func (d *DAL) ProgressSubscriptions(ctx context.Context, eventConsumptionDelay t
 			MaxBackoff:        subscriber.MaxBackoff,
 		})
 		if err != nil {
+			observability.PubSub.PropagationFailed(ctx, "CreateAsyncCall", subscription.Topic.Payload.Name, subscriptionRef(subscription), optional.Some(subscriber.Sink))
 			return 0, fmt.Errorf("failed to schedule async task for subscription: %w", dalerrs.TranslatePGError(err))
 		}
 
-		observability.PubSub.SubscriberCalled(ctx, subscription.Topic.Payload.Name, schema.RefKey{Module: subscription.Key.Payload.Module, Name: subscription.Name}, subscriber.Sink)
+		observability.PubSub.SinkCalled(ctx, subscription.Topic.Payload.Name, subscriptionRef(subscription), subscriber.Sink)
 		successful++
 	}
 	return successful, nil
 }
 
+// subscriptionRef builds the schema.RefKey for a subscription from its key's
+// module and its name.
+func subscriptionRef(subscription sql.GetSubscriptionsNeedingUpdateRow) schema.RefKey {
+	return schema.RefKey{Module: subscription.Key.Payload.Module, Name: subscription.Name}
+}
+
 func (d *DAL) CompleteEventForSubscription(ctx context.Context, module, name string) error {
 	err := d.db.CompleteEventForSubscription(ctx, name, module)
 	if err != nil {
diff --git a/backend/controller/observability/pubsub.go b/backend/controller/observability/pubsub.go
index 242bf32912..eaf7f15b44 100644
--- a/backend/controller/observability/pubsub.go
+++ b/backend/controller/observability/pubsub.go
@@ -5,6 +5,7 @@ import (
 	"errors"
 	"fmt"
 
+	"github.com/alecthomas/types/optional"
 	"go.opentelemetry.io/otel"
 	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/metric"
@@ -14,17 +15,24 @@ import (
 	"github.com/TBD54566975/ftl/internal/observability"
 )
 
+// To learn more about how sinks and subscriptions work together, see the
+// PubSub reference: https://tbd54566975.github.io/ftl/docs/reference/pubsub/
 const (
-	pubsubMeterName                = "ftl.pubsub"
-	pubsubTopicNameAttribute       = "ftl.pubsub.topic.name"
-	pubsubSubscriptionRefAttribute = "ftl.pubsub.subscription.ref"
-	pubsubSubscriberRefAttribute   = "ftl.pubsub.subscriber.sink.ref"
+	pubsubMeterName              = "ftl.pubsub"
+	pubsubTopicNameAttr          = "ftl.pubsub.topic.name"
+	pubsubSubscriptionRefAttr    = "ftl.pubsub.subscription.ref"
+	pubsubSubscriptionModuleAttr = "ftl.pubsub.subscription.module.name"
+	pubsubSinkRefAttr            = "ftl.pubsub.sink.ref"
+	pubsubSinkModuleAttr         = "ftl.pubsub.sink.module.name"
+	pubsubFailedOperationAttr    = "ftl.pubsub.propagation.failed_operation"
+	pubsubFailedToPublishErrAttr = "ftl.pubsub.publish.error.message"
 )
 
 type PubSubMetrics struct {
-	meter            metric.Meter
-	published        metric.Int64Counter
-	subscriberCalled metric.Int64Counter
+	meter             metric.Meter
+	published         metric.Int64Counter
+	propagationFailed metric.Int64Counter
+	sinkCalled        metric.Int64Counter
 }
 
 func initPubSubMetrics() (*PubSubMetrics, error) {
@@ -43,13 +51,22 @@ func initPubSubMetrics() (*PubSubMetrics, error) {
 		result.published = noop.Int64Counter{}
 	}
 
-	counterName = fmt.Sprintf("%s.subscriber.called", pubsubMeterName)
-	if result.subscriberCalled, err = result.meter.Int64Counter(
+	counterName = fmt.Sprintf("%s.propagation.failed", pubsubMeterName)
+	if result.propagationFailed, err = result.meter.Int64Counter(
+		counterName,
+		metric.WithUnit("1"),
+		metric.WithDescription("the number of times that subscriptions fail to progress")); err != nil {
+		errs = handleInitCounterError(errs, err, counterName)
+		result.propagationFailed = noop.Int64Counter{}
+	}
+
+	counterName = fmt.Sprintf("%s.sink.called", pubsubMeterName)
+	if result.sinkCalled, err = result.meter.Int64Counter(
 		counterName,
 		metric.WithUnit("1"),
 		metric.WithDescription("the number of times that a pubsub event has been enqueued to asynchronously send to a subscriber")); err != nil {
 		errs = handleInitCounterError(errs, err, counterName)
-		result.subscriberCalled = noop.Int64Counter{}
+		result.sinkCalled = noop.Int64Counter{}
 	}
 
 	return result, errs
@@ -59,18 +76,52 @@ func handleInitCounterError(errs error, err error, counterName string) error {
 	return errors.Join(errs, fmt.Errorf("%q counter init failed; falling back to noop: %w", counterName, err))
 }
 
-func (m *PubSubMetrics) Published(ctx context.Context, module, topic string) {
-	m.published.Add(ctx, 1, metric.WithAttributes(
+// Published increments the published counter for the given module and topic,
+// tagging the sample with whether the publish succeeded. When maybeErr is
+// non-nil its message is attached as an extra attribute.
+func (m *PubSubMetrics) Published(ctx context.Context, module, topic string, maybeErr error) {
+	attrs := []attribute.KeyValue{
 		attribute.String(observability.ModuleNameAttribute, module),
-		attribute.String(pubsubTopicNameAttribute, topic),
-	))
+		attribute.String(pubsubTopicNameAttr, topic),
+		attribute.Bool(observability.StatusSucceededAttribute, maybeErr == nil),
+	}
+
+	// NOTE(review): raw error text can be high-cardinality for a metric
+	// attribute; consider a bounded error class instead.
+	if maybeErr != nil {
+		attrs = append(attrs, attribute.String(pubsubFailedToPublishErrAttr, maybeErr.Error()))
+	}
+
+	m.published.Add(ctx, 1, metric.WithAttributes(attrs...))
+}
+
+// PropagationFailed increments the propagation-failed counter, recording the
+// operation that failed, the topic, and the subscription. Sink attributes are
+// added only when a sink is provided.
+func (m *PubSubMetrics) PropagationFailed(ctx context.Context, failedOp, topic string, subscription schema.RefKey, sink optional.Option[schema.RefKey]) {
+	attrs := []attribute.KeyValue{
+		attribute.String(pubsubFailedOperationAttr, failedOp),
+		attribute.String(pubsubTopicNameAttr, topic),
+		attribute.String(pubsubSubscriptionRefAttr, subscription.String()),
+		attribute.String(pubsubSubscriptionModuleAttr, subscription.Module),
+	}
+
+	if sinkRef, ok := sink.Get(); ok {
+		attrs = append(attrs, attribute.String(pubsubSinkRefAttr, sinkRef.String()))
+		attrs = append(attrs, attribute.String(pubsubSinkModuleAttr, sinkRef.Module))
+	}
+
+	m.propagationFailed.Add(ctx, 1, metric.WithAttributes(attrs...))
 }
 
-func (m *PubSubMetrics) SubscriberCalled(ctx context.Context, topic string, subscription, sink schema.RefKey) {
-	m.subscriberCalled.Add(ctx, 1, metric.WithAttributes(
-		attribute.String(observability.ModuleNameAttribute, sink.Module),
-		attribute.String(pubsubTopicNameAttribute, topic),
-		attribute.String(pubsubSubscriptionRefAttribute, subscription.String()),
-		attribute.String(pubsubSubscriberRefAttribute, sink.String()),
+// SinkCalled increments the sink-called counter with the topic, subscription,
+// and sink attributes.
+func (m *PubSubMetrics) SinkCalled(ctx context.Context, topic string, subscription, sink schema.RefKey) {
+	m.sinkCalled.Add(ctx, 1, metric.WithAttributes(
+		attribute.String(pubsubTopicNameAttr, topic),
+		attribute.String(pubsubSubscriptionRefAttr, subscription.String()),
+		attribute.String(pubsubSubscriptionModuleAttr, subscription.Module),
+		attribute.String(pubsubSinkRefAttr, sink.String()),
+		attribute.String(pubsubSinkModuleAttr, sink.Module),
 	))
 }
diff --git a/internal/observability/attributes.go b/internal/observability/attributes.go
index ff6bd6c558..878ac1a8f2 100644
--- a/internal/observability/attributes.go
+++ b/internal/observability/attributes.go
@@ -1,5 +1,6 @@
 package observability
 
 const (
-	ModuleNameAttribute = "ftl.module.name"
+	ModuleNameAttribute      = "ftl.module.name"
+	StatusSucceededAttribute = "ftl.status.succeeded"
 )