From 8b547952d47a794546d23cab4b5f9e50ece4cb48 Mon Sep 17 00:00:00 2001 From: Safeer Jiwan Date: Fri, 23 Aug 2024 09:35:36 -0700 Subject: [PATCH] add observability calls, move AsyncCallQueueDepth to async_queries.sql --- backend/controller/cronjobs/cronjobs.go | 16 +++++++++++++--- .../controller/cronjobs/sql/async_queries.sql.go | 13 +++++++++++++ backend/controller/cronjobs/sql/querier.go | 1 + backend/controller/sql/async_queries.sql | 5 +++++ backend/controller/sql/async_queries.sql.go | 13 +++++++++++++ backend/controller/sql/queries.sql | 5 ----- backend/controller/sql/queries.sql.go | 13 ------------- 7 files changed, 45 insertions(+), 21 deletions(-) diff --git a/backend/controller/cronjobs/cronjobs.go b/backend/controller/cronjobs/cronjobs.go index 8eb816765d..085ef27e53 100644 --- a/backend/controller/cronjobs/cronjobs.go +++ b/backend/controller/cronjobs/cronjobs.go @@ -6,9 +6,11 @@ import ( "errors" "fmt" + "github.com/alecthomas/types/optional" "github.com/benbjohnson/clock" cronsql "github.com/TBD54566975/ftl/backend/controller/cronjobs/sql" + "github.com/TBD54566975/ftl/backend/controller/observability" schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/schema" "github.com/TBD54566975/ftl/backend/schema" "github.com/TBD54566975/ftl/internal/cron" @@ -172,12 +174,15 @@ func (s *Service) scheduleCronJob(ctx context.Context, tx *Tx, job model.CronJob } logger.Tracef("Scheduling cron job %q async_call execution at %s", job.Key, nextAttemptForJob) + refKey := schema.RefKey{Module: job.Verb.Module, Name: job.Verb.Name} + origin := fmt.Sprintf("cron:%s", job.Key) id, err := tx.db.CreateAsyncCall(ctx, cronsql.CreateAsyncCallParams{ ScheduledAt: nextAttemptForJob, - Verb: schema.RefKey{Module: job.Verb.Module, Name: job.Verb.Name}, - Origin: fmt.Sprintf("cron:%s", job.Key), + Verb: refKey, + Origin: origin, Request: []byte(`{}`), }) + observability.AsyncCalls.Created(ctx, refKey, optional.None[schema.RefKey](), origin, 0, err) if err != nil { return fmt.Errorf("failed to create async call for job %q: %w", job.Key, err) } @@ -195,6 +200,11 @@ func (s *Service) scheduleCronJob(ctx context.Context, tx *Tx, job model.CronJob if err != nil { return fmt.Errorf("failed to update cron job %q: %w", job.Key, err) } - + queueDepth, err := tx.db.AsyncCallQueueDepth(ctx) + if err == nil { + // Don't error out of an FSM transition just over a queue depth retrieval + // error because this is only used for an observability gauge. + observability.AsyncCalls.RecordQueueDepth(ctx, queueDepth) + } return nil } diff --git a/backend/controller/cronjobs/sql/async_queries.sql.go b/backend/controller/cronjobs/sql/async_queries.sql.go index b67d71cf0b..181199435a 100644 --- a/backend/controller/cronjobs/sql/async_queries.sql.go +++ b/backend/controller/cronjobs/sql/async_queries.sql.go @@ -16,6 +16,19 @@ import ( "github.com/alecthomas/types/optional" ) +const asyncCallQueueDepth = `-- name: AsyncCallQueueDepth :one +SELECT count(*) +FROM async_calls +WHERE state = 'pending' AND scheduled_at <= (NOW() AT TIME ZONE 'utc') +` + +func (q *Queries) AsyncCallQueueDepth(ctx context.Context) (int64, error) { + row := q.db.QueryRowContext(ctx, asyncCallQueueDepth) + var count int64 + err := row.Scan(&count) + return count, err +} + const createAsyncCall = `-- name: CreateAsyncCall :one INSERT INTO async_calls ( scheduled_at, diff --git a/backend/controller/cronjobs/sql/querier.go b/backend/controller/cronjobs/sql/querier.go index 7f1b44088a..05c77d7f4c 100644 --- a/backend/controller/cronjobs/sql/querier.go +++ b/backend/controller/cronjobs/sql/querier.go @@ -12,6 +12,7 @@ import ( ) type Querier interface { + AsyncCallQueueDepth(ctx context.Context) (int64, error) CreateAsyncCall(ctx context.Context, arg CreateAsyncCallParams) (int64, error) CreateCronJob(ctx context.Context, arg CreateCronJobParams) error GetCronJobByKey(ctx context.Context, key model.CronJobKey) (GetCronJobByKeyRow, error) diff --git a/backend/controller/sql/async_queries.sql b/backend/controller/sql/async_queries.sql index e1c6943871..5f2f3e2000 100644 --- a/backend/controller/sql/async_queries.sql +++ b/backend/controller/sql/async_queries.sql @@ -24,3 +24,8 @@ VALUES ( @trace_context::jsonb ) RETURNING id; + +-- name: AsyncCallQueueDepth :one +SELECT count(*) +FROM async_calls +WHERE state = 'pending' AND scheduled_at <= (NOW() AT TIME ZONE 'utc'); diff --git a/backend/controller/sql/async_queries.sql.go b/backend/controller/sql/async_queries.sql.go index b67d71cf0b..181199435a 100644 --- a/backend/controller/sql/async_queries.sql.go +++ b/backend/controller/sql/async_queries.sql.go @@ -16,6 +16,19 @@ import ( "github.com/alecthomas/types/optional" ) +const asyncCallQueueDepth = `-- name: AsyncCallQueueDepth :one +SELECT count(*) +FROM async_calls +WHERE state = 'pending' AND scheduled_at <= (NOW() AT TIME ZONE 'utc') +` + +func (q *Queries) AsyncCallQueueDepth(ctx context.Context) (int64, error) { + row := q.db.QueryRowContext(ctx, asyncCallQueueDepth) + var count int64 + err := row.Scan(&count) + return count, err +} + const createAsyncCall = `-- name: CreateAsyncCall :one INSERT INTO async_calls ( scheduled_at, diff --git a/backend/controller/sql/queries.sql b/backend/controller/sql/queries.sql index 3b50299e95..b930a8a250 100644 --- a/backend/controller/sql/queries.sql +++ b/backend/controller/sql/queries.sql @@ -470,11 +470,6 @@ FROM expired; -- name: GetLeaseInfo :one SELECT expires_at, metadata FROM leases WHERE key = @key::lease_key; --- name: AsyncCallQueueDepth :one -SELECT count(*) -FROM async_calls -WHERE state = 'pending' AND scheduled_at <= (NOW() AT TIME ZONE 'utc'); - -- name: AcquireAsyncCall :one -- Reserve a pending async call for execution, returning the associated lease -- reservation key and accompanying metadata. diff --git a/backend/controller/sql/queries.sql.go b/backend/controller/sql/queries.sql.go index 088d679adf..dab7ab0350 100644 --- a/backend/controller/sql/queries.sql.go +++ b/backend/controller/sql/queries.sql.go @@ -127,19 +127,6 @@ func (q *Queries) AssociateArtefactWithDeployment(ctx context.Context, arg Assoc return err } -const asyncCallQueueDepth = `-- name: AsyncCallQueueDepth :one -SELECT count(*) -FROM async_calls -WHERE state = 'pending' AND scheduled_at <= (NOW() AT TIME ZONE 'utc') -` - -func (q *Queries) AsyncCallQueueDepth(ctx context.Context) (int64, error) { - row := q.db.QueryRowContext(ctx, asyncCallQueueDepth) - var count int64 - err := row.Scan(&count) - return count, err -} - const beginConsumingTopicEvent = `-- name: BeginConsumingTopicEvent :exec WITH event AS ( SELECT id, created_at, key, topic_id, payload, caller, request_key, trace_context