Skip to content

Commit

Permalink
feat: rewrite cron to use async system (#2407)
Browse files Browse the repository at this point in the history
This refactor removes the cron job system's state management, cron
execution, and hashring management, in favor of the async call system.

Data tables changed as follows:
- `cron_jobs` is still used to maintain the job list
  - Removes `state`, adds `last_execution::timestamptz` and
    `last_async_call_id::bigint`
- The cron system now inserts rows into `async_calls`
  - `cron_jobs.last_async_call_id = async_calls.id`
  - Adds the notion of a `cron` origin in `async_calls.origin`

After a deployment, all valid unscheduled cron jobs are scheduled; a row
is added to `async_calls` with a `pending` state and `scheduled_at` is
set to the job's next execution time. The corresponding row in
`cron_jobs` is also updated with the scheduled async call, the computed
next execution time, and the inserted async call ID.

On completion of a cron async call, the next execution of that job is
scheduled. Effectively, every cron job will have exactly one scheduled
execution.

Closes #2197

---------

Co-authored-by: Alec Thomas <[email protected]>
  • Loading branch information
safeer and alecthomas authored Aug 23, 2024
1 parent a60bce5 commit 8ce9faf
Show file tree
Hide file tree
Showing 32 changed files with 992 additions and 1,602 deletions.
3 changes: 2 additions & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -136,4 +136,5 @@ issues:
- "fmt.Errorf can be replaced with errors.New"
- "fmt.Sprintf can be replaced with string concatenation"
- "strings.Title has been deprecated"
- "error returned from external package is unwrapped.*TranslatePGError"
- "error returned from external package is unwrapped.*TranslatePGError"
- "struct literal uses unkeyed fields"
2 changes: 1 addition & 1 deletion Justfile
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ init-db:

# Regenerate SQLC code (requires init-db to be run first)
build-sqlc:
@mk backend/controller/sql/{db.go,models.go,querier.go,queries.sql.go} backend/controller/{cronjobs}/sql/{db.go,models.go,querier.go,queries.sql.go} internal/configuration/sql/{db.go,models.go,querier.go,queries.sql.go} : backend/controller/sql/queries.sql backend/controller/{cronjobs}/sql/queries.sql internal/configuration/sql/queries.sql backend/controller/sql/schema sqlc.yaml -- "just init-db && sqlc generate"
@mk backend/controller/sql/{db.go,models.go,querier.go,queries.sql.go} backend/controller/cronjobs/sql/{db.go,models.go,querier.go,queries.sql.go} internal/configuration/sql/{db.go,models.go,querier.go,queries.sql.go} : backend/controller/sql/queries.sql backend/controller/sql/async_queries.sql backend/controller/cronjobs/sql/queries.sql internal/configuration/sql/queries.sql backend/controller/sql/schema sqlc.yaml -- "just init-db && sqlc generate"

# Build the ZIP files that are embedded in the FTL release binaries
build-zips: build-kt-runtime
Expand Down
25 changes: 21 additions & 4 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,9 +249,8 @@ func New(ctx context.Context, conn *sql.DB, config Config, runnerScaling scaling
svc.routes.Store(map[string][]dal.Route{})
svc.schema.Store(&schema.Schema{})

cronSvc := cronjobs.New(ctx, key, svc.config.Advertise.Host, cronjobs.Config{Timeout: config.CronJobTimeout}, conn, svc.tasks, svc.callWithRequest)
cronSvc := cronjobs.New(ctx, key, svc.config.Advertise.Host, conn)
svc.cronJobs = cronSvc
svc.controllerListListeners = append(svc.controllerListListeners, cronSvc)

pubSub := pubsub.New(ctx, db, svc.tasks, svc)
svc.pubSub = pubSub
Expand Down Expand Up @@ -541,7 +540,10 @@ func (s *Service) ReplaceDeploy(ctx context.Context, c *connect.Request[ftlv1.Re
}
}

s.cronJobs.CreatedOrReplacedDeloyment(ctx, newDeploymentKey)
err = s.cronJobs.CreatedOrReplacedDeloyment(ctx)
if err != nil {
return nil, fmt.Errorf("could not schedule cron jobs: %w", err)
}

return connect.NewResponse(&ftlv1.ReplaceDeployResponse{}), nil
}
Expand Down Expand Up @@ -1403,7 +1405,11 @@ func (s *Service) executeAsyncCalls(ctx context.Context) (interval time.Duration
logger.Tracef("No async calls to execute")
return time.Second * 2, nil
} else if err != nil {
observability.AsyncCalls.Acquired(ctx, call.Verb, call.CatchVerb, call.Origin.String(), call.ScheduledAt, call.Catching, err)
if call == nil {
observability.AsyncCalls.AcquireFailed(ctx, err)
} else {
observability.AsyncCalls.Acquired(ctx, call.Verb, call.CatchVerb, call.Origin.String(), call.ScheduledAt, call.Catching, err)
}
return 0, err
}

Expand All @@ -1430,6 +1436,9 @@ func (s *Service) executeAsyncCalls(ctx context.Context) (interval time.Duration
if returnErr == nil {
// Post-commit notification based on origin
switch origin := call.Origin.(type) {
case dal.AsyncOriginCron:
break

case dal.AsyncOriginFSM:
break

Expand Down Expand Up @@ -1568,6 +1577,9 @@ func (s *Service) catchAsyncCall(ctx context.Context, logger *log.Logger, call *

func metadataForAsyncCall(call *dal.AsyncCall) *ftlv1.Metadata {
switch origin := call.Origin.(type) {
case dal.AsyncOriginCron:
return &ftlv1.Metadata{}

case dal.AsyncOriginFSM:
return &ftlv1.Metadata{
Values: []*ftlv1.Metadata_Pair{
Expand Down Expand Up @@ -1595,6 +1607,11 @@ func (s *Service) finaliseAsyncCall(ctx context.Context, tx *dal.Tx, call *dal.A

// Allow for handling of completion based on origin
switch origin := call.Origin.(type) {
case dal.AsyncOriginCron:
if err := s.cronJobs.OnJobCompletion(ctx, origin.CronJobKey, failed); err != nil {
return fmt.Errorf("failed to finalize cron async call: %w", err)
}

case dal.AsyncOriginFSM:
if err := s.onAsyncFSMCallCompletion(ctx, tx, origin, failed, isFinalResult); err != nil {
return fmt.Errorf("failed to finalize FSM async call: %w", err)
Expand Down
Loading

0 comments on commit 8ce9faf

Please sign in to comment.