Skip to content

Commit

Permalink
add cron job key
Browse files Browse the repository at this point in the history
  • Loading branch information
matt2e committed Apr 12, 2024
1 parent 78d1075 commit b107935
Show file tree
Hide file tree
Showing 11 changed files with 118 additions and 81 deletions.
9 changes: 5 additions & 4 deletions backend/controller/cronjobs/cronjobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,13 @@ func (s *Service) NewCronJobsForModule(ctx context.Context, module *schemapb.Mod
continue
}
newJobs = append(newJobs, dal.CronJob{
// DeploymentKey: Filled in by DAL,
Key: model.NewCronJobKey(module.Name, verb.Verb.Name),
Ref: schema.Ref{Module: module.Name, Name: verb.Verb.Name},
Schedule: cronStr,
StartTime: start,
NextExecution: next,
State: dal.JobStateIdle,
// DeploymentKey: Filled in by DAL
})
}
}
Expand Down Expand Up @@ -271,8 +272,8 @@ func (s *Service) watchForUpdates(ctx context.Context) {
defer s.jobChanges.Unsubscribe(jobChanges)

state := &State{
executing: map[jobIdentifier]bool{},
newJobs: map[jobIdentifier]time.Time{},
executing: map[string]bool{},
newJobs: map[string]time.Time{},
blockedUntil: s.clock.Now(),
}

Expand Down Expand Up @@ -449,7 +450,7 @@ func (s *Service) isResponsibleForJob(job dal.CronJob, state *State) bool {
return true
}

initialKey, ok := hashringState.hashRing.GetNode(identifierForJob(job).String())
initialKey, ok := hashringState.hashRing.GetNode(job.Key.String())
if !ok {
return true
}
Expand Down
43 changes: 13 additions & 30 deletions backend/controller/cronjobs/state.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package cronjobs

import (
"fmt"
"time"

"github.com/TBD54566975/ftl/backend/controller/dal"
Expand All @@ -10,49 +9,33 @@ import (
"github.com/alecthomas/types/optional"
)

// jobIdentifier identifies a cron job by the string form of its deployment key
// together with the name of its verb.
type jobIdentifier struct {
deploymentKey string
verb string
}

// identifierForJob builds the jobIdentifier for job from the string form of
// job.DeploymentKey and from job.Ref.Name.
func identifierForJob(job dal.CronJob) jobIdentifier {
return jobIdentifier{
deploymentKey: job.DeploymentKey.String(),
verb: job.Ref.Name,
}
}

// String renders the identifier as "<deploymentKey>:::<verb>".
func (i jobIdentifier) String() string {
return fmt.Sprintf("%s:::%s", i.deploymentKey, i.verb)
}

// State is the cron job bookkeeping kept by a controller: the list of jobs,
// which jobs this controller is executing, and which jobs were created
// recently enough to bypass the hash ring.
type State struct {
// The cron jobs currently tracked by this state
jobs []dal.CronJob

// Used to determine if this controller is currently executing a job
executing map[jobIdentifier]bool
executing map[string]bool

// Newly created jobs should be attempted by the controller that created them until other controllers
// have a chance to reset their job lists and share responsibilities through the hash ring
newJobs map[jobIdentifier]time.Time
newJobs map[string]time.Time

// NOTE(review): how blockedUntil is consumed is not visible in this excerpt — confirm against the callers
blockedUntil time.Time
}

// isExecutingInCurrentController reports whether job has a true entry in
// s.executing. A job with no entry yields false, the map's zero value.
func (s *State) isExecutingInCurrentController(job dal.CronJob) bool {
return s.executing[identifierForJob(job)]
return s.executing[job.Key.String()]
}

// startedExecutingJob records job in s.executing so that
// isExecutingInCurrentController subsequently returns true for it.
func (s *State) startedExecutingJob(job dal.CronJob) {
s.executing[identifierForJob(job)] = true
s.executing[job.Key.String()] = true
}

// isJobTooNewForHashRing reports whether job was recorded in s.newJobs less
// than newJobHashRingOverrideInterval ago. It returns false when the job has
// no entry in s.newJobs, or when its entry has aged past the interval.
func (s *State) isJobTooNewForHashRing(job dal.CronJob) bool {
if t, ok := s.newJobs[identifierForJob(job)]; ok {
if t, ok := s.newJobs[job.Key.String()]; ok {
// Age is measured with time.Since, i.e. against the wall clock
if time.Since(t) < newJobHashRingOverrideInterval {
return true
}
// The entry has expired: drop it so later calls skip the age check
delete(s.newJobs, identifierForJob(job))
delete(s.newJobs, job.Key.String())
}
return false
}
Expand All @@ -62,23 +45,23 @@ func (s *State) reset(jobs []dal.CronJob, newDeploymentKey optional.Option[model
copy(s.jobs, jobs)
for _, job := range s.jobs {
if job.State != dal.JobStateExecuting {
delete(s.executing, identifierForJob(job))
delete(s.executing, job.Key.String())
}
if newKey, ok := newDeploymentKey.Get(); ok && job.DeploymentKey.String() == newKey.String() {
// This job is new and should be attempted by the current controller
s.newJobs[identifierForJob(job)] = time.Now()
s.newJobs[job.Key.String()] = time.Now()
}
}
}

func (s *State) updateJobs(jobs []dal.CronJob) {
updatedJobMap := jobMap(jobs)
for idx, old := range s.jobs {
if updated, exists := updatedJobMap[identifierForJob(old)]; exists {
if updated, exists := updatedJobMap[old.Key.String()]; exists {
//TODO: compare to see if outdated
s.jobs[idx] = updated
if updated.State != dal.JobStateExecuting {
delete(s.executing, identifierForJob(updated))
delete(s.executing, updated.Key.String())
}
}
}
Expand All @@ -90,10 +73,10 @@ func (s *State) removeDeploymentKey(key model.DeploymentKey) {
})
}

// jobMap indexes jobs by their identifying key and returns the resulting map.
// If two jobs share a key, the one appearing later in jobs overwrites the
// earlier one.
func jobMap(jobs []dal.CronJob) map[jobIdentifier]dal.CronJob {
m := map[jobIdentifier]dal.CronJob{}
func jobMap(jobs []dal.CronJob) map[string]dal.CronJob {
m := map[string]dal.CronJob{}
for _, job := range jobs {
m[identifierForJob(job)] = job
m[job.Key.String()] = job
}
return m
}
11 changes: 6 additions & 5 deletions backend/controller/dal/dal.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,7 @@ func (d *DAL) CreateDeployment(ctx context.Context, language string, moduleSchem
// Start time must be calculated by the caller rather than generated by db
// This ensures that nextExecution is after start time, otherwise the job will never be triggered
err := tx.CreateCronJob(ctx, sql.CreateCronJobParams{
Key: job.Key,
DeploymentKey: deploymentKey,
ModuleName: job.Ref.Module,
Verb: job.Ref.Name,
Expand Down Expand Up @@ -923,7 +924,7 @@ const (
)

type CronJob struct {
id int64
Key model.CronJobKey
DeploymentKey model.DeploymentKey
Ref schema.Ref
Schedule string
Expand All @@ -940,7 +941,7 @@ type AttemptedCronJob struct {

func cronJobFromRow(row sql.GetCronJobsRow) CronJob {
return CronJob{
id: row.ID,
Key: row.Key,
DeploymentKey: row.DeploymentKey,
Ref: schema.Ref{Module: row.Module, Name: row.Verb},
Schedule: row.Schedule,
Expand All @@ -965,7 +966,7 @@ func (d *DAL) StartCronJobs(ctx context.Context, jobs []CronJob) (attemptedJobs
if len(jobs) == 0 {
return nil, nil
}
rows, err := d.db.StartCronJobs(ctx, slices.Map(jobs, func(job CronJob) int64 { return job.id }))
rows, err := d.db.StartCronJobs(ctx, slices.Map(jobs, func(job CronJob) string { return job.Key.String() }))
if err != nil {
return nil, translatePGError(err)
}
Expand All @@ -974,7 +975,7 @@ func (d *DAL) StartCronJobs(ctx context.Context, jobs []CronJob) (attemptedJobs
for _, row := range rows {
job := AttemptedCronJob{
CronJob: CronJob{
id: row.ID,
Key: row.Key,
DeploymentKey: row.DeploymentKey,
Ref: schema.Ref{Module: row.Module, Name: row.Verb},
Schedule: row.Schedule,
Expand All @@ -991,7 +992,7 @@ func (d *DAL) StartCronJobs(ctx context.Context, jobs []CronJob) (attemptedJobs
}

func (d *DAL) EndCronJob(ctx context.Context, job CronJob, next time.Time) (CronJob, error) {
row, err := d.db.EndCronJob(ctx, next, job.id, job.StartTime)
row, err := d.db.EndCronJob(ctx, next, job.Key, job.StartTime)
if err != nil {
return CronJob{}, translatePGError(err)
}
Expand Down
1 change: 1 addition & 0 deletions backend/controller/sql/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions backend/controller/sql/querier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 13 additions & 11 deletions backend/controller/sql/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -281,14 +281,16 @@ SELECT COUNT(*)
FROM rows;

-- name: GetCronJobs :many
-- Returns every cron job joined with its deployment's key, restricted to
-- deployments whose min_replicas is greater than zero.
SELECT j.id as id, d.key as deployment_key, j.module_name as module, j.verb, j.schedule, j.start_time, j.next_execution, j.state
SELECT j.key as key, d.key as deployment_key, j.module_name as module, j.verb, j.schedule, j.start_time, j.next_execution, j.state
FROM cron_jobs j
INNER JOIN deployments d on j.deployment_id = d.id
WHERE d.min_replicas > 0;

-- name: CreateCronJob :exec
INSERT INTO cron_jobs (deployment_id, module_name, verb, schedule, start_time, next_execution)
VALUES ((SELECT id FROM deployments WHERE key = sqlc.arg('deployment_key')::deployment_key LIMIT 1),
INSERT INTO cron_jobs (key, deployment_id, module_name, verb, schedule, start_time, next_execution)
VALUES (
sqlc.arg('key')::cron_job_key,
(SELECT id FROM deployments WHERE key = sqlc.arg('deployment_key')::deployment_key LIMIT 1),
sqlc.arg('module_name')::TEXT,
sqlc.arg('verb')::TEXT,
sqlc.arg('schedule')::TEXT,
Expand All @@ -300,39 +302,39 @@ WITH updates AS (
UPDATE cron_jobs
SET state = 'executing',
start_time = (NOW() AT TIME ZONE 'utc')::TIMESTAMPTZ
WHERE id = ANY (sqlc.arg('ids'))
WHERE key = ANY (sqlc.arg('keys'))
AND state = 'idle'
AND start_time < next_execution
AND (next_execution AT TIME ZONE 'utc') < (NOW() AT TIME ZONE 'utc')::TIMESTAMPTZ
RETURNING id, state, start_time, next_execution)
SELECT j.id as id, d.key as deployment_key, j.module_name as module, j.verb, j.schedule,
RETURNING id, key, state, start_time, next_execution)
SELECT j.key as key, d.key as deployment_key, j.module_name as module, j.verb, j.schedule,
COALESCE(u.start_time, j.start_time) as start_time,
COALESCE(u.next_execution, j.next_execution) as next_execution,
COALESCE(u.state, j.state) as state,
d.min_replicas > 0 as has_min_replicas,
CASE WHEN u.id IS NULL THEN FALSE ELSE TRUE END as updated
CASE WHEN u.key IS NULL THEN FALSE ELSE TRUE END as updated
FROM cron_jobs j
INNER JOIN deployments d on j.deployment_id = d.id
LEFT JOIN updates u on j.id = u.id
WHERE j.id = ANY (sqlc.arg('ids'));
WHERE j.key = ANY (sqlc.arg('keys'));

-- name: EndCronJob :one
-- Sets the matching job back to 'idle' and stores the supplied next_execution.
-- The update only applies when the job is currently 'executing' and its
-- start_time equals the supplied start_time; otherwise no row is updated and
-- no row is returned. The updated job is returned joined with its
-- deployment's key.
WITH j AS (
UPDATE cron_jobs
SET state = 'idle',
next_execution = sqlc.arg('next_execution')::TIMESTAMPTZ
WHERE id = sqlc.arg('id')::BIGINT
WHERE key = sqlc.arg('key')::cron_job_key
AND state = 'executing'
AND start_time = sqlc.arg('start_time')::TIMESTAMPTZ
RETURNING *
)
SELECT j.id as id, d.key as deployment_key, j.module_name as module, j.verb, j.schedule, j.start_time, j.next_execution, j.state
SELECT j.key as key, d.key as deployment_key, j.module_name as module, j.verb, j.schedule, j.start_time, j.next_execution, j.state
FROM j
INNER JOIN deployments d on j.deployment_id = d.id
LIMIT 1;

-- name: GetStaleCronJobs :many
SELECT j.id as id, d.key as deployment_key, j.module_name as module, j.verb, j.schedule, j.start_time, j.next_execution, j.state
SELECT j.key as key, d.key as deployment_key, j.module_name as module, j.verb, j.schedule, j.start_time, j.next_execution, j.state
FROM cron_jobs j
INNER JOIN deployments d on j.deployment_id = d.id
WHERE state = 'executing'
Expand Down
Loading

0 comments on commit b107935

Please sign in to comment.