diff --git a/backend/controller/cronjobs/sql/models.go b/backend/controller/cronjobs/sql/models.go index ef62b768d7..a8e7d0b65e 100644 --- a/backend/controller/cronjobs/sql/models.go +++ b/backend/controller/cronjobs/sql/models.go @@ -523,6 +523,7 @@ type TopicEvent struct { Key model.TopicEventKey TopicID int64 Payload []byte + Caller string } type TopicSubscriber struct { diff --git a/backend/controller/dal/pubsub.go b/backend/controller/dal/pubsub.go index 16d818b246..398b0cc610 100644 --- a/backend/controller/dal/pubsub.go +++ b/backend/controller/dal/pubsub.go @@ -22,6 +22,7 @@ func (d *DAL) PublishEventForTopic(ctx context.Context, module, topic, caller st Key: model.NewTopicEventKey(module, topic), Module: module, Topic: topic, + Caller: caller, Payload: payload, }) observability.PubSub.Published(ctx, module, topic, caller, err) @@ -66,12 +67,12 @@ func (d *DAL) ProgressSubscriptions(ctx context.Context, eventConsumptionDelay t for _, subscription := range subs { nextCursor, err := tx.db.GetNextEventForSubscription(ctx, eventConsumptionDelay, subscription.Topic, subscription.Cursor) if err != nil { - observability.PubSub.PropagationFailed(ctx, "GetNextEventForSubscription", subscription.Topic.Payload.Name, subscriptionRef(subscription), optional.None[schema.RefKey]()) + observability.PubSub.PropagationFailed(ctx, "GetNextEventForSubscription", subscription.Topic.Payload, nextCursor.Caller, subscriptionRef(subscription), optional.None[schema.RefKey]()) return 0, fmt.Errorf("failed to get next cursor: %w", dalerrs.TranslatePGError(err)) } nextCursorKey, ok := nextCursor.Event.Get() if !ok { - observability.PubSub.PropagationFailed(ctx, "GetNextEventForSubscription-->Event.Get", subscription.Topic.Payload.Name, subscriptionRef(subscription), optional.None[schema.RefKey]()) + observability.PubSub.PropagationFailed(ctx, "GetNextEventForSubscription-->Event.Get", subscription.Topic.Payload, nextCursor.Caller, subscriptionRef(subscription), optional.None[schema.RefKey]()) return 0, fmt.Errorf("could not find event to progress subscription: %w", dalerrs.TranslatePGError(err)) } if !nextCursor.Ready { @@ -87,7 +88,7 @@ func (d *DAL) ProgressSubscriptions(ctx context.Context, eventConsumptionDelay t err = tx.db.BeginConsumingTopicEvent(ctx, subscription.Key, nextCursorKey) if err != nil { - observability.PubSub.PropagationFailed(ctx, "BeginConsumingTopicEvent", subscription.Topic.Payload.Name, subscriptionRef(subscription), optional.Some(subscriber.Sink)) + observability.PubSub.PropagationFailed(ctx, "BeginConsumingTopicEvent", subscription.Topic.Payload, nextCursor.Caller, subscriptionRef(subscription), optional.Some(subscriber.Sink)) return 0, fmt.Errorf("failed to progress subscription: %w", dalerrs.TranslatePGError(err)) } @@ -106,11 +107,11 @@ func (d *DAL) ProgressSubscriptions(ctx context.Context, eventConsumptionDelay t MaxBackoff: subscriber.MaxBackoff, }) if err != nil { - observability.PubSub.PropagationFailed(ctx, "CreateAsyncCall", subscription.Topic.Payload.Name, subscriptionRef(subscription), optional.Some(subscriber.Sink)) + observability.PubSub.PropagationFailed(ctx, "CreateAsyncCall", subscription.Topic.Payload, nextCursor.Caller, subscriptionRef(subscription), optional.Some(subscriber.Sink)) return 0, fmt.Errorf("failed to schedule async task for subscription: %w", dalerrs.TranslatePGError(err)) } - observability.PubSub.SinkCalled(ctx, subscription.Topic.Payload.Name, subscriptionRef(subscription), subscriber.Sink) + observability.PubSub.SinkCalled(ctx, subscription.Topic.Payload, nextCursor.Caller, subscriptionRef(subscription), subscriber.Sink) successful++ } return successful, nil diff --git a/backend/controller/observability/pubsub.go b/backend/controller/observability/pubsub.go index 4c7114dce3..42f2a51045 100644 --- a/backend/controller/observability/pubsub.go +++ b/backend/controller/observability/pubsub.go @@ -12,6 +12,7 @@ import ( "go.opentelemetry.io/otel/metric/noop" "github.com/TBD54566975/ftl/backend/schema" + "github.com/TBD54566975/ftl/internal/model" "github.com/TBD54566975/ftl/internal/observability" ) @@ -19,8 +20,9 @@ import ( // https://tbd54566975.github.io/ftl/docs/reference/pubsub/ const ( pubsubMeterName = "ftl.pubsub" - pubsubTopicNameAttr = "ftl.pubsub.topic.name" - pubsubCallerVerbNameAttr = "ftl.pubsub.publish.caller.verb.name" + pubsubTopicRefAttr = "ftl.pubsub.topic.ref" + pubsubTopicModuleAttr = "ftl.pubsub.topic.module.name" + pubsubCallerVerbRefAttr = "ftl.pubsub.publish.caller.verb.ref" pubsubSubscriptionRefAttr = "ftl.pubsub.subscription.ref" pubsubSubscriptionModuleAttr = "ftl.pubsub.subscription.module.name" pubsubSinkRefAttr = "ftl.pubsub.sink.ref" @@ -79,22 +81,28 @@ func handleInitCounterError(errs error, err error, counterName string) error { func (m *PubSubMetrics) Published(ctx context.Context, module, topic, caller string, maybeErr error) { attrs := []attribute.KeyValue{ attribute.String(observability.ModuleNameAttribute, module), - attribute.String(pubsubTopicNameAttr, topic), - attribute.String(pubsubCallerVerbNameAttr, caller), + attribute.String(pubsubTopicRefAttr, schema.RefKey{Module: module, Name: topic}.String()), + attribute.String(pubsubCallerVerbRefAttr, schema.RefKey{Module: module, Name: caller}.String()), attribute.Bool(observability.StatusSucceededAttribute, maybeErr == nil), } m.published.Add(ctx, 1, metric.WithAttributes(attrs...)) } -func (m *PubSubMetrics) PropagationFailed(ctx context.Context, failedOp, topic string, subscription schema.RefKey, sink optional.Option[schema.RefKey]) { +func (m *PubSubMetrics) PropagationFailed(ctx context.Context, failedOp string, topic model.TopicPayload, optCaller optional.Option[string], subscription schema.RefKey, sink optional.Option[schema.RefKey]) { attrs := []attribute.KeyValue{ attribute.String(pubsubFailedOperationAttr, failedOp), - attribute.String(pubsubTopicNameAttr, topic), + attribute.String(pubsubTopicRefAttr, schema.RefKey{Module: topic.Module, Name: topic.Name}.String()), + attribute.String(pubsubTopicModuleAttr, topic.Module), attribute.String(pubsubSubscriptionRefAttr, subscription.String()), attribute.String(pubsubSubscriptionModuleAttr, subscription.Module), } + caller, ok := optCaller.Get() + if ok { + attrs = append(attrs, attribute.String(pubsubCallerVerbRefAttr, schema.RefKey{Module: topic.Module, Name: caller}.String())) + } + if sinkRef, ok := sink.Get(); ok { attrs = append(attrs, attribute.String(pubsubSinkRefAttr, sinkRef.String())) attrs = append(attrs, attribute.String(pubsubSinkModuleAttr, sinkRef.Module)) @@ -103,12 +111,20 @@ func (m *PubSubMetrics) PropagationFailed(ctx context.Context, failedOp, topic s m.propagationFailed.Add(ctx, 1, metric.WithAttributes(attrs...)) } -func (m *PubSubMetrics) SinkCalled(ctx context.Context, topic string, subscription, sink schema.RefKey) { - m.sinkCalled.Add(ctx, 1, metric.WithAttributes( - attribute.String(pubsubTopicNameAttr, topic), +func (m *PubSubMetrics) SinkCalled(ctx context.Context, topic model.TopicPayload, optCaller optional.Option[string], subscription, sink schema.RefKey) { + attrs := []attribute.KeyValue{ + attribute.String(pubsubTopicRefAttr, schema.RefKey{Module: topic.Module, Name: topic.Name}.String()), + attribute.String(pubsubTopicModuleAttr, topic.Module), attribute.String(pubsubSubscriptionRefAttr, subscription.String()), attribute.String(pubsubSubscriptionModuleAttr, subscription.Module), attribute.String(pubsubSinkRefAttr, sink.String()), attribute.String(pubsubSinkModuleAttr, sink.Module), - )) + } + + caller, ok := optCaller.Get() + if ok { + attrs = append(attrs, attribute.String(pubsubCallerVerbRefAttr, schema.RefKey{Module: topic.Module, Name: caller}.String())) + } + + m.sinkCalled.Add(ctx, 1, metric.WithAttributes(attrs...)) } diff --git a/backend/controller/sql/models.go b/backend/controller/sql/models.go index ef62b768d7..a8e7d0b65e 100644 --- a/backend/controller/sql/models.go +++ b/backend/controller/sql/models.go @@ -523,6 +523,7 @@ type TopicEvent struct { Key model.TopicEventKey TopicID int64 Payload []byte + Caller string } type TopicSubscriber struct { diff --git a/backend/controller/sql/queries.sql b/backend/controller/sql/queries.sql index 86548b00de..5db1ba9092 100644 --- a/backend/controller/sql/queries.sql +++ b/backend/controller/sql/queries.sql @@ -654,6 +654,7 @@ VALUES ( INSERT INTO topic_events ( "key", topic_id, + caller, payload ) VALUES ( @@ -665,6 +666,7 @@ VALUES ( WHERE modules.name = sqlc.arg('module')::TEXT AND topics.name = sqlc.arg('topic')::TEXT ), + sqlc.arg('caller'), sqlc.arg('payload') ); @@ -697,6 +699,7 @@ WITH cursor AS ( SELECT events."key" as event, events.payload, events.created_at, + events.caller, NOW() - events.created_at >= sqlc.arg('consumption_delay')::interval AS ready FROM topics LEFT JOIN topic_events as events ON events.topic_id = topics.id @@ -769,4 +772,4 @@ WHERE id = $1::BIGINT; -- name: GetTopicEvent :one SELECT * FROM topic_events -WHERE id = $1::BIGINT; \ No newline at end of file +WHERE id = $1::BIGINT; diff --git a/backend/controller/sql/queries.sql.go b/backend/controller/sql/queries.sql.go index 8fd8315e2e..52bdeed0f6 100644 --- a/backend/controller/sql/queries.sql.go +++ b/backend/controller/sql/queries.sql.go @@ -103,7 +103,7 @@ func (q *Queries) AssociateArtefactWithDeployment(ctx context.Context, arg Assoc const beginConsumingTopicEvent = `-- name: BeginConsumingTopicEvent :exec WITH event AS ( - SELECT id, created_at, key, topic_id, payload + SELECT id, created_at, key, topic_id, payload, caller FROM topic_events WHERE "key" = $2::topic_event_key ) @@ -1132,6 +1132,7 @@ WITH cursor AS ( SELECT events."key" as event, events.payload, events.created_at, + events.caller, NOW() - events.created_at >= $1::interval AS ready FROM topics LEFT JOIN topic_events as events ON events.topic_id = topics.id @@ -1145,6 +1146,7 @@ type GetNextEventForSubscriptionRow struct { Event optional.Option[model.TopicEventKey] Payload []byte CreatedAt optional.Option[time.Time] + Caller optional.Option[string] Ready bool } @@ -1155,6 +1157,7 @@ func (q *Queries) GetNextEventForSubscription(ctx context.Context, consumptionDe &i.Event, &i.Payload, &i.CreatedAt, + &i.Caller, &i.Ready, ) return i, err @@ -1543,7 +1546,7 @@ func (q *Queries) GetTopic(ctx context.Context, dollar_1 int64) (Topic, error) { } const getTopicEvent = `-- name: GetTopicEvent :one -SELECT id, created_at, key, topic_id, payload +SELECT id, created_at, key, topic_id, payload, caller FROM topic_events WHERE id = $1::BIGINT ` @@ -1557,6 +1560,7 @@ func (q *Queries) GetTopicEvent(ctx context.Context, dollar_1 int64) (TopicEvent &i.Key, &i.TopicID, &i.Payload, + &i.Caller, ) return i, err } @@ -1901,6 +1905,7 @@ const publishEventForTopic = `-- name: PublishEventForTopic :exec INSERT INTO topic_events ( "key", topic_id, + caller, payload ) VALUES ( @@ -1912,7 +1917,8 @@ VALUES ( WHERE modules.name = $2::TEXT AND topics.name = $3::TEXT ), - $4 + $4, + $5 ) ` @@ -1920,6 +1926,7 @@ type PublishEventForTopicParams struct { Key model.TopicEventKey Module string Topic string + Caller string Payload []byte } @@ -1928,6 +1935,7 @@ func (q *Queries) PublishEventForTopic(ctx context.Context, arg PublishEventForT arg.Key, arg.Module, arg.Topic, + arg.Caller, arg.Payload, ) return err diff --git a/backend/controller/sql/schema/20240731230343_create_topic_events_caller.sql b/backend/controller/sql/schema/20240731230343_create_topic_events_caller.sql new file mode 100644 index 0000000000..c8fbab4523 --- /dev/null +++ b/backend/controller/sql/schema/20240731230343_create_topic_events_caller.sql @@ -0,0 +1,6 @@ +-- migrate:up + +ALTER TABLE topic_events + ADD COLUMN caller TEXT; + +-- migrate:down diff --git a/common/configuration/sql/models.go b/common/configuration/sql/models.go index ef62b768d7..a8e7d0b65e 100644 --- a/common/configuration/sql/models.go +++ b/common/configuration/sql/models.go @@ -523,6 +523,7 @@ type TopicEvent struct { Key model.TopicEventKey TopicID int64 Payload []byte + Caller string } type TopicSubscriber struct {