Skip to content

Commit

Permalink
feat: propagate Publish caller through all pubsub metrics (#2231)
Browse files Browse the repository at this point in the history
Previously, only `ftl.pubsub.published` had the caller attribute. This
PR adds caller to the rest of the pubsub metrics and also slightly
restructures the topic name/ref logging to be more consistent with the
rest of this file

```
Metric #0
Descriptor:
     -> Name: ftl.pubsub.published
     -> Description: the number of times that an event is published to a topic
     -> Unit: 1
     -> DataType: Sum
     -> IsMonotonic: true
     -> AggregationTemporality: Cumulative

NumberDataPoints #0
Data point attributes:
     -> ftl.module.name: Str(echo)
     -> ftl.pubsub.publish.caller.verb.ref: Str(echo.Echo)
     -> ftl.pubsub.topic.ref: Str(echo.echotopic)
     -> ftl.status.succeeded: Bool(true)
StartTimestamp: 2024-08-01 01:19:43.556864 +0000 UTC
Timestamp: 2024-08-01 01:20:08.558236 +0000 UTC
Value: 1

Metric #1
Descriptor:
     -> Name: ftl.pubsub.sink.called
     -> Description: the number of times that a pubsub event has been enqueued to asynchronously send to a subscriber
     -> Unit: 1
     -> DataType: Sum
     -> IsMonotonic: true
     -> AggregationTemporality: Cumulative

NumberDataPoints #0
Data point attributes:
     -> ftl.pubsub.publish.caller.verb.ref: Str(echo.Echo)
     -> ftl.pubsub.sink.module.name: Str(echo)
     -> ftl.pubsub.sink.ref: Str(echo.echoSinkOne)
     -> ftl.pubsub.subscription.module.name: Str(echo)
     -> ftl.pubsub.subscription.ref: Str(echo.sub)
     -> ftl.pubsub.topic.module.name: Str(echo)
     -> ftl.pubsub.topic.ref: Str(echo.echotopic)
```

Issue: #2194
  • Loading branch information
deniseli authored Aug 1, 2024
1 parent 8dd18e7 commit c38a754
Show file tree
Hide file tree
Showing 8 changed files with 56 additions and 19 deletions.
1 change: 1 addition & 0 deletions backend/controller/cronjobs/sql/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 6 additions & 5 deletions backend/controller/dal/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ func (d *DAL) PublishEventForTopic(ctx context.Context, module, topic, caller st
Key: model.NewTopicEventKey(module, topic),
Module: module,
Topic: topic,
Caller: caller,
Payload: payload,
})
observability.PubSub.Published(ctx, module, topic, caller, err)
Expand Down Expand Up @@ -66,12 +67,12 @@ func (d *DAL) ProgressSubscriptions(ctx context.Context, eventConsumptionDelay t
for _, subscription := range subs {
nextCursor, err := tx.db.GetNextEventForSubscription(ctx, eventConsumptionDelay, subscription.Topic, subscription.Cursor)
if err != nil {
observability.PubSub.PropagationFailed(ctx, "GetNextEventForSubscription", subscription.Topic.Payload.Name, subscriptionRef(subscription), optional.None[schema.RefKey]())
observability.PubSub.PropagationFailed(ctx, "GetNextEventForSubscription", subscription.Topic.Payload, nextCursor.Caller, subscriptionRef(subscription), optional.None[schema.RefKey]())
return 0, fmt.Errorf("failed to get next cursor: %w", dalerrs.TranslatePGError(err))
}
nextCursorKey, ok := nextCursor.Event.Get()
if !ok {
observability.PubSub.PropagationFailed(ctx, "GetNextEventForSubscription-->Event.Get", subscription.Topic.Payload.Name, subscriptionRef(subscription), optional.None[schema.RefKey]())
observability.PubSub.PropagationFailed(ctx, "GetNextEventForSubscription-->Event.Get", subscription.Topic.Payload, nextCursor.Caller, subscriptionRef(subscription), optional.None[schema.RefKey]())
return 0, fmt.Errorf("could not find event to progress subscription: %w", dalerrs.TranslatePGError(err))
}
if !nextCursor.Ready {
Expand All @@ -87,7 +88,7 @@ func (d *DAL) ProgressSubscriptions(ctx context.Context, eventConsumptionDelay t

err = tx.db.BeginConsumingTopicEvent(ctx, subscription.Key, nextCursorKey)
if err != nil {
observability.PubSub.PropagationFailed(ctx, "BeginConsumingTopicEvent", subscription.Topic.Payload.Name, subscriptionRef(subscription), optional.Some(subscriber.Sink))
observability.PubSub.PropagationFailed(ctx, "BeginConsumingTopicEvent", subscription.Topic.Payload, nextCursor.Caller, subscriptionRef(subscription), optional.Some(subscriber.Sink))
return 0, fmt.Errorf("failed to progress subscription: %w", dalerrs.TranslatePGError(err))
}

Expand All @@ -106,11 +107,11 @@ func (d *DAL) ProgressSubscriptions(ctx context.Context, eventConsumptionDelay t
MaxBackoff: subscriber.MaxBackoff,
})
if err != nil {
observability.PubSub.PropagationFailed(ctx, "CreateAsyncCall", subscription.Topic.Payload.Name, subscriptionRef(subscription), optional.Some(subscriber.Sink))
observability.PubSub.PropagationFailed(ctx, "CreateAsyncCall", subscription.Topic.Payload, nextCursor.Caller, subscriptionRef(subscription), optional.Some(subscriber.Sink))
return 0, fmt.Errorf("failed to schedule async task for subscription: %w", dalerrs.TranslatePGError(err))
}

observability.PubSub.SinkCalled(ctx, subscription.Topic.Payload.Name, subscriptionRef(subscription), subscriber.Sink)
observability.PubSub.SinkCalled(ctx, subscription.Topic.Payload, nextCursor.Caller, subscriptionRef(subscription), subscriber.Sink)
successful++
}
return successful, nil
Expand Down
36 changes: 26 additions & 10 deletions backend/controller/observability/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,17 @@ import (
"go.opentelemetry.io/otel/metric/noop"

"github.com/TBD54566975/ftl/backend/schema"
"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/observability"
)

// To learn more about how sinks and subscriptions work together, check out the
// https://tbd54566975.github.io/ftl/docs/reference/pubsub/
const (
pubsubMeterName = "ftl.pubsub"
pubsubTopicNameAttr = "ftl.pubsub.topic.name"
pubsubCallerVerbNameAttr = "ftl.pubsub.publish.caller.verb.name"
pubsubTopicRefAttr = "ftl.pubsub.topic.ref"
pubsubTopicModuleAttr = "ftl.pubsub.topic.module.name"
pubsubCallerVerbRefAttr = "ftl.pubsub.publish.caller.verb.ref"
pubsubSubscriptionRefAttr = "ftl.pubsub.subscription.ref"
pubsubSubscriptionModuleAttr = "ftl.pubsub.subscription.module.name"
pubsubSinkRefAttr = "ftl.pubsub.sink.ref"
Expand Down Expand Up @@ -79,22 +81,28 @@ func handleInitCounterError(errs error, err error, counterName string) error {
func (m *PubSubMetrics) Published(ctx context.Context, module, topic, caller string, maybeErr error) {
attrs := []attribute.KeyValue{
attribute.String(observability.ModuleNameAttribute, module),
attribute.String(pubsubTopicNameAttr, topic),
attribute.String(pubsubCallerVerbNameAttr, caller),
attribute.String(pubsubTopicRefAttr, schema.RefKey{Module: module, Name: topic}.String()),
attribute.String(pubsubCallerVerbRefAttr, schema.RefKey{Module: module, Name: caller}.String()),
attribute.Bool(observability.StatusSucceededAttribute, maybeErr == nil),
}

m.published.Add(ctx, 1, metric.WithAttributes(attrs...))
}

func (m *PubSubMetrics) PropagationFailed(ctx context.Context, failedOp, topic string, subscription schema.RefKey, sink optional.Option[schema.RefKey]) {
func (m *PubSubMetrics) PropagationFailed(ctx context.Context, failedOp string, topic model.TopicPayload, optCaller optional.Option[string], subscription schema.RefKey, sink optional.Option[schema.RefKey]) {
attrs := []attribute.KeyValue{
attribute.String(pubsubFailedOperationAttr, failedOp),
attribute.String(pubsubTopicNameAttr, topic),
attribute.String(pubsubTopicRefAttr, schema.RefKey{Module: topic.Module, Name: topic.Name}.String()),
attribute.String(pubsubTopicModuleAttr, topic.Module),
attribute.String(pubsubSubscriptionRefAttr, subscription.String()),
attribute.String(pubsubSubscriptionModuleAttr, subscription.Module),
}

caller, ok := optCaller.Get()
if ok {
attrs = append(attrs, attribute.String(pubsubCallerVerbRefAttr, schema.RefKey{Module: topic.Module, Name: caller}.String()))
}

if sinkRef, ok := sink.Get(); ok {
attrs = append(attrs, attribute.String(pubsubSinkRefAttr, sinkRef.String()))
attrs = append(attrs, attribute.String(pubsubSinkModuleAttr, sinkRef.Module))
Expand All @@ -103,12 +111,20 @@ func (m *PubSubMetrics) PropagationFailed(ctx context.Context, failedOp, topic s
m.propagationFailed.Add(ctx, 1, metric.WithAttributes(attrs...))
}

func (m *PubSubMetrics) SinkCalled(ctx context.Context, topic string, subscription, sink schema.RefKey) {
m.sinkCalled.Add(ctx, 1, metric.WithAttributes(
attribute.String(pubsubTopicNameAttr, topic),
func (m *PubSubMetrics) SinkCalled(ctx context.Context, topic model.TopicPayload, optCaller optional.Option[string], subscription, sink schema.RefKey) {
attrs := []attribute.KeyValue{
attribute.String(pubsubTopicRefAttr, schema.RefKey{Module: topic.Module, Name: topic.Name}.String()),
attribute.String(pubsubTopicModuleAttr, topic.Module),
attribute.String(pubsubSubscriptionRefAttr, subscription.String()),
attribute.String(pubsubSubscriptionModuleAttr, subscription.Module),
attribute.String(pubsubSinkRefAttr, sink.String()),
attribute.String(pubsubSinkModuleAttr, sink.Module),
))
}

caller, ok := optCaller.Get()
if ok {
attrs = append(attrs, attribute.String(pubsubCallerVerbRefAttr, schema.RefKey{Module: topic.Module, Name: caller}.String()))
}

m.sinkCalled.Add(ctx, 1, metric.WithAttributes(attrs...))
}
1 change: 1 addition & 0 deletions backend/controller/sql/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion backend/controller/sql/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,7 @@ VALUES (
INSERT INTO topic_events (
"key",
topic_id,
caller,
payload
)
VALUES (
Expand All @@ -665,6 +666,7 @@ VALUES (
WHERE modules.name = sqlc.arg('module')::TEXT
AND topics.name = sqlc.arg('topic')::TEXT
),
sqlc.arg('caller'),
sqlc.arg('payload')
);

Expand Down Expand Up @@ -697,6 +699,7 @@ WITH cursor AS (
SELECT events."key" as event,
events.payload,
events.created_at,
events.caller,
NOW() - events.created_at >= sqlc.arg('consumption_delay')::interval AS ready
FROM topics
LEFT JOIN topic_events as events ON events.topic_id = topics.id
Expand Down Expand Up @@ -769,4 +772,4 @@ WHERE id = $1::BIGINT;
-- name: GetTopicEvent :one
SELECT *
FROM topic_events
WHERE id = $1::BIGINT;
WHERE id = $1::BIGINT;
14 changes: 11 additions & 3 deletions backend/controller/sql/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
-- migrate:up

ALTER TABLE topic_events
ADD COLUMN caller TEXT;

-- migrate:down
1 change: 1 addition & 0 deletions common/configuration/sql/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit c38a754

Please sign in to comment.