From 7c1d4c85b8bb80bfe58cfe7df1c3292382b6a82d Mon Sep 17 00:00:00 2001 From: Matt Toohey Date: Fri, 12 Apr 2024 16:49:25 +1000 Subject: [PATCH] update comments and test --- backend/controller/cronjobs/cronjobs.go | 44 +++++++++++--------- backend/controller/cronjobs/cronjobs_test.go | 9 +++- backend/schema/normalise.go | 6 +-- backend/schema/validate.go | 4 +- backend/schema/validate_test.go | 4 +- 5 files changed, 39 insertions(+), 28 deletions(-) diff --git a/backend/controller/cronjobs/cronjobs.go b/backend/controller/cronjobs/cronjobs.go index 08710b8cc5..b9d60df01a 100644 --- a/backend/controller/cronjobs/cronjobs.go +++ b/backend/controller/cronjobs/cronjobs.go @@ -147,7 +147,7 @@ func (s *Service) NewCronJobsForModule(ctx context.Context, module *schemapb.Mod } func (s *Service) CreatedOrReplacedDeloyment(ctx context.Context, newDeploymentKey model.DeploymentKey) { - // Rather than finding old/new cronjobs and updating our state, we can just reset the list of jobs + // Rather than finding old/new cron jobs and updating our state, we can just reset the list of jobs _ = s.resetJobsWithNewDeploymentKey(ctx, optional.Some(newDeploymentKey)) } @@ -182,7 +182,7 @@ func (s *Service) executeJob(ctx context.Context, job dal.CronJob) { requestBody := map[string]any{} requestJSON, err := json.Marshal(requestBody) if err != nil { - logger.Errorf(err, "could not build cron job body %v:%v", job.DeploymentKey, job.Ref.String()) + logger.Errorf(err, "could not build body for cron job: %v", job.Key) return } @@ -197,7 +197,7 @@ func (s *Service) executeJob(ctx context.Context, job dal.CronJob) { defer cancel() _, err = s.call(callCtx, req, optional.Some(requestKey), s.originURL.Host) if err != nil { - logger.Errorf(err, "failed to execute cron job %s", job.Ref.String()) + logger.Errorf(err, "failed to execute cron job %v", job.Key) } schedule, err := cron.Parse(job.Schedule) @@ -207,12 +207,12 @@ func (s *Service) executeJob(ctx context.Context, job dal.CronJob) { } next, err := cron.NextAfter(schedule, s.clock.Now().UTC(), false) if err != nil { - logger.Errorf(err, "failed to calculate next execution for cron job %s with schedule %q", job.Ref.String(), job.Schedule) + logger.Errorf(err, "failed to calculate next execution for cron job %v with schedule %q", job.Key, job.Schedule) } updatedJob, err := s.dal.EndCronJob(ctx, job, next) if err != nil { - logger.Errorf(err, "failed to end cronjob %s", job.Ref.String()) + logger.Errorf(err, "failed to end cron job %v", job.Key) } else { s.jobChanges.Publish(jobChange{ changeType: finishedJobs, @@ -221,13 +221,17 @@ func (s *Service) executeJob(ctx context.Context, job dal.CronJob) { } } -// killOldJobs looks for jobs that have been executing for too long -// This is the hard timout which happens after the usual timeout plus a grace period for the soft timeout to occur (context's timeout which cancel the call) +// killOldJobs looks for jobs that have been executing for too long. +// A soft timeout should normally occur from the job's context timing out, but there are cases where this does not happen (eg: unresponsive or dead controller) +// In these cases we need a hard timout after an additional grace period. +// To do this, this function resets these job's state to idle and updates the next execution time in the db so the job can be picked up again next time. func (s *Service) killOldJobs(ctx context.Context) (time.Duration, error) { logger := log.FromContext(ctx) staleJobs, err := s.dal.GetStaleCronJobs(ctx, s.config.Timeout+time.Minute) if err != nil { return 0, err + } else if len(staleJobs) == 0 { + return time.Minute, nil } updatedJobs := []dal.CronJob{} @@ -235,21 +239,21 @@ func (s *Service) killOldJobs(ctx context.Context) (time.Duration, error) { start := s.clock.Now().UTC() pattern, err := cron.Parse(stale.Schedule) if err != nil { - logger.Errorf(err, "Could not kill stale cron job %s because schedule could not be parsed: %q", stale.Ref.String(), stale.Schedule) + logger.Errorf(err, "Could not kill stale cron job %q because schedule could not be parsed: %q", stale.Key, stale.Schedule) continue } next, err := cron.NextAfter(pattern, start, false) if err != nil { - logger.Errorf(err, "Could not kill stale cron job %s because next date could not be calculated: %q", stale.Ref.String(), stale.Schedule) + logger.Errorf(err, "Could not kill stale cron job %q because next date could not be calculated: %q", stale.Key, stale.Schedule) continue } updated, err := s.dal.EndCronJob(ctx, stale, next) if err != nil { - logger.Errorf(err, "Could not kill stale cron job %s because: %v", stale.Ref.String(), err) + logger.Errorf(err, "Could not kill stale cron job %s because: %v", stale.Key, err) continue } - logger.Warnf("Killed stale cron job %s", stale.Ref.String()) + logger.Warnf("Killed stale cron job %s", stale.Key) updatedJobs = append(updatedJobs, updated) } @@ -265,6 +269,8 @@ func (s *Service) killOldJobs(ctx context.Context) (time.Duration, error) { // - the list of known jobs and their state // - executing jobs when they are due // - reacting to events that change the list of jobs, deployments or hash ring +// +// State is private to this function to ensure thread safety. func (s *Service) watchForUpdates(ctx context.Context) { logger := log.FromContext(ctx) @@ -280,7 +286,7 @@ func (s *Service) watchForUpdates(ctx context.Context) { for { sl.SortFunc(state.jobs, func(i, j dal.CronJob) int { - return s.sortJobs(state, i, j) + return s.compareJobs(state, i, j) }) now := s.clock.Now() @@ -294,7 +300,7 @@ func (s *Service) watchForUpdates(ctx context.Context) { if next.Before(state.blockedUntil) { next = state.blockedUntil - logger.Tracef("loop blocked for %vs", next.Sub(now)) + logger.Tracef("loop blocked for %v", next.Sub(now)) } else if next.Sub(now) < time.Second { next = now.Add(time.Second) logger.Tracef("loop while gated for 1s") @@ -337,11 +343,11 @@ func (s *Service) watchForUpdates(ctx context.Context) { removedDeploymentKeys[job.DeploymentKey.String()] = job.DeploymentKey _, err := s.dal.EndCronJob(ctx, job.CronJob, next) if err != nil { - logger.Errorf(err, "failed to end cronjob %s", job.Ref.String()) + logger.Errorf(err, "failed to end cron job %s", job.Key.String()) } continue } - logger.Infof("executing job %s", job.Ref.String()) + logger.Infof("executing job %v", job.Key) state.startedExecutingJob(job.CronJob) go s.executeJob(ctx, job.CronJob) } @@ -366,7 +372,7 @@ func (s *Service) watchForUpdates(ctx context.Context) { } } -func (s *Service) sortJobs(state *State, i, j dal.CronJob) int { +func (s *Service) compareJobs(state *State, i, j dal.CronJob) int { iNext, err := s.nextAttemptForJob(i, state, false) if err != nil { return 1 @@ -384,11 +390,11 @@ func (s *Service) nextAttemptForJob(job dal.CronJob, state *State, allowsNow boo } if job.State == dal.JobStateExecuting { if state.isExecutingInCurrentController(job) { - // return a time in the future, meaning don't schedule at this time + // no need to schedule this job until it finishes return s.clock.Now(), fmt.Errorf("controller is already waiting for job to finish") } - // We don't know when the other controller will finish this job - // We should check again when the next execution date is assuming the job finishes + // We don't know when the other controller that is executing this job will finish it + // So we should optimistically attempt it when the next execution date is due assuming the job finishes pattern, err := cron.Parse(job.Schedule) if err != nil { return s.clock.Now(), fmt.Errorf("failed to parse cron schedule %q", job.Schedule) diff --git a/backend/controller/cronjobs/cronjobs_test.go b/backend/controller/cronjobs/cronjobs_test.go index 424bcdf542..3e1c1b35a1 100644 --- a/backend/controller/cronjobs/cronjobs_test.go +++ b/backend/controller/cronjobs/cronjobs_test.go @@ -56,7 +56,7 @@ func (d *mockDAL) createCronJob(deploymentKey model.DeploymentKey, module string func (d *mockDAL) indexForJob(job dal.CronJob) (int, error) { for i, j := range d.jobs { - if j.DeploymentKey.String() == job.DeploymentKey.String() && j.Ref.Name == job.Ref.Name { + if j.Key.String() == job.Key.String() { return i, nil } } @@ -92,7 +92,7 @@ func (d *mockDAL) StartCronJobs(ctx context.Context, jobs []dal.CronJob) (attemp HasMinReplicas: true, }) } - d.attemptCountMap[job.Ref.String()]++ + d.attemptCountMap[job.Key.String()]++ } return attemptedJobs, nil } @@ -216,5 +216,10 @@ func TestService(t *testing.T) { for _, j := range mockDal.jobs { count := verbCallCount[j.Ref.Name] assert.Equal(t, count, 3, "expected verb %s to be called 3 times", j.Ref.Name) + + // Make sure each job is not attempted by all controllers, or the responsibility of only one controller + // Target is for 2 controllers to attempt each job + attemptCount := mockDal.attemptCountMap[j.Key.String()] + assert.True(t, attemptCount > 1*count && attemptCount <= 3*attemptCount, "job %v was attempted %d times, expected between > 1 and <= 3 to be attempted", j.Key) } } diff --git a/backend/schema/normalise.go b/backend/schema/normalise.go index 78cb0d2d19..1676e407ac 100644 --- a/backend/schema/normalise.go +++ b/backend/schema/normalise.go @@ -111,9 +111,6 @@ func Normalise[T Node](n T) T { c.Pos = zero c.Path = normaliseSlice(c.Path) - case *MetadataCronJob: - c.Pos = zero - case *MetadataAlias: c.Pos = zero @@ -126,6 +123,9 @@ func Normalise[T Node](n T) T { case *IngressPathParameter: c.Pos = zero + case *MetadataCronJob: + c.Pos = zero + case *Config: c.Pos = zero c.Type = Normalise(c.Type) diff --git a/backend/schema/validate.go b/backend/schema/validate.go index 272047043f..45af0cecae 100644 --- a/backend/schema/validate.go +++ b/backend/schema/validate.go @@ -481,10 +481,10 @@ func validateVerbMetadata(scopes Scopes, n *Verb) (merr []error) { merr = append(merr, err) } if _, ok := n.Request.(*Unit); !ok { - merr = append(merr, errorf(md, "verb %s: cronjob can not have a request type", n.Name)) + merr = append(merr, errorf(md, "verb %s: cron job can not have a request type", n.Name)) } if _, ok := n.Response.(*Unit); !ok { - merr = append(merr, errorf(md, "verb %s: cronjob can not have a response type", n.Name)) + merr = append(merr, errorf(md, "verb %s: cron job can not have a response type", n.Name)) } case *MetadataCalls, *MetadataDatabases, *MetadataAlias: } diff --git a/backend/schema/validate_test.go b/backend/schema/validate_test.go index c4788b1560..a159bf5209 100644 --- a/backend/schema/validate_test.go +++ b/backend/schema/validate_test.go @@ -182,8 +182,8 @@ func TestValidate(t *testing.T) { } `, errs: []string{ - "4:7-7: verb verbWithWrongInput: cronjob can not have a request type", - "6:7-7: verb verbWithWrongOutput: cronjob can not have a response type", + "4:7-7: verb verbWithWrongInput: cron job can not have a request type", + "6:7-7: verb verbWithWrongOutput: cron job can not have a response type", }, }, }