Skip to content

Commit

Permalink
use cron_jobs.last_async_call_id instead of async_calls.cron_job_key
Browse files Browse the repository at this point in the history
  • Loading branch information
safeer committed Aug 22, 2024
1 parent 85a1bf5 commit 7810fc8
Show file tree
Hide file tree
Showing 14 changed files with 139 additions and 121 deletions.
11 changes: 7 additions & 4 deletions backend/controller/cronjobs/cronjobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"errors"
"fmt"

"github.com/alecthomas/types/optional"
"github.com/benbjohnson/clock"

cronsql "github.com/TBD54566975/ftl/backend/controller/cronjobs/sql"
Expand Down Expand Up @@ -173,15 +172,14 @@ func (s *Service) scheduleCronJob(ctx context.Context, tx *Tx, job model.CronJob
}

logger.Tracef("Scheduling cron job %q async_call execution at %s", job.Key, nextAttemptForJob)
_, err = tx.db.CreateAsyncCall(ctx, cronsql.CreateAsyncCallParams{
id, err := tx.db.CreateAsyncCall(ctx, cronsql.CreateAsyncCallParams{
ScheduledAt: nextAttemptForJob,
Verb: schema.RefKey{Module: job.Verb.Module, Name: job.Verb.Name},
Origin: fmt.Sprintf("cron:%s", job.Key),
Request: []byte(`{}`),
RemainingAttempts: 0,
Backoff: 0,
MaxBackoff: 0,
CronJobKey: optional.Some(job.Key),
})
if err != nil {
return fmt.Errorf("failed to create async call for job %q: %w", job.Key, err)
Expand All @@ -191,7 +189,12 @@ func (s *Service) scheduleCronJob(ctx context.Context, tx *Tx, job model.CronJob
return fmt.Errorf("failed to calculate future execution for cron job %q with schedule %q: %w", job.Key, job.Schedule, err)
}
logger.Tracef("Updating cron job %q with last attempt at %s and next attempt at %s", job.Key, nextAttemptForJob, futureAttemptForJob)
err = tx.db.UpdateCronJobExecution(ctx, nextAttemptForJob, futureAttemptForJob, job.Key)
err = tx.db.UpdateCronJobExecution(ctx, cronsql.UpdateCronJobExecutionParams{
LastAsyncCallID: id,
LastExecution: nextAttemptForJob,
NextExecution: futureAttemptForJob,
Key: job.Key,
})
if err != nil {
return fmt.Errorf("failed to update cron job %q: %w", job.Key, err)
}
Expand Down
16 changes: 7 additions & 9 deletions backend/controller/cronjobs/sql/async_queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 10 additions & 10 deletions backend/controller/cronjobs/sql/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion backend/controller/cronjobs/sql/querier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 5 additions & 8 deletions backend/controller/cronjobs/sql/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,12 @@ FROM cron_jobs j
WHERE d.min_replicas > 0
AND j.start_time < sqlc.arg('start_time')::TIMESTAMPTZ
AND (
j.last_execution IS NULL
j.last_async_call_id IS NULL
OR NOT EXISTS (
SELECT 1
FROM async_calls ac
WHERE
ac.cron_job_key = j.key
AND (
ac.scheduled_at > j.last_execution OR
(ac.scheduled_at = j.last_execution AND ac.state IN ('pending', 'executing'))
)
WHERE ac.id = j.last_async_call_id
AND ac.state IN ('pending', 'executing')
)
)
FOR UPDATE SKIP LOCKED;
Expand All @@ -39,6 +35,7 @@ INSERT INTO cron_jobs (key, deployment_id, module_name, verb, schedule, start_ti

-- name: UpdateCronJobExecution :exec
UPDATE cron_jobs
SET last_execution = sqlc.arg('last_execution')::TIMESTAMPTZ,
SET last_async_call_id = sqlc.arg('last_async_call_id')::BIGINT,
last_execution = sqlc.arg('last_execution')::TIMESTAMPTZ,
next_execution = sqlc.arg('next_execution')::TIMESTAMPTZ
WHERE key = sqlc.arg('key')::cron_job_key;
39 changes: 25 additions & 14 deletions backend/controller/cronjobs/sql/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 5 additions & 6 deletions backend/controller/sql/async_queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ INSERT INTO async_calls (
max_backoff,
catch_verb,
parent_request_key,
trace_context,
cron_job_key
trace_context
)
VALUES (
@scheduled_at::TIMESTAMPTZ,
Expand All @@ -22,16 +21,16 @@ VALUES (
@max_backoff::interval,
@catch_verb,
@parent_request_key,
@trace_context::jsonb,
@cron_job_key
@trace_context::jsonb
)
RETURNING id;

-- name: IsCronJobPending :one
SELECT EXISTS (
SELECT 1
FROM async_calls ac
WHERE ac.cron_job_key = sqlc.arg('key')::cron_job_key
FROM cron_jobs j
INNER JOIN async_calls ac on j.last_async_call_id = ac.id
WHERE j.key = sqlc.arg('key')::cron_job_key
AND ac.scheduled_at > sqlc.arg('start_time')::TIMESTAMPTZ
AND ac.state = 'pending'
) AS pending;
16 changes: 7 additions & 9 deletions backend/controller/sql/async_queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 10 additions & 10 deletions backend/controller/sql/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion backend/controller/sql/querier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 7810fc8

Please sign in to comment.