Skip to content

Commit

Permalink
add observability calls, move AsyncCallQueueDepth to async_queries.sql
Browse files Browse the repository at this point in the history
  • Loading branch information
safeer committed Aug 23, 2024
1 parent 25c32ae commit 8b54795
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 21 deletions.
16 changes: 13 additions & 3 deletions backend/controller/cronjobs/cronjobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ import (
"errors"
"fmt"

"github.com/alecthomas/types/optional"
"github.com/benbjohnson/clock"

cronsql "github.com/TBD54566975/ftl/backend/controller/cronjobs/sql"
"github.com/TBD54566975/ftl/backend/controller/observability"
schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/schema"
"github.com/TBD54566975/ftl/backend/schema"
"github.com/TBD54566975/ftl/internal/cron"
Expand Down Expand Up @@ -172,12 +174,15 @@ func (s *Service) scheduleCronJob(ctx context.Context, tx *Tx, job model.CronJob
}

logger.Tracef("Scheduling cron job %q async_call execution at %s", job.Key, nextAttemptForJob)
refKey := schema.RefKey{Module: job.Verb.Module, Name: job.Verb.Name}
origin := fmt.Sprintf("cron:%s", job.Key)
id, err := tx.db.CreateAsyncCall(ctx, cronsql.CreateAsyncCallParams{
ScheduledAt: nextAttemptForJob,
Verb: schema.RefKey{Module: job.Verb.Module, Name: job.Verb.Name},
Origin: fmt.Sprintf("cron:%s", job.Key),
Verb: refKey,
Origin: origin,
Request: []byte(`{}`),
})
observability.AsyncCalls.Created(ctx, refKey, optional.None[schema.RefKey](), origin, 0, err)
if err != nil {
return fmt.Errorf("failed to create async call for job %q: %w", job.Key, err)
}
Expand All @@ -195,6 +200,11 @@ func (s *Service) scheduleCronJob(ctx context.Context, tx *Tx, job model.CronJob
if err != nil {
return fmt.Errorf("failed to update cron job %q: %w", job.Key, err)
}

queueDepth, err := tx.db.AsyncCallQueueDepth(ctx)
if err == nil {
// Don't error out of an FSM transition just over a queue depth retrieval
// error because this is only used for an observability gauge.
observability.AsyncCalls.RecordQueueDepth(ctx, queueDepth)
}
return nil
}
13 changes: 13 additions & 0 deletions backend/controller/cronjobs/sql/async_queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions backend/controller/cronjobs/sql/querier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions backend/controller/sql/async_queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,8 @@ VALUES (
@trace_context::jsonb
)
RETURNING id;

-- name: AsyncCallQueueDepth :one
SELECT count(*)
FROM async_calls
WHERE state = 'pending' AND scheduled_at <= (NOW() AT TIME ZONE 'utc');
13 changes: 13 additions & 0 deletions backend/controller/sql/async_queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 0 additions & 5 deletions backend/controller/sql/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -470,11 +470,6 @@ FROM expired;
-- name: GetLeaseInfo :one
SELECT expires_at, metadata FROM leases WHERE key = @key::lease_key;

-- name: AsyncCallQueueDepth :one
SELECT count(*)
FROM async_calls
WHERE state = 'pending' AND scheduled_at <= (NOW() AT TIME ZONE 'utc');

-- name: AcquireAsyncCall :one
-- Reserve a pending async call for execution, returning the associated lease
-- reservation key and accompanying metadata.
Expand Down
13 changes: 0 additions & 13 deletions backend/controller/sql/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 8b54795

Please sign in to comment.