diff --git a/internal/controller/limiter/limiter.go b/internal/controller/limiter/limiter.go index 23a48688..d0ce1e0e 100644 --- a/internal/controller/limiter/limiter.go +++ b/internal/controller/limiter/limiter.go @@ -5,10 +5,8 @@ import ( "fmt" "reflect" - "github.com/buildkite/agent-stack-k8s/v2/internal/controller/config" "github.com/buildkite/agent-stack-k8s/v2/internal/controller/model" - "github.com/google/uuid" "go.uber.org/zap" batchv1 "k8s.io/api/batch/v1" "k8s.io/client-go/informers" @@ -59,12 +57,13 @@ func New(logger *zap.Logger, scheduler model.JobHandler, maxInFlight int) *MaxIn func (l *MaxInFlight) RegisterInformer(ctx context.Context, factory informers.SharedInformerFactory) error { informer := factory.Batch().V1().Jobs() jobInformer := informer.Informer() - if _, err := jobInformer.AddEventHandler(l); err != nil { + reg, err := jobInformer.AddEventHandler(l) + if err != nil { return err } go factory.Start(ctx.Done()) - if !cache.WaitForCacheSync(ctx.Done(), jobInformer.HasSynced) { + if !cache.WaitForCacheSync(ctx.Done(), reg.HasSynced) { return fmt.Errorf("failed to sync informer cache") } @@ -98,7 +97,7 @@ func (l *MaxInFlight) Handle(ctx context.Context, job model.Job) error { zap.String("uuid", job.Uuid), ) if err := l.handler.Handle(ctx, job); err != nil { - // Oh well. Return the token and un-record the job. + // Oh well. Return the token. l.tryReturnToken() l.logger.Debug("next handler failed", @@ -111,54 +110,54 @@ func (l *MaxInFlight) Handle(ctx context.Context, job model.Job) error { } // OnAdd is called by k8s to inform us a resource is added. -func (l *MaxInFlight) OnAdd(obj any, _ bool) { +func (l *MaxInFlight) OnAdd(obj any, inInitialList bool) { job, _ := obj.(*batchv1.Job) if job == nil { return } - l.trackJob(job) - l.logger.Debug("at end of OnAdd", zap.Int("tokens-available", len(l.tokenBucket))) -} - -// OnUpdate is called by k8s to inform us a resource is updated. -func (l *MaxInFlight) OnUpdate(_, obj any) { - job, _ := obj.(*batchv1.Job) - if job == nil { + if !inInitialList { + // After sync is finished, the limiter handler takes tokens directly. return } - l.trackJob(job) - l.logger.Debug("at end of OnUpdate", zap.Int("tokens-available", len(l.tokenBucket))) + // Before sync is finished: we're learning about existing jobs, so we should + // (try to) take tokens for unfinished jobs started by a previous controller. + // If it's added as already finished, no need to take a token for it. + // Otherwise, try to take one, but don't block (in case the stack was + // restarted with a different limit). + if !model.JobFinished(job) { + l.tryTakeToken() + l.logger.Debug("existing not-finished job discovered", zap.Int("tokens-available", len(l.tokenBucket))) + } } -// OnDelete is called by k8s to inform us a resource is deleted. -func (l *MaxInFlight) OnDelete(obj any) { - // The job condition at the point of deletion could be non-terminal, but - // it is being deleted, so ignore it and skip to marking complete. - // If buildkite.com/job-uuid label is missing or malformed, don't track it. - job, _ := obj.(*batchv1.Job) - if job == nil { +// OnUpdate is called by k8s to inform us a resource is updated. +func (l *MaxInFlight) OnUpdate(prev, curr any) { + prevState, _ := prev.(*batchv1.Job) + currState, _ := curr.(*batchv1.Job) + if prevState == nil || currState == nil { return } - l.trackJob(job) - if _, err := uuid.Parse(job.Labels[config.UUIDLabel]); err != nil { - return + // Only take or return a token if the job state has *changed*. 
+ // The only valid change is from not-finished to finished. + if !model.JobFinished(prevState) && model.JobFinished(currState) { + l.tryReturnToken() + l.logger.Debug("job state changed from not-finished to finished", zap.Int("tokens-available", len(l.tokenBucket))) } - l.tryReturnToken() - l.logger.Debug("at end of OnDelete", zap.Int("tokens-available", len(l.tokenBucket))) } -// trackJob is called by the k8s informer callbacks to update job state and -// take/return tokens. It does the same thing for all three callbacks. -func (l *MaxInFlight) trackJob(job *batchv1.Job) { - // If buildkite.com/job-uuid label is missing or malformed, don't track it. - if _, err := uuid.Parse(job.Labels[config.UUIDLabel]); err != nil { +// OnDelete is called by k8s to inform us a resource is deleted. +func (l *MaxInFlight) OnDelete(obj any) { + prevState, _ := obj.(*batchv1.Job) + if prevState == nil { return } - if model.JobFinished(job) { + // OnDelete gives us the last-known state prior to deletion. + // If that state was finished, we've already returned a token. + // If that state was not-finished, we need to return a token now. + if !model.JobFinished(prevState) { l.tryReturnToken() - } else { - l.tryTakeToken() + l.logger.Debug("not-finished job was deleted", zap.Int("tokens-available", len(l.tokenBucket))) } } diff --git a/internal/controller/model/fake_scheduler.go b/internal/controller/model/fake_scheduler.go index 4a9a314c..0bef7c51 100644 --- a/internal/controller/model/fake_scheduler.go +++ b/internal/controller/model/fake_scheduler.go @@ -71,14 +71,23 @@ func (f *FakeScheduler) complete(uuid string) { f.Finished = append(f.Finished, uuid) f.mu.Unlock() - f.EventHandler.OnUpdate(nil, &batchv1.Job{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{config.UUIDLabel: uuid}, + f.EventHandler.OnUpdate( + // Previous state + &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{config.UUIDLabel: uuid}, + }, + // No status conditions }, - Status: batchv1.JobStatus{ - Conditions: []batchv1.JobCondition{{Type: batchv1.JobComplete}}, - }, - }) + // New state + &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{config.UUIDLabel: uuid}, + }, + Status: batchv1.JobStatus{ + Conditions: []batchv1.JobCondition{{Type: batchv1.JobComplete}}, + }, + }) f.wg.Done() }
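
For illustration, below is a minimal, self-contained sketch of the token accounting that the limiter.go changes move to. The toyLimiter type, the maxInFlight of 2, and the method bodies are assumptions made only for this example: the real tryTakeToken/tryReturnToken bodies are not part of this diff, although the buffered-channel tokenBucket they operate on is implied by the len(l.tokenBucket) log fields. The point it demonstrates is that tokens now change hands only on state transitions, so a repeated "finished" update, or a delete of an already-finished job, cannot return a second token.

package main

import "fmt"

// toyLimiter mirrors the token accounting the patch aims for, using a
// buffered channel as the token bucket. The bodies of tryTakeToken and
// tryReturnToken are assumptions for this sketch; only their names appear
// in the diff above.
type toyLimiter struct {
	tokenBucket chan struct{}
}

func newToyLimiter(maxInFlight int) *toyLimiter {
	l := &toyLimiter{tokenBucket: make(chan struct{}, maxInFlight)}
	for i := 0; i < maxInFlight; i++ {
		l.tokenBucket <- struct{}{}
	}
	return l
}

// tryTakeToken takes a token without blocking, if one is available.
func (l *toyLimiter) tryTakeToken() {
	select {
	case <-l.tokenBucket:
	default:
	}
}

// tryReturnToken returns a token without blocking, if there is room.
func (l *toyLimiter) tryReturnToken() {
	select {
	case l.tokenBucket <- struct{}{}:
	default:
	}
}

// onAdd takes a token only for unfinished jobs discovered during the initial
// informer sync, matching the patched OnAdd.
func (l *toyLimiter) onAdd(finished, inInitialList bool) {
	if inInitialList && !finished {
		l.tryTakeToken()
	}
}

// onUpdate returns a token only on the not-finished -> finished transition,
// matching the patched OnUpdate.
func (l *toyLimiter) onUpdate(prevFinished, currFinished bool) {
	if !prevFinished && currFinished {
		l.tryReturnToken()
	}
}

// onDelete returns a token only if the last-known state was not finished,
// matching the patched OnDelete.
func (l *toyLimiter) onDelete(prevFinished bool) {
	if !prevFinished {
		l.tryReturnToken()
	}
}

func main() {
	l := newToyLimiter(2)

	// Initial sync finds one running job left over from a previous controller.
	l.onAdd(false, true)
	fmt.Println("tokens after initial sync:", len(l.tokenBucket)) // 1

	// A duplicate "finished" update must not return a second token.
	l.onUpdate(false, true)
	l.onUpdate(true, true)
	fmt.Println("tokens after finish and repeated update:", len(l.tokenBucket)) // 2

	// Deleting an already-finished job must not return another token either.
	l.onDelete(true)
	fmt.Println("tokens after delete of finished job:", len(l.tokenBucket)) // 2
}

Keying take/return on the not-finished to finished transition, rather than on every informer callback as the removed trackJob did, keeps the bucket balanced even when the informer delivers the same state more than once.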