diff --git a/backend/controller/cronjobs/cronjobs.go b/backend/controller/cronjobs/cronjobs.go index a5d30e24e9..8eb816765d 100644 --- a/backend/controller/cronjobs/cronjobs.go +++ b/backend/controller/cronjobs/cronjobs.go @@ -173,13 +173,10 @@ func (s *Service) scheduleCronJob(ctx context.Context, tx *Tx, job model.CronJob logger.Tracef("Scheduling cron job %q async_call execution at %s", job.Key, nextAttemptForJob) id, err := tx.db.CreateAsyncCall(ctx, cronsql.CreateAsyncCallParams{ - ScheduledAt: nextAttemptForJob, - Verb: schema.RefKey{Module: job.Verb.Module, Name: job.Verb.Name}, - Origin: fmt.Sprintf("cron:%s", job.Key), - Request: []byte(`{}`), - RemainingAttempts: 0, - Backoff: 0, - MaxBackoff: 0, + ScheduledAt: nextAttemptForJob, + Verb: schema.RefKey{Module: job.Verb.Module, Name: job.Verb.Name}, + Origin: fmt.Sprintf("cron:%s", job.Key), + Request: []byte(`{}`), }) if err != nil { return fmt.Errorf("failed to create async call for job %q: %w", job.Key, err) diff --git a/backend/controller/cronjobs/sql/async_queries.sql.go b/backend/controller/cronjobs/sql/async_queries.sql.go index 07b6cc3b35..b67d71cf0b 100644 --- a/backend/controller/cronjobs/sql/async_queries.sql.go +++ b/backend/controller/cronjobs/sql/async_queries.sql.go @@ -13,7 +13,6 @@ import ( "github.com/TBD54566975/ftl/backend/controller/sql/sqltypes" "github.com/TBD54566975/ftl/backend/schema" "github.com/TBD54566975/ftl/internal/encryption" - "github.com/TBD54566975/ftl/internal/model" "github.com/alecthomas/types/optional" ) @@ -75,21 +74,3 @@ func (q *Queries) CreateAsyncCall(ctx context.Context, arg CreateAsyncCallParams err := row.Scan(&id) return id, err } - -const isCronJobPending = `-- name: IsCronJobPending :one -SELECT EXISTS ( - SELECT 1 - FROM cron_jobs j - INNER JOIN async_calls ac on j.last_async_call_id = ac.id - WHERE j.key = $1::cron_job_key - AND ac.scheduled_at > $2::TIMESTAMPTZ - AND ac.state = 'pending' -) AS pending -` - -func (q *Queries) IsCronJobPending(ctx context.Context, key model.CronJobKey, startTime time.Time) (bool, error) { - row := q.db.QueryRowContext(ctx, isCronJobPending, key, startTime) - var pending bool - err := row.Scan(&pending) - return pending, err -} diff --git a/backend/controller/cronjobs/sql/queries.sql b/backend/controller/cronjobs/sql/queries.sql index ff58ba35fa..a3a3ec627a 100644 --- a/backend/controller/cronjobs/sql/queries.sql +++ b/backend/controller/cronjobs/sql/queries.sql @@ -38,4 +38,14 @@ UPDATE cron_jobs SET last_async_call_id = sqlc.arg('last_async_call_id')::BIGINT, last_execution = sqlc.arg('last_execution')::TIMESTAMPTZ, next_execution = sqlc.arg('next_execution')::TIMESTAMPTZ - WHERE key = sqlc.arg('key')::cron_job_key; \ No newline at end of file + WHERE key = sqlc.arg('key')::cron_job_key; + +-- name: IsCronJobPending :one +SELECT EXISTS ( + SELECT 1 + FROM cron_jobs j + INNER JOIN async_calls ac on j.last_async_call_id = ac.id + WHERE j.key = sqlc.arg('key')::cron_job_key + AND ac.scheduled_at > sqlc.arg('start_time')::TIMESTAMPTZ + AND ac.state = 'pending' +) AS pending; diff --git a/backend/controller/cronjobs/sql/queries.sql.go b/backend/controller/cronjobs/sql/queries.sql.go index 5aa1e39123..af6e4634b2 100644 --- a/backend/controller/cronjobs/sql/queries.sql.go +++ b/backend/controller/cronjobs/sql/queries.sql.go @@ -149,6 +149,24 @@ func (q *Queries) GetUnscheduledCronJobs(ctx context.Context, startTime time.Tim return items, nil } +const isCronJobPending = `-- name: IsCronJobPending :one +SELECT EXISTS ( + SELECT 1 + FROM cron_jobs j + INNER JOIN async_calls ac on j.last_async_call_id = ac.id + WHERE j.key = $1::cron_job_key + AND ac.scheduled_at > $2::TIMESTAMPTZ + AND ac.state = 'pending' +) AS pending +` + +func (q *Queries) IsCronJobPending(ctx context.Context, key model.CronJobKey, startTime time.Time) (bool, error) { + row := q.db.QueryRowContext(ctx, isCronJobPending, key, startTime) + var pending bool + err := row.Scan(&pending) + return pending, err +} + const updateCronJobExecution = `-- name: UpdateCronJobExecution :exec UPDATE cron_jobs SET last_async_call_id = $1::BIGINT, diff --git a/backend/controller/sql/async_queries.sql b/backend/controller/sql/async_queries.sql index 1f69718a83..e1c6943871 100644 --- a/backend/controller/sql/async_queries.sql +++ b/backend/controller/sql/async_queries.sql @@ -24,13 +24,3 @@ VALUES ( @trace_context::jsonb ) RETURNING id; - --- name: IsCronJobPending :one -SELECT EXISTS ( - SELECT 1 - FROM cron_jobs j - INNER JOIN async_calls ac on j.last_async_call_id = ac.id - WHERE j.key = sqlc.arg('key')::cron_job_key - AND ac.scheduled_at > sqlc.arg('start_time')::TIMESTAMPTZ - AND ac.state = 'pending' -) AS pending; diff --git a/backend/controller/sql/async_queries.sql.go b/backend/controller/sql/async_queries.sql.go index 07b6cc3b35..b67d71cf0b 100644 --- a/backend/controller/sql/async_queries.sql.go +++ b/backend/controller/sql/async_queries.sql.go @@ -13,7 +13,6 @@ import ( "github.com/TBD54566975/ftl/backend/controller/sql/sqltypes" "github.com/TBD54566975/ftl/backend/schema" "github.com/TBD54566975/ftl/internal/encryption" - "github.com/TBD54566975/ftl/internal/model" "github.com/alecthomas/types/optional" ) @@ -75,21 +74,3 @@ func (q *Queries) CreateAsyncCall(ctx context.Context, arg CreateAsyncCallParams err := row.Scan(&id) return id, err } - -const isCronJobPending = `-- name: IsCronJobPending :one -SELECT EXISTS ( - SELECT 1 - FROM cron_jobs j - INNER JOIN async_calls ac on j.last_async_call_id = ac.id - WHERE j.key = $1::cron_job_key - AND ac.scheduled_at > $2::TIMESTAMPTZ - AND ac.state = 'pending' -) AS pending -` - -func (q *Queries) IsCronJobPending(ctx context.Context, key model.CronJobKey, startTime time.Time) (bool, error) { - row := q.db.QueryRowContext(ctx, isCronJobPending, key, startTime) - var pending bool - err := row.Scan(&pending) - return pending, err -} diff --git a/backend/controller/sql/queries.sql.go b/backend/controller/sql/queries.sql.go index 02cca852f6..088d679adf 100644 --- a/backend/controller/sql/queries.sql.go +++ b/backend/controller/sql/queries.sql.go @@ -2127,6 +2127,24 @@ func (q *Queries) InsertTimelineLogEvent(ctx context.Context, arg InsertTimeline return err } +const isCronJobPending = `-- name: IsCronJobPending :one +SELECT EXISTS ( + SELECT 1 + FROM cron_jobs j + INNER JOIN async_calls ac on j.last_async_call_id = ac.id + WHERE j.key = $1::cron_job_key + AND ac.scheduled_at > $2::TIMESTAMPTZ + AND ac.state = 'pending' +) AS pending +` + +func (q *Queries) IsCronJobPending(ctx context.Context, key model.CronJobKey, startTime time.Time) (bool, error) { + row := q.db.QueryRowContext(ctx, isCronJobPending, key, startTime) + var pending bool + err := row.Scan(&pending) + return pending, err +} + const killStaleControllers = `-- name: KillStaleControllers :one WITH matches AS ( UPDATE controller