Skip to content

Commit

Permalink
Merge pull request #432 from buildkite/fix-limiter-token-tracking-again
Browse files (browse the repository at this point in the history)
Fix limiter token tracking (again)
  • Loading branch information
DrJosh9000 authored Nov 24, 2024
2 parents 0fa2554 + 878ddf3 commit eb355cc
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 43 deletions.
71 changes: 35 additions & 36 deletions internal/controller/limiter/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,8 @@ import (
"fmt"
"reflect"

"github.com/buildkite/agent-stack-k8s/v2/internal/controller/config"
"github.com/buildkite/agent-stack-k8s/v2/internal/controller/model"

"github.com/google/uuid"
"go.uber.org/zap"
batchv1 "k8s.io/api/batch/v1"
"k8s.io/client-go/informers"
Expand Down Expand Up @@ -59,12 +57,13 @@ func New(logger *zap.Logger, scheduler model.JobHandler, maxInFlight int) *MaxIn
func (l *MaxInFlight) RegisterInformer(ctx context.Context, factory informers.SharedInformerFactory) error {
informer := factory.Batch().V1().Jobs()
jobInformer := informer.Informer()
if _, err := jobInformer.AddEventHandler(l); err != nil {
reg, err := jobInformer.AddEventHandler(l)
if err != nil {
return err
}
go factory.Start(ctx.Done())

if !cache.WaitForCacheSync(ctx.Done(), jobInformer.HasSynced) {
if !cache.WaitForCacheSync(ctx.Done(), reg.HasSynced) {
return fmt.Errorf("failed to sync informer cache")
}

Expand Down Expand Up @@ -98,7 +97,7 @@ func (l *MaxInFlight) Handle(ctx context.Context, job model.Job) error {
zap.String("uuid", job.Uuid),
)
if err := l.handler.Handle(ctx, job); err != nil {
// Oh well. Return the token and un-record the job.
// Oh well. Return the token.
l.tryReturnToken()

l.logger.Debug("next handler failed",
Expand All @@ -111,54 +110,54 @@ func (l *MaxInFlight) Handle(ctx context.Context, job model.Job) error {
}

// OnAdd is called by k8s to inform us a resource is added.
func (l *MaxInFlight) OnAdd(obj any, _ bool) {
func (l *MaxInFlight) OnAdd(obj any, inInitialList bool) {
job, _ := obj.(*batchv1.Job)
if job == nil {
return
}
l.trackJob(job)
l.logger.Debug("at end of OnAdd", zap.Int("tokens-available", len(l.tokenBucket)))
}

// OnUpdate is called by k8s to inform us a resource is updated.
func (l *MaxInFlight) OnUpdate(_, obj any) {
job, _ := obj.(*batchv1.Job)
if job == nil {
if !inInitialList {
// After sync is finished, the limiter handler takes tokens directly.
return
}
l.trackJob(job)
l.logger.Debug("at end of OnUpdate", zap.Int("tokens-available", len(l.tokenBucket)))
// Before sync is finished: we're learning about existing jobs, so we should
// (try to) take tokens for unfinished jobs started by a previous controller.
// If it's added as already finished, no need to take a token for it.
// Otherwise, try to take one, but don't block (in case the stack was
// restarted with a different limit).
if !model.JobFinished(job) {
l.tryTakeToken()
l.logger.Debug("existing not-finished job discovered", zap.Int("tokens-available", len(l.tokenBucket)))
}
}

// OnDelete is called by k8s to inform us a resource is deleted.
func (l *MaxInFlight) OnDelete(obj any) {
// The job condition at the point of deletion could be non-terminal, but
// it is being deleted, so ignore it and skip to marking complete.
// If buildkite.com/job-uuid label is missing or malformed, don't track it.
job, _ := obj.(*batchv1.Job)
if job == nil {
// OnUpdate is called by k8s to inform us a resource is updated.
func (l *MaxInFlight) OnUpdate(prev, curr any) {
prevState, _ := prev.(*batchv1.Job)
currState, _ := curr.(*batchv1.Job)
if prevState == nil || currState == nil {
return
}
l.trackJob(job)
if _, err := uuid.Parse(job.Labels[config.UUIDLabel]); err != nil {
return
// Only take or return a token if the job state has *changed*.
// The only valid change is from not-finished to finished.
if !model.JobFinished(prevState) && model.JobFinished(currState) {
l.tryReturnToken()
l.logger.Debug("job state changed from not-finished to finished", zap.Int("tokens-available", len(l.tokenBucket)))
}
l.tryReturnToken()
l.logger.Debug("at end of OnDelete", zap.Int("tokens-available", len(l.tokenBucket)))
}

// trackJob is called by the k8s informer callbacks to update job state and
// take/return tokens. It does the same thing for all three callbacks.
func (l *MaxInFlight) trackJob(job *batchv1.Job) {
// If buildkite.com/job-uuid label is missing or malformed, don't track it.
if _, err := uuid.Parse(job.Labels[config.UUIDLabel]); err != nil {
// OnDelete is called by k8s to inform us a resource is deleted.
func (l *MaxInFlight) OnDelete(obj any) {
prevState, _ := obj.(*batchv1.Job)
if prevState == nil {
return
}

if model.JobFinished(job) {
// OnDelete gives us the last-known state prior to deletion.
// If that state was finished, we've already returned a token.
// If that state was not-finished, we need to return a token now.
if !model.JobFinished(prevState) {
l.tryReturnToken()
} else {
l.tryTakeToken()
l.logger.Debug("not-finished job was deleted", zap.Int("tokens-available", len(l.tokenBucket)))
}
}

Expand Down
23 changes: 16 additions & 7 deletions internal/controller/model/fake_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,23 @@ func (f *FakeScheduler) complete(uuid string) {
f.Finished = append(f.Finished, uuid)
f.mu.Unlock()

f.EventHandler.OnUpdate(nil, &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{config.UUIDLabel: uuid},
f.EventHandler.OnUpdate(
// Previous state
&batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{config.UUIDLabel: uuid},
},
// No status conditions
},
Status: batchv1.JobStatus{
Conditions: []batchv1.JobCondition{{Type: batchv1.JobComplete}},
},
})
// New state
&batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{config.UUIDLabel: uuid},
},
Status: batchv1.JobStatus{
Conditions: []batchv1.JobCondition{{Type: batchv1.JobComplete}},
},
})
f.wg.Done()
}

Expand Down

0 comments on commit eb355cc

Please sign in to comment.