Skip to content

Commit

Permalink
feat: instrument otel metrics for pubsub failures (#2200)
Browse files Browse the repository at this point in the history
- Adds `ftl.status.succeeded` attr to `ftl.pubsub.published` metric
- Adds separate subscription and subscriber `module.name` attrs to all
metrics except `.published`
- Adds `ftl.pubsub.propagation.failed` metric

Success case:
```
ScopeMetrics #0
ScopeMetrics SchemaURL: 
InstrumentationScope ftl.pubsub 

Metric #0
Descriptor:
     -> Name: ftl.pubsub.published
     -> Description: the number of times that an event is published to a topic
     -> Unit: 1
     -> DataType: Sum
     -> IsMonotonic: true
     -> AggregationTemporality: Cumulative

NumberDataPoints #0
Data point attributes:
     -> ftl.module.name: Str(echo)
     -> ftl.pubsub.topic.name: Str(echotopic)
     -> ftl.status.succeeded: Bool(true)
StartTimestamp: 2024-07-30 20:42:41.054792 +0000 UTC
Timestamp: 2024-07-30 20:43:01.055944 +0000 UTC
Value: 1

Metric #1
Descriptor:
     -> Name: ftl.pubsub.sink.called
     -> Description: the number of times that a pubsub event has been enqueued to asynchronously send to a subscriber
     -> Unit: 1
     -> DataType: Sum
     -> IsMonotonic: true
     -> AggregationTemporality: Cumulative

NumberDataPoints #0
Data point attributes:
     -> ftl.pubsub.sink.module.name: Str(echo)
     -> ftl.pubsub.sink.ref: Str(echo.echoSinkOne)
     -> ftl.pubsub.subscription.module.name: Str(echo)
     -> ftl.pubsub.subscription.ref: Str(echo.sub)
     -> ftl.pubsub.topic.name: Str(echotopic)
StartTimestamp: 2024-07-30 20:42:41.054797 +0000 UTC
Timestamp: 2024-07-30 20:43:01.055965 +0000 UTC
Value: 1
```

Publish error case:
```
Metric #0
Descriptor:
     -> Name: ftl.pubsub.published
     -> Description: the number of times that an event is published to a topic
     -> Unit: 1
     -> DataType: Sum
     -> IsMonotonic: true
     -> AggregationTemporality: Cumulative

NumberDataPoints #0
Data point attributes:
     -> ftl.module.name: Str(echo)
     -> ftl.pubsub.publish.error.message: Str(test error)
     -> ftl.pubsub.topic.name: Str(echotopic)
     -> ftl.status.succeeded: Bool(false)
StartTimestamp: 2024-07-30 20:49:19.294421 +0000 UTC
Timestamp: 2024-07-30 20:49:44.294924 +0000 UTC
Value: 1
```

Progressing subscribers error case with subscriber defined:
```
Metric #1
Descriptor:
     -> Name: ftl.pubsub.propagation.failed
     -> Description: the number of times that subscriptions fail to progress
     -> Unit: 1
     -> DataType: Sum
     -> IsMonotonic: true
     -> AggregationTemporality: Cumulative

NumberDataPoints #0
Data point attributes:
     -> ftl.pubsub.propagation.failed_operation: Str(BeginConsumingTopicEvent)
     -> ftl.pubsub.sink.module.name: Str(echo)
     -> ftl.pubsub.sink.ref: Str(echo.echoSinkOne)
     -> ftl.pubsub.subscription.module.name: Str(echo)
     -> ftl.pubsub.subscription.ref: Str(echo.sub)
     -> ftl.pubsub.topic.name: Str(echotopic)
StartTimestamp: 2024-07-30 20:46:45.475043 +0000 UTC
Timestamp: 2024-07-30 20:47:00.476169 +0000 UTC
Value: 4
```

Progressing subscribers error case without subscriber:
```
Metric #1
Descriptor:
     -> Name: ftl.pubsub.propagation.failed
     -> Description: the number of times that subscriptions fail to progress
     -> Unit: 1
     -> DataType: Sum
     -> IsMonotonic: true
     -> AggregationTemporality: Cumulative

NumberDataPoints #0
Data point attributes:
     -> ftl.pubsub.propagation.failed_operation: Str(GetNextEventForSubscription)
     -> ftl.pubsub.subscription.module.name: Str(echo)
     -> ftl.pubsub.subscription.ref: Str(echo.sub)
     -> ftl.pubsub.topic.name: Str(echotopic)
StartTimestamp: 2024-07-30 19:09:10.084914 +0000 UTC
Timestamp: 2024-07-30 19:09:25.086338 +0000 UTC
Value: 4
```

Issue: #2194

---------

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
deniseli and github-actions[bot] authored Jul 30, 2024
1 parent c4dd5cc commit 606d028
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 23 deletions.
14 changes: 12 additions & 2 deletions backend/controller/dal/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"time"

"github.com/alecthomas/types/optional"

"github.com/TBD54566975/ftl/backend/controller/observability"
"github.com/TBD54566975/ftl/backend/controller/sql"
dalerrs "github.com/TBD54566975/ftl/backend/dal"
Expand All @@ -21,10 +23,10 @@ func (d *DAL) PublishEventForTopic(ctx context.Context, module, topic string, pa
Topic: topic,
Payload: payload,
})
observability.PubSub.Published(ctx, module, topic, err)
if err != nil {
return dalerrs.TranslatePGError(err)
}
observability.PubSub.Published(ctx, module, topic)
return nil
}

Expand Down Expand Up @@ -63,10 +65,12 @@ func (d *DAL) ProgressSubscriptions(ctx context.Context, eventConsumptionDelay t
for _, subscription := range subs {
nextCursor, err := tx.db.GetNextEventForSubscription(ctx, eventConsumptionDelay, subscription.Topic, subscription.Cursor)
if err != nil {
observability.PubSub.PropagationFailed(ctx, "GetNextEventForSubscription", subscription.Topic.Payload.Name, subscriptionRef(subscription), optional.None[schema.RefKey]())
return 0, fmt.Errorf("failed to get next cursor: %w", dalerrs.TranslatePGError(err))
}
nextCursorKey, ok := nextCursor.Event.Get()
if !ok {
observability.PubSub.PropagationFailed(ctx, "GetNextEventForSubscription-->Event.Get", subscription.Topic.Payload.Name, subscriptionRef(subscription), optional.None[schema.RefKey]())
return 0, fmt.Errorf("could not find event to progress subscription: %w", dalerrs.TranslatePGError(err))
}
if !nextCursor.Ready {
Expand All @@ -82,6 +86,7 @@ func (d *DAL) ProgressSubscriptions(ctx context.Context, eventConsumptionDelay t

err = tx.db.BeginConsumingTopicEvent(ctx, subscription.Key, nextCursorKey)
if err != nil {
observability.PubSub.PropagationFailed(ctx, "BeginConsumingTopicEvent", subscription.Topic.Payload.Name, subscriptionRef(subscription), optional.Some(subscriber.Sink))
return 0, fmt.Errorf("failed to progress subscription: %w", dalerrs.TranslatePGError(err))
}

Expand All @@ -100,15 +105,20 @@ func (d *DAL) ProgressSubscriptions(ctx context.Context, eventConsumptionDelay t
MaxBackoff: subscriber.MaxBackoff,
})
if err != nil {
observability.PubSub.PropagationFailed(ctx, "CreateAsyncCall", subscription.Topic.Payload.Name, subscriptionRef(subscription), optional.Some(subscriber.Sink))
return 0, fmt.Errorf("failed to schedule async task for subscription: %w", dalerrs.TranslatePGError(err))
}

observability.PubSub.SubscriberCalled(ctx, subscription.Topic.Payload.Name, schema.RefKey{Module: subscription.Key.Payload.Module, Name: subscription.Name}, subscriber.Sink)
observability.PubSub.SinkCalled(ctx, subscription.Topic.Payload.Name, subscriptionRef(subscription), subscriber.Sink)
successful++
}
return successful, nil
}

func subscriptionRef(subscription sql.GetSubscriptionsNeedingUpdateRow) schema.RefKey {
return schema.RefKey{Module: subscription.Key.Payload.Module, Name: subscription.Name}
}

func (d *DAL) CompleteEventForSubscription(ctx context.Context, module, name string) error {
err := d.db.CompleteEventForSubscription(ctx, name, module)
if err != nil {
Expand Down
81 changes: 61 additions & 20 deletions backend/controller/observability/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"

"github.com/alecthomas/types/optional"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
Expand All @@ -14,17 +15,24 @@ import (
"github.com/TBD54566975/ftl/internal/observability"
)

// To learn more about how sinks and subscriptions work together, check out the
// https://tbd54566975.github.io/ftl/docs/reference/pubsub/
const (
pubsubMeterName = "ftl.pubsub"
pubsubTopicNameAttribute = "ftl.pubsub.topic.name"
pubsubSubscriptionRefAttribute = "ftl.pubsub.subscription.ref"
pubsubSubscriberRefAttribute = "ftl.pubsub.subscriber.sink.ref"
pubsubMeterName = "ftl.pubsub"
pubsubTopicNameAttr = "ftl.pubsub.topic.name"
pubsubSubscriptionRefAttr = "ftl.pubsub.subscription.ref"
pubsubSubscriptionModuleAttr = "ftl.pubsub.subscription.module.name"
pubsubSinkRefAttr = "ftl.pubsub.sink.ref"
pubsubSinkModuleAttr = "ftl.pubsub.sink.module.name"
pubsubFailedOperationAttr = "ftl.pubsub.propagation.failed_operation"
pubsubFailedToPublishErrAttr = "ftl.pubsub.publish.error.message"
)

type PubSubMetrics struct {
meter metric.Meter
published metric.Int64Counter
subscriberCalled metric.Int64Counter
meter metric.Meter
published metric.Int64Counter
propagationFailed metric.Int64Counter
sinkCalled metric.Int64Counter
}

func initPubSubMetrics() (*PubSubMetrics, error) {
Expand All @@ -43,13 +51,22 @@ func initPubSubMetrics() (*PubSubMetrics, error) {
result.published = noop.Int64Counter{}
}

counterName = fmt.Sprintf("%s.subscriber.called", pubsubMeterName)
if result.subscriberCalled, err = result.meter.Int64Counter(
counterName = fmt.Sprintf("%s.propagation.failed", pubsubMeterName)
if result.propagationFailed, err = result.meter.Int64Counter(
counterName,
metric.WithUnit("1"),
metric.WithDescription("the number of times that subscriptions fail to progress")); err != nil {
errs = handleInitCounterError(errs, err, counterName)
result.propagationFailed = noop.Int64Counter{}
}

counterName = fmt.Sprintf("%s.sink.called", pubsubMeterName)
if result.sinkCalled, err = result.meter.Int64Counter(
counterName,
metric.WithUnit("1"),
metric.WithDescription("the number of times that a pubsub event has been enqueued to asynchronously send to a subscriber")); err != nil {
errs = handleInitCounterError(errs, err, counterName)
result.subscriberCalled = noop.Int64Counter{}
result.sinkCalled = noop.Int64Counter{}
}

return result, errs
Expand All @@ -59,18 +76,42 @@ func handleInitCounterError(errs error, err error, counterName string) error {
return errors.Join(errs, fmt.Errorf("%q counter init failed; falling back to noop: %w", counterName, err))
}

func (m *PubSubMetrics) Published(ctx context.Context, module, topic string) {
m.published.Add(ctx, 1, metric.WithAttributes(
func (m *PubSubMetrics) Published(ctx context.Context, module, topic string, maybeErr error) {
attrs := []attribute.KeyValue{
attribute.String(observability.ModuleNameAttribute, module),
attribute.String(pubsubTopicNameAttribute, topic),
))
attribute.String(pubsubTopicNameAttr, topic),
attribute.Bool(observability.StatusSucceededAttribute, maybeErr == nil),
}

if maybeErr != nil {
attrs = append(attrs, attribute.String(pubsubFailedToPublishErrAttr, maybeErr.Error()))
}

m.published.Add(ctx, 1, metric.WithAttributes(attrs...))
}

func (m *PubSubMetrics) PropagationFailed(ctx context.Context, failedOp, topic string, subscription schema.RefKey, sink optional.Option[schema.RefKey]) {
attrs := []attribute.KeyValue{
attribute.String(pubsubFailedOperationAttr, failedOp),
attribute.String(pubsubTopicNameAttr, topic),
attribute.String(pubsubSubscriptionRefAttr, subscription.String()),
attribute.String(pubsubSubscriptionModuleAttr, subscription.Module),
}

if sinkRef, ok := sink.Get(); ok {
attrs = append(attrs, attribute.String(pubsubSinkRefAttr, sinkRef.String()))
attrs = append(attrs, attribute.String(pubsubSinkModuleAttr, sinkRef.Module))
}

m.propagationFailed.Add(ctx, 1, metric.WithAttributes(attrs...))
}

func (m *PubSubMetrics) SubscriberCalled(ctx context.Context, topic string, subscription, sink schema.RefKey) {
m.subscriberCalled.Add(ctx, 1, metric.WithAttributes(
attribute.String(observability.ModuleNameAttribute, sink.Module),
attribute.String(pubsubTopicNameAttribute, topic),
attribute.String(pubsubSubscriptionRefAttribute, subscription.String()),
attribute.String(pubsubSubscriberRefAttribute, sink.String()),
func (m *PubSubMetrics) SinkCalled(ctx context.Context, topic string, subscription, sink schema.RefKey) {
m.sinkCalled.Add(ctx, 1, metric.WithAttributes(
attribute.String(pubsubTopicNameAttr, topic),
attribute.String(pubsubSubscriptionRefAttr, subscription.String()),
attribute.String(pubsubSubscriptionModuleAttr, subscription.Module),
attribute.String(pubsubSinkRefAttr, sink.String()),
attribute.String(pubsubSinkModuleAttr, sink.Module),
))
}
3 changes: 2 additions & 1 deletion internal/observability/attributes.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package observability

const (
ModuleNameAttribute = "ftl.module.name"
ModuleNameAttribute = "ftl.module.name"
StatusSucceededAttribute = "ftl.status.succeeded"
)

0 comments on commit 606d028

Please sign in to comment.