From 0feaac52901b3d7b9812cfec4809e61dc36a33c8 Mon Sep 17 00:00:00 2001 From: Matt Toohey Date: Fri, 12 Apr 2024 16:20:40 +1000 Subject: [PATCH] update comments --- backend/controller/cronjobs/cronjobs.go | 7 ++++++- backend/controller/cronjobs/state.go | 2 +- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/backend/controller/cronjobs/cronjobs.go b/backend/controller/cronjobs/cronjobs.go index 2ecf7df1c3..08710b8cc5 100644 --- a/backend/controller/cronjobs/cronjobs.go +++ b/backend/controller/cronjobs/cronjobs.go @@ -151,6 +151,7 @@ func (s *Service) CreatedOrReplacedDeloyment(ctx context.Context, newDeploymentK _ = s.resetJobsWithNewDeploymentKey(ctx, optional.Some(newDeploymentKey)) } +// resetJobs is run periodically via a scheduled task func (s *Service) resetJobs(ctx context.Context) (time.Duration, error) { err := s.resetJobsWithNewDeploymentKey(ctx, optional.None[model.DeploymentKey]()) if err != nil { @@ -220,6 +221,8 @@ func (s *Service) executeJob(ctx context.Context, job dal.CronJob) { } } +// killOldJobs looks for jobs that have been executing for too long +// This is the hard timeout which happens after the usual timeout plus a grace period for the soft timeout to occur (context's timeout which cancels the call) func (s *Service) killOldJobs(ctx context.Context) (time.Duration, error) { logger := log.FromContext(ctx) staleJobs, err := s.dal.GetStaleCronJobs(ctx, s.config.Timeout+time.Minute) @@ -259,7 +262,7 @@ func (s *Service) killOldJobs(ctx context.Context) (time.Duration, error) { } // watchForUpdates is the centralized place that handles: -// - list of known jobs and their state +// - the list of known jobs and their state // - executing jobs when they are due // - reacting to events that change the list of jobs, deployments or hash ring func (s *Service) watchForUpdates(ctx context.Context) { @@ -439,6 +442,8 @@ func (s *Service) UpdatedControllerList(ctx context.Context, controllers []dal.C }) } +// isResponsibleForJob indicates 
whether this service should be responsible for attempting jobs, +// or if enough other controllers will handle it. This allows us to spread the job load across controllers. func (s *Service) isResponsibleForJob(job dal.CronJob, state *State) bool { if state.isJobTooNewForHashRing(job) { return true diff --git a/backend/controller/cronjobs/state.go b/backend/controller/cronjobs/state.go index 080c655360..757486a436 100644 --- a/backend/controller/cronjobs/state.go +++ b/backend/controller/cronjobs/state.go @@ -19,6 +19,7 @@ type State struct { // have a chance to reset their job lists and share responsibilities through the hash ring newJobs map[string]time.Time + // We delay any job attempts in case of db errors to avoid hammering the db in a tight loop blockedUntil time.Time } @@ -58,7 +59,6 @@ func (s *State) updateJobs(jobs []dal.CronJob) { updatedJobMap := jobMap(jobs) for idx, old := range s.jobs { if updated, exists := updatedJobMap[old.Key.String()]; exists { - //TODO: compare to see if outdated s.jobs[idx] = updated if updated.State != dal.JobStateExecuting { delete(s.executing, updated.Key.String())