Skip to content

Commit

Permalink
update comments and test
Browse files Browse the repository at this point in the history
  • Loading branch information
matt2e committed Apr 12, 2024
1 parent 0feaac5 commit 7c1d4c8
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 28 deletions.
44 changes: 25 additions & 19 deletions backend/controller/cronjobs/cronjobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (s *Service) NewCronJobsForModule(ctx context.Context, module *schemapb.Mod
}

// CreatedOrReplacedDeloyment resets the service's list of jobs for newDeploymentKey.
// It passes the key, wrapped in optional.Some, to resetJobsWithNewDeploymentKey and
// discards that call's result, so nothing is reported back to the caller.
func (s *Service) CreatedOrReplacedDeloyment(ctx context.Context, newDeploymentKey model.DeploymentKey) {
// Rather than finding old/new cronjobs and updating our state, we can just reset the list of jobs
// Rather than finding old/new cron jobs and updating our state, we can just reset the list of jobs
// NOTE(review): the discarded value is presumably an error; confirm that ignoring it is intended.
_ = s.resetJobsWithNewDeploymentKey(ctx, optional.Some(newDeploymentKey))
}

Expand Down Expand Up @@ -182,7 +182,7 @@ func (s *Service) executeJob(ctx context.Context, job dal.CronJob) {
requestBody := map[string]any{}
requestJSON, err := json.Marshal(requestBody)
if err != nil {
logger.Errorf(err, "could not build cron job body %v:%v", job.DeploymentKey, job.Ref.String())
logger.Errorf(err, "could not build body for cron job: %v", job.Key)
return
}

Expand All @@ -197,7 +197,7 @@ func (s *Service) executeJob(ctx context.Context, job dal.CronJob) {
defer cancel()
_, err = s.call(callCtx, req, optional.Some(requestKey), s.originURL.Host)
if err != nil {
logger.Errorf(err, "failed to execute cron job %s", job.Ref.String())
logger.Errorf(err, "failed to execute cron job %v", job.Key)
}

schedule, err := cron.Parse(job.Schedule)
Expand All @@ -207,12 +207,12 @@ func (s *Service) executeJob(ctx context.Context, job dal.CronJob) {
}
next, err := cron.NextAfter(schedule, s.clock.Now().UTC(), false)
if err != nil {
logger.Errorf(err, "failed to calculate next execution for cron job %s with schedule %q", job.Ref.String(), job.Schedule)
logger.Errorf(err, "failed to calculate next execution for cron job %v with schedule %q", job.Key, job.Schedule)
}

updatedJob, err := s.dal.EndCronJob(ctx, job, next)
if err != nil {
logger.Errorf(err, "failed to end cronjob %s", job.Ref.String())
logger.Errorf(err, "failed to end cron job %v", job.Key)
} else {
s.jobChanges.Publish(jobChange{
changeType: finishedJobs,
Expand All @@ -221,35 +221,39 @@ func (s *Service) executeJob(ctx context.Context, job dal.CronJob) {
}
}

// killOldJobs looks for jobs that have been executing for too long
// This is the hard timeout, which happens after the usual timeout plus a grace period for the soft timeout to occur (the context's timeout, which cancels the call)
// killOldJobs looks for jobs that have been executing for too long.
// A soft timeout should normally occur from the job's context timing out, but there are cases where this does not happen (e.g. an unresponsive or dead controller).
// In these cases we need a hard timeout after an additional grace period.
// To do this, this function resets these jobs' state to idle and updates the next execution time in the db so the jobs can be picked up again next time.
func (s *Service) killOldJobs(ctx context.Context) (time.Duration, error) {
logger := log.FromContext(ctx)
staleJobs, err := s.dal.GetStaleCronJobs(ctx, s.config.Timeout+time.Minute)
if err != nil {
return 0, err
} else if len(staleJobs) == 0 {
return time.Minute, nil
}

updatedJobs := []dal.CronJob{}
for _, stale := range staleJobs {
start := s.clock.Now().UTC()
pattern, err := cron.Parse(stale.Schedule)
if err != nil {
logger.Errorf(err, "Could not kill stale cron job %s because schedule could not be parsed: %q", stale.Ref.String(), stale.Schedule)
logger.Errorf(err, "Could not kill stale cron job %q because schedule could not be parsed: %q", stale.Key, stale.Schedule)
continue
}
next, err := cron.NextAfter(pattern, start, false)
if err != nil {
logger.Errorf(err, "Could not kill stale cron job %s because next date could not be calculated: %q", stale.Ref.String(), stale.Schedule)
logger.Errorf(err, "Could not kill stale cron job %q because next date could not be calculated: %q", stale.Key, stale.Schedule)
continue
}

updated, err := s.dal.EndCronJob(ctx, stale, next)
if err != nil {
logger.Errorf(err, "Could not kill stale cron job %s because: %v", stale.Ref.String(), err)
logger.Errorf(err, "Could not kill stale cron job %s because: %v", stale.Key, err)
continue
}
logger.Warnf("Killed stale cron job %s", stale.Ref.String())
logger.Warnf("Killed stale cron job %s", stale.Key)
updatedJobs = append(updatedJobs, updated)
}

Expand All @@ -265,6 +269,8 @@ func (s *Service) killOldJobs(ctx context.Context) (time.Duration, error) {
// - the list of known jobs and their state
// - executing jobs when they are due
// - reacting to events that change the list of jobs, deployments or hash ring
//
// State is private to this function to ensure thread safety.
func (s *Service) watchForUpdates(ctx context.Context) {
logger := log.FromContext(ctx)

Expand All @@ -280,7 +286,7 @@ func (s *Service) watchForUpdates(ctx context.Context) {

for {
sl.SortFunc(state.jobs, func(i, j dal.CronJob) int {
return s.sortJobs(state, i, j)
return s.compareJobs(state, i, j)
})

now := s.clock.Now()
Expand All @@ -294,7 +300,7 @@ func (s *Service) watchForUpdates(ctx context.Context) {

if next.Before(state.blockedUntil) {
next = state.blockedUntil
logger.Tracef("loop blocked for %vs", next.Sub(now))
logger.Tracef("loop blocked for %v", next.Sub(now))
} else if next.Sub(now) < time.Second {
next = now.Add(time.Second)
logger.Tracef("loop while gated for 1s")
Expand Down Expand Up @@ -337,11 +343,11 @@ func (s *Service) watchForUpdates(ctx context.Context) {
removedDeploymentKeys[job.DeploymentKey.String()] = job.DeploymentKey
_, err := s.dal.EndCronJob(ctx, job.CronJob, next)
if err != nil {
logger.Errorf(err, "failed to end cronjob %s", job.Ref.String())
logger.Errorf(err, "failed to end cron job %s", job.Key.String())
}
continue
}
logger.Infof("executing job %s", job.Ref.String())
logger.Infof("executing job %v", job.Key)
state.startedExecutingJob(job.CronJob)
go s.executeJob(ctx, job.CronJob)
}
Expand All @@ -366,7 +372,7 @@ func (s *Service) watchForUpdates(ctx context.Context) {
}
}

func (s *Service) sortJobs(state *State, i, j dal.CronJob) int {
func (s *Service) compareJobs(state *State, i, j dal.CronJob) int {
iNext, err := s.nextAttemptForJob(i, state, false)
if err != nil {
return 1
Expand All @@ -384,11 +390,11 @@ func (s *Service) nextAttemptForJob(job dal.CronJob, state *State, allowsNow boo
}
if job.State == dal.JobStateExecuting {
if state.isExecutingInCurrentController(job) {
// return a time in the future, meaning don't schedule at this time
// no need to schedule this job until it finishes
return s.clock.Now(), fmt.Errorf("controller is already waiting for job to finish")
}
// We don't know when the other controller will finish this job
// We should check again when the next execution date is assuming the job finishes
// We don't know when the other controller that is executing this job will finish it,
// so we optimistically attempt it when the next execution date is due, assuming the job will have finished by then.
pattern, err := cron.Parse(job.Schedule)
if err != nil {
return s.clock.Now(), fmt.Errorf("failed to parse cron schedule %q", job.Schedule)
Expand Down
9 changes: 7 additions & 2 deletions backend/controller/cronjobs/cronjobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (d *mockDAL) createCronJob(deploymentKey model.DeploymentKey, module string

func (d *mockDAL) indexForJob(job dal.CronJob) (int, error) {
for i, j := range d.jobs {
if j.DeploymentKey.String() == job.DeploymentKey.String() && j.Ref.Name == job.Ref.Name {
if j.Key.String() == job.Key.String() {
return i, nil
}
}
Expand Down Expand Up @@ -92,7 +92,7 @@ func (d *mockDAL) StartCronJobs(ctx context.Context, jobs []dal.CronJob) (attemp
HasMinReplicas: true,
})
}
d.attemptCountMap[job.Ref.String()]++
d.attemptCountMap[job.Key.String()]++
}
return attemptedJobs, nil
}
Expand Down Expand Up @@ -216,5 +216,10 @@ func TestService(t *testing.T) {
for _, j := range mockDal.jobs {
count := verbCallCount[j.Ref.Name]
assert.Equal(t, count, 3, "expected verb %s to be called 3 times", j.Ref.Name)

// Make sure each job is neither attempted by every controller nor left as the responsibility of a single controller.
// The target is for 2 controllers to attempt each job.
attemptCount := mockDal.attemptCountMap[j.Key.String()]
assert.True(t, attemptCount > 1*count && attemptCount <= 3*attemptCount, "job %v was attempted %d times, expected between > 1 and <= 3 to be attempted", j.Key)
}
}
6 changes: 3 additions & 3 deletions backend/schema/normalise.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,6 @@ func Normalise[T Node](n T) T {
c.Pos = zero
c.Path = normaliseSlice(c.Path)

case *MetadataCronJob:
c.Pos = zero

case *MetadataAlias:
c.Pos = zero

Expand All @@ -126,6 +123,9 @@ func Normalise[T Node](n T) T {
case *IngressPathParameter:
c.Pos = zero

case *MetadataCronJob:
c.Pos = zero

case *Config:
c.Pos = zero
c.Type = Normalise(c.Type)
Expand Down
4 changes: 2 additions & 2 deletions backend/schema/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,10 +481,10 @@ func validateVerbMetadata(scopes Scopes, n *Verb) (merr []error) {
merr = append(merr, err)
}
if _, ok := n.Request.(*Unit); !ok {
merr = append(merr, errorf(md, "verb %s: cronjob can not have a request type", n.Name))
merr = append(merr, errorf(md, "verb %s: cron job can not have a request type", n.Name))
}
if _, ok := n.Response.(*Unit); !ok {
merr = append(merr, errorf(md, "verb %s: cronjob can not have a response type", n.Name))
merr = append(merr, errorf(md, "verb %s: cron job can not have a response type", n.Name))
}
case *MetadataCalls, *MetadataDatabases, *MetadataAlias:
}
Expand Down
4 changes: 2 additions & 2 deletions backend/schema/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,8 @@ func TestValidate(t *testing.T) {
}
`,
errs: []string{
"4:7-7: verb verbWithWrongInput: cronjob can not have a request type",
"6:7-7: verb verbWithWrongOutput: cronjob can not have a response type",
"4:7-7: verb verbWithWrongInput: cron job can not have a request type",
"6:7-7: verb verbWithWrongOutput: cron job can not have a response type",
},
},
}
Expand Down

0 comments on commit 7c1d4c8

Please sign in to comment.