Skip to content

Commit

Permalink
feat: add ftl.async_call.queue_depth and ftl.async_call.created m…
Browse files Browse the repository at this point in the history
…etrics (#2257)

Adds 2 metrics:

`ftl.async_call.queue_depth` is a gauge, so not all recorded values will
appear in the stream. Locally, enqueued async calls complete so quickly
that the gauge gets reset to 0 before the incremented value (1) can
appear in the stream. I confirmed the enqueue `Record()` does work by
temporarily removing the `Record()` call upon completion.
```
Metric #5
Descriptor:
     -> Name: ftl.async_call.queue_depth
     -> Description: number of async calls queued up
     -> Unit: 1
     -> DataType: Gauge

NumberDataPoints #0
StartTimestamp: 2024-08-05 18:13:41.233336 +0000 UTC
Timestamp: 2024-08-05 18:14:01.232978 +0000 UTC
Value: 0
```

`ftl.async_call.created` is a simple counter:
```
Metric #0
Descriptor:
     -> Name: ftl.async_call.created
     -> Description: the number of times that an async call was created
     -> Unit: 1
     -> DataType: Sum
     -> IsMonotonic: true
     -> AggregationTemporality: Cumulative

NumberDataPoints #0
Data point attributes:
     -> ftl.async_call.origin: Str(sub:echo.sub)
     -> ftl.async_call.remaining_attempts: Int(0)
     -> ftl.async_call.verb.ref: Str(echo.echoSinkOne)
     -> ftl.module.name: Str(echo)
     -> ftl.status.succeeded: Bool(true)
StartTimestamp: 2024-08-05 18:13:41.233319 +0000 UTC
Timestamp: 2024-08-05 18:14:06.233054 +0000 UTC
Value: 1
```

Issue: #2194
  • Loading branch information
deniseli authored Aug 5, 2024
1 parent 34ea9cf commit 6a67e8f
Show file tree
Hide file tree
Showing 9 changed files with 112 additions and 11 deletions.
21 changes: 19 additions & 2 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1383,6 +1383,11 @@ func (s *Service) executeAsyncCalls(ctx context.Context) (time.Duration, error)
observability.AsyncCalls.Acquired(ctx, call.Verb, call.Origin.String(), call.ScheduledAt, nil)
defer call.Release() //nolint:errcheck

// Queue depth is queried at acquisition time, which means it includes the async
// call that was just executed, but it will be recorded at completion time, so
// decrement the value accordingly.
queueDepth := call.QueueDepth - 1

logger = logger.Scope(fmt.Sprintf("%s:%s", call.Origin, call.Verb))

logger.Tracef("Executing async call")
Expand Down Expand Up @@ -1410,6 +1415,11 @@ func (s *Service) executeAsyncCalls(ctx context.Context) (time.Duration, error)
}
err = s.dal.CompleteAsyncCall(ctx, call, callResult, func(tx *dal.Tx) error {
if failed && call.RemainingAttempts > 0 {
// If the call failed and there are attempts remaining, then
// CompleteAsyncCall would enqueue another call, so increment
// queue depth accordingly.
queueDepth++

// Will retry, do not propagate failure yet.
return nil
}
Expand All @@ -1426,10 +1436,17 @@ func (s *Service) executeAsyncCalls(ctx context.Context) (time.Duration, error)
}
})
if err != nil {
observability.AsyncCalls.Completed(ctx, call.Verb, call.Origin.String(), call.ScheduledAt, err)
if failed && call.RemainingAttempts > 0 {
// If the call failed and we had more remaining attempts, then we
// would have incremented queue depth to account for the retry,
// but if CompleteAsyncCall failed, then we failed to enqueue that
// retry, so queue depth needs to be decremented back accordingly.
queueDepth--
}
observability.AsyncCalls.Completed(ctx, call.Verb, call.Origin.String(), call.ScheduledAt, queueDepth, err)
return 0, fmt.Errorf("failed to complete async call: %w", err)
}
observability.AsyncCalls.Completed(ctx, call.Verb, call.Origin.String(), call.ScheduledAt, nil)
observability.AsyncCalls.Completed(ctx, call.Verb, call.Origin.String(), call.ScheduledAt, queueDepth, nil)
go func() {
// Post-commit notification based on origin
switch origin := call.Origin.(type) {
Expand Down
2 changes: 2 additions & 0 deletions backend/controller/dal/async_calls.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type AsyncCall struct {
Verb schema.RefKey
Request json.RawMessage
ScheduledAt time.Time
QueueDepth int64

RemainingAttempts int32
Backoff time.Duration
Expand Down Expand Up @@ -120,6 +121,7 @@ func (d *DAL) AcquireAsyncCall(ctx context.Context) (call *AsyncCall, err error)
Request: decryptedRequest,
Lease: lease,
ScheduledAt: row.ScheduledAt,
QueueDepth: row.QueueDepth,
RemainingAttempts: row.RemainingAttempts,
Backoff: row.Backoff,
MaxBackoff: row.MaxBackoff,
Expand Down
7 changes: 7 additions & 0 deletions backend/controller/dal/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,16 @@ func (d *DAL) StartFSMTransition(ctx context.Context, fsm schema.RefKey, executi
Backoff: retryParams.MinBackoff,
MaxBackoff: retryParams.MaxBackoff,
})
observability.AsyncCalls.Created(ctx, destinationState, origin.String(), int64(retryParams.Count), err)
if err != nil {
return fmt.Errorf("failed to create FSM async call: %w", dalerrs.TranslatePGError(err))
}
queueDepth, err := d.db.AsyncCallQueueDepth(ctx)
if err == nil {
// Don't error out of an FSM transition just over a queue depth retrieval
// error because this is only used for an observability gauge.
observability.AsyncCalls.RecordQueueDepth(ctx, queueDepth)
}

// Start a transition.
instance, err := d.db.StartFSMTransition(ctx, sql.StartFSMTransitionParams{
Expand Down
6 changes: 4 additions & 2 deletions backend/controller/dal/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ func TestSendFSMEvent(t *testing.T) {
FSM: schema.RefKey{Module: "test", Name: "test"},
Key: "invoiceID",
},
Request: []byte(`{}`),
Request: []byte(`{}`),
QueueDepth: 2,
}
assert.Equal(t, expectedCall, call, assert.Exclude[*Lease](), assert.Exclude[time.Time]())

Expand All @@ -55,5 +56,6 @@ func TestSendFSMEvent(t *testing.T) {

actual, err := dal.LoadAsyncCall(ctx, call.ID)
assert.NoError(t, err)
assert.Equal(t, call, actual, assert.Exclude[*Lease](), assert.Exclude[time.Time]())
assert.Equal(t, call, actual, assert.Exclude[*Lease](), assert.Exclude[time.Time](), assert.Exclude[int64]())
assert.Equal(t, call.ID, actual.ID)
}
13 changes: 13 additions & 0 deletions backend/controller/dal/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ func (d *DAL) ProgressSubscriptions(ctx context.Context, eventConsumptionDelay t
Backoff: subscriber.Backoff,
MaxBackoff: subscriber.MaxBackoff,
})
observability.AsyncCalls.Created(ctx, subscriber.Sink, origin.String(), int64(subscriber.RetryAttempts), err)
if err != nil {
observability.PubSub.PropagationFailed(ctx, "CreateAsyncCall", subscription.Topic.Payload, nextCursor.Caller, subscriptionRef(subscription), optional.Some(subscriber.Sink))
return 0, fmt.Errorf("failed to schedule async task for subscription: %w", dalerrs.TranslatePGError(err))
Expand All @@ -120,6 +121,18 @@ func (d *DAL) ProgressSubscriptions(ctx context.Context, eventConsumptionDelay t
observability.PubSub.SinkCalled(ctx, subscription.Topic.Payload, nextCursor.Caller, subscriptionRef(subscription), subscriber.Sink)
successful++
}

if successful > 0 {
// If no async calls were successfully created, then there is no need to
// potentially increment the queue depth gauge.
queueDepth, err := tx.db.AsyncCallQueueDepth(ctx)
if err == nil {
// Don't error out of progressing subscriptions just over a queue depth
// retrieval error because this is only used for an observability gauge.
observability.AsyncCalls.RecordQueueDepth(ctx, queueDepth)
}
}

return successful, nil
}

Expand Down
35 changes: 33 additions & 2 deletions backend/controller/observability/async_calls.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,28 +20,39 @@ const (
asyncCallOriginAttr = "ftl.async_call.origin"
asyncCallVerbRefAttr = "ftl.async_call.verb.ref"
asyncCallTimeSinceScheduledAtAttr = "ftl.async_call.time_since_scheduled_at_ms"
asyncCallRemainingAttemptsAttr = "ftl.async_call.remaining_attempts"
asyncCallExecFailureModeAttr = "ftl.async_call.execution.failure_mode"
)

type AsyncCallMetrics struct {
created metric.Int64Counter
acquired metric.Int64Counter
executed metric.Int64Counter
completed metric.Int64Counter
msToComplete metric.Int64Histogram
queueDepth metric.Int64Gauge
}

func initAsyncCallMetrics() (*AsyncCallMetrics, error) {
result := &AsyncCallMetrics{
created: noop.Int64Counter{},
acquired: noop.Int64Counter{},
executed: noop.Int64Counter{},
completed: noop.Int64Counter{},
msToComplete: noop.Int64Histogram{},
queueDepth: noop.Int64Gauge{},
}

var err error
meter := otel.Meter(asyncCallMeterName)

signalName := fmt.Sprintf("%s.acquired", asyncCallMeterName)
signalName := fmt.Sprintf("%s.created", asyncCallMeterName)
if result.created, err = meter.Int64Counter(signalName, metric.WithUnit("1"),
metric.WithDescription("the number of times that an async call was created")); err != nil {
return nil, wrapErr(signalName, err)
}

signalName = fmt.Sprintf("%s.acquired", asyncCallMeterName)
if result.acquired, err = meter.Int64Counter(signalName, metric.WithUnit("1"),
metric.WithDescription("the number of times that the controller tries acquiring an async call")); err != nil {
return nil, wrapErr(signalName, err)
Expand All @@ -65,13 +76,31 @@ func initAsyncCallMetrics() (*AsyncCallMetrics, error) {
return nil, wrapErr(signalName, err)
}

signalName = fmt.Sprintf("%s.queue_depth", asyncCallMeterName)
if result.queueDepth, err = meter.Int64Gauge(signalName, metric.WithUnit("1"),
metric.WithDescription("number of async calls queued up")); err != nil {
return nil, wrapErr(signalName, err)
}

return result, nil
}

func wrapErr(signalName string, err error) error {
return fmt.Errorf("failed to create %q signal: %w", signalName, err)
}

func (m *AsyncCallMetrics) Created(ctx context.Context, verb schema.RefKey, origin string, remainingAttempts int64, maybeErr error) {
attrs := extractRefAttrs(verb, origin)
attrs = append(attrs, attribute.Bool(observability.StatusSucceededAttribute, maybeErr == nil))
attrs = append(attrs, attribute.Int64(asyncCallRemainingAttemptsAttr, remainingAttempts))

m.created.Add(ctx, 1, metric.WithAttributes(attrs...))
}

func (m *AsyncCallMetrics) RecordQueueDepth(ctx context.Context, queueDepth int64) {
m.queueDepth.Record(ctx, queueDepth)
}

func (m *AsyncCallMetrics) Acquired(ctx context.Context, verb schema.RefKey, origin string, scheduledAt time.Time, maybeErr error) {
attrs := extractAsyncCallAttrs(verb, origin, scheduledAt)
attrs = append(attrs, attribute.Bool(observability.StatusSucceededAttribute, maybeErr == nil))
Expand All @@ -90,7 +119,7 @@ func (m *AsyncCallMetrics) Executed(ctx context.Context, verb schema.RefKey, ori
m.executed.Add(ctx, 1, metric.WithAttributes(attrs...))
}

func (m *AsyncCallMetrics) Completed(ctx context.Context, verb schema.RefKey, origin string, scheduledAt time.Time, maybeErr error) {
func (m *AsyncCallMetrics) Completed(ctx context.Context, verb schema.RefKey, origin string, scheduledAt time.Time, queueDepth int64, maybeErr error) {
msToComplete := timeSinceMS(scheduledAt)

attrs := extractRefAttrs(verb, origin)
Expand All @@ -99,6 +128,8 @@ func (m *AsyncCallMetrics) Completed(ctx context.Context, verb schema.RefKey, or

attrs = append(attrs, attribute.Int64(asyncCallTimeSinceScheduledAtAttr, msToComplete))
m.completed.Add(ctx, 1, metric.WithAttributes(attrs...))

m.queueDepth.Record(ctx, queueDepth)
}

func extractAsyncCallAttrs(verb schema.RefKey, origin string, scheduledAt time.Time) []attribute.KeyValue {
Expand Down
3 changes: 2 additions & 1 deletion backend/controller/sql/querier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 11 additions & 2 deletions backend/controller/sql/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -458,14 +458,22 @@ INSERT INTO async_calls (verb, origin, request, remaining_attempts, backoff, max
VALUES (@verb, @origin, @request, @remaining_attempts, @backoff::interval, @max_backoff::interval)
RETURNING id;

-- name: AsyncCallQueueDepth :one
SELECT count(*)
FROM async_calls
WHERE state = 'pending' AND scheduled_at <= (NOW() AT TIME ZONE 'utc');

-- name: AcquireAsyncCall :one
-- Reserve a pending async call for execution, returning the associated lease
-- reservation key.
WITH async_call AS (
-- reservation key and accompanying metadata.
WITH pending_calls AS (
SELECT id
FROM async_calls
WHERE state = 'pending' AND scheduled_at <= (NOW() AT TIME ZONE 'utc')
ORDER BY created_at
), async_call AS (
SELECT id
FROM pending_calls
LIMIT 1
FOR UPDATE SKIP LOCKED
), lease AS (
Expand All @@ -481,6 +489,7 @@ RETURNING
id AS async_call_id,
(SELECT idempotency_key FROM lease) AS lease_idempotency_key,
(SELECT key FROM lease) AS lease_key,
(SELECT count(*) FROM pending_calls) AS queue_depth,
origin,
verb,
request,
Expand Down
23 changes: 21 additions & 2 deletions backend/controller/sql/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 6a67e8f

Please sign in to comment.