Skip to content

Commit

Permalink
add cron job key
Browse files Browse the repository at this point in the history
  • Loading branch information
matt2e committed Apr 12, 2024
1 parent 9d26561 commit 7728f85
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 34 deletions.
9 changes: 5 additions & 4 deletions backend/controller/cronjobs/cronjobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,13 @@ func (s *Service) NewCronJobsForModule(ctx context.Context, module *schemapb.Mod
continue
}
newJobs = append(newJobs, dal.CronJob{
// DeploymentKey: Filled in by DAL,
Key: model.NewCronJobKey(module.Name, verb.Verb.Name),
Ref: schema.Ref{Module: module.Name, Name: verb.Verb.Name},
Schedule: cronStr,
StartTime: start,
NextExecution: next,
State: dal.JobStateIdle,
// DeploymentKey: Filled in by DAL
})
}
}
Expand Down Expand Up @@ -271,8 +272,8 @@ func (s *Service) watchForUpdates(ctx context.Context) {
defer s.jobChanges.Unsubscribe(jobChanges)

state := &State{
executing: map[jobIdentifier]bool{},
newJobs: map[jobIdentifier]time.Time{},
executing: map[string]bool{},
newJobs: map[string]time.Time{},
blockedUntil: s.clock.Now(),
}

Expand Down Expand Up @@ -449,7 +450,7 @@ func (s *Service) isResponsibleForJob(job dal.CronJob, state *State) bool {
return true
}

initialKey, ok := hashringState.hashRing.GetNode(identifierForJob(job).String())
initialKey, ok := hashringState.hashRing.GetNode(job.Key.String())
if !ok {
return true
}
Expand Down
43 changes: 13 additions & 30 deletions backend/controller/cronjobs/state.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package cronjobs

import (
"fmt"
"time"

"github.com/TBD54566975/ftl/backend/controller/dal"
Expand All @@ -10,49 +9,33 @@ import (
"github.com/alecthomas/types/optional"
)

type jobIdentifier struct {
deploymentKey string
verb string
}

func identifierForJob(job dal.CronJob) jobIdentifier {
return jobIdentifier{
deploymentKey: job.DeploymentKey.String(),
verb: job.Ref.Name,
}
}

func (i jobIdentifier) String() string {
return fmt.Sprintf("%s:::%s", i.deploymentKey, i.verb)
}

type State struct {
jobs []dal.CronJob

// Used to determine if this controller is currently executing a job
executing map[jobIdentifier]bool
executing map[string]bool

// Newly created jobs should be attempted by the controller that created them until other controllers
// have a chance to reset their job lists and share responsibilities through the hash ring
newJobs map[jobIdentifier]time.Time
newJobs map[string]time.Time

blockedUntil time.Time
}

func (s *State) isExecutingInCurrentController(job dal.CronJob) bool {
return s.executing[identifierForJob(job)]
return s.executing[job.Key.String()]
}

func (s *State) startedExecutingJob(job dal.CronJob) {
s.executing[identifierForJob(job)] = true
s.executing[job.Key.String()] = true
}

func (s *State) isJobTooNewForHashRing(job dal.CronJob) bool {
if t, ok := s.newJobs[identifierForJob(job)]; ok {
if t, ok := s.newJobs[job.Key.String()]; ok {
if time.Since(t) < newJobHashRingOverrideInterval {
return true
}
delete(s.newJobs, identifierForJob(job))
delete(s.newJobs, job.Key.String())
}
return false
}
Expand All @@ -62,23 +45,23 @@ func (s *State) reset(jobs []dal.CronJob, newDeploymentKey optional.Option[model
copy(s.jobs, jobs)
for _, job := range s.jobs {
if job.State != dal.JobStateExecuting {
delete(s.executing, identifierForJob(job))
delete(s.executing, job.Key.String())
}
if newKey, ok := newDeploymentKey.Get(); ok && job.DeploymentKey.String() == newKey.String() {
// This job is new and should be attempted by the current controller
s.newJobs[identifierForJob(job)] = time.Now()
s.newJobs[job.Key.String()] = time.Now()
}
}
}

func (s *State) updateJobs(jobs []dal.CronJob) {
updatedJobMap := jobMap(jobs)
for idx, old := range s.jobs {
if updated, exists := updatedJobMap[identifierForJob(old)]; exists {
if updated, exists := updatedJobMap[old.Key.String()]; exists {
//TODO: compare to see if outdated
s.jobs[idx] = updated
if updated.State != dal.JobStateExecuting {
delete(s.executing, identifierForJob(updated))
delete(s.executing, updated.Key.String())
}
}
}
Expand All @@ -90,10 +73,10 @@ func (s *State) removeDeploymentKey(key model.DeploymentKey) {
})
}

func jobMap(jobs []dal.CronJob) map[jobIdentifier]dal.CronJob {
m := map[jobIdentifier]dal.CronJob{}
func jobMap(jobs []dal.CronJob) map[string]dal.CronJob {
m := map[string]dal.CronJob{}
for _, job := range jobs {
m[identifierForJob(job)] = job
m[job.Key.String()] = job
}
return m
}

0 comments on commit 7728f85

Please sign in to comment.