Skip to content

Commit

Permalink
Merge pull request #431 from buildkite/pod-watcher-match-tags
Browse files Browse the repository at this point in the history
Match tags in pod watcher
  • Loading branch information
DrJosh9000 authored Nov 21, 2024
2 parents 4df2c6e + 961c57d commit 0fa2554
Show file tree
Hide file tree
Showing 8 changed files with 126 additions and 113 deletions.
4 changes: 1 addition & 3 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
module github.com/buildkite/agent-stack-k8s/v2

go 1.22.6

toolchain go1.22.7
go 1.23.3

require (
github.com/Khan/genqlient v0.7.0
Expand Down
67 changes: 48 additions & 19 deletions internal/controller/agenttags/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,35 @@ package agenttags
import (
"errors"
"fmt"
"iter"
"strings"

"k8s.io/apimachinery/pkg/util/validation"
)

// ToMap converts a slice of strings of the form `k=v` to a map where the
// TagMapFromTags converts a slice of strings of the form `k=v` to a map where the
// key is `k` and the value is `v`. If any element of the slice does not
// have that form, it will not be inserted into the map and instead generate
// an error which will be appended to the second return value.
func ToMap(tags []string) (map[string]string, []error) {
m := map[string]string{}
errs := []error{}
func TagMapFromTags(tags []string) (map[string]string, []error) {
m := make(map[string]string, len(tags))
var errs []error
for _, tag := range tags {
parts := strings.SplitN(tag, "=", 2)
if len(parts) != 2 {
k, v, has := strings.Cut(tag, "=")
if !has {
errs = append(errs, fmt.Errorf("invalid agent tag: %q", tag))
continue
}
m[parts[0]] = parts[1]
m[k] = v
}
return m, errs
}

func mapToLabels(m map[string]string) (map[string]string, []error) {
labels := map[string]string{}
errs := []error{}
// labelsFromTagMap converts map[key->value] to map[tag.buildkite.com/key->value],
// with k8s compatibility checks
func labelsFromTagMap(m map[string]string) (map[string]string, []error) {
labels := make(map[string]string, len(m))
var errs []error
for k, v := range m {
namespacedKey := "tag.buildkite.com/" + k
if errMsgs := validation.IsQualifiedName(namespacedKey); len(errMsgs) > 0 {
Expand All @@ -50,33 +53,59 @@ func mapToLabels(m map[string]string) (map[string]string, []error) {
return labels, errs
}

// ToLabels converts a slice of strings of the form `k=v` to a map where the
// LabelsFromTags converts a slice of strings of the form `k=v` to a map where the
// key is `tag.buildkite.com/k` and the value is `v`. If any element of the slice does not
// have that form or if `k` is not a valid kubernetes label name or if `v`
// is not a valid kubernetes label value, it will not be inserted into the
// map and instead generate an error which will be appended to the second
// return value.
//
// See https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#syntax-and-character-set
func ToLabels(tags []string) (map[string]string, []error) {
m, errs1 := ToMap(tags)
labels, errs2 := mapToLabels(m)
func LabelsFromTags(tags []string) (map[string]string, []error) {
m, errs1 := TagMapFromTags(tags)
labels, errs2 := labelsFromTagMap(m)
return labels, append(errs1, errs2...)
}

// JobTagsMatchAgentTags returns true if and only if, for each tag key in
// `jobTags`: either the tag key is also present in `agentTags`, and the tag
// value in `jobTags` is "*" or the same as the tag value in `agentTags`
// JobTagsMatchAgentTags reports whether each tag key in `jobTags` is also
// present in `agentTags`, and the tag value in `jobTags` is either "*" or the
// same as the tag value in `agentTags`.
//
// In the future, this may be expanded to: if the tag value in `agentTags` is in some
// set of strings defined by the tag value in `jobTags` (eg a glob or regex).
// See https://buildkite.com/docs/agent/v3/cli-start#agent-targeting
func JobTagsMatchAgentTags(jobTags, agentTags map[string]string) bool {
func JobTagsMatchAgentTags(jobTags iter.Seq2[string, string], agentTags map[string]string) bool {
for k, v := range jobTags {
agentTagValue, exists := agentTags[k]
if !exists || (v != "*" && v != agentTagValue) {
if !exists {
return false
}
if v != "*" && v != agentTagValue {
return false
}
}
return true
}

// ScanLabels returns an iterator over all labels that are tags.
func ScanLabels(labels map[string]string) iter.Seq2[string, string] {
return func(yield func(string, string) bool) {
for key, value := range labels {
k, has := strings.CutPrefix(key, "tag.buildkite.com/")
if !has {
continue
}
if !yield(k, value) {
return
}
}
}
}

// TagsFromLabels converts job or pod labels into a slice of agent/job tags.
// Every label yielded by ScanLabels becomes one `key=value` string. The result
// is nil when ScanLabels yields nothing, and its order is unspecified because
// ScanLabels iterates over a map.
func TagsFromLabels(labels map[string]string) (tags []string) {
	for name, val := range ScanLabels(labels) {
		tag := name + "=" + val
		tags = append(tags, tag)
	}
	return tags
}
23 changes: 15 additions & 8 deletions internal/controller/agenttags/tags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ package agenttags_test
import (
"errors"
"fmt"
"maps"
"testing"

"github.com/buildkite/agent-stack-k8s/v2/internal/controller/agenttags"
"github.com/stretchr/testify/assert"
)

func TestToMap(t *testing.T) {
func TestMapFromTags(t *testing.T) {
t.Parallel()

for i, test := range []struct {
Expand Down Expand Up @@ -63,7 +64,7 @@ func TestToMap(t *testing.T) {
test := test
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
t.Parallel()
m, errs := agenttags.ToMap(test.agentTags)
m, errs := agenttags.TagMapFromTags(test.agentTags)
if test.expectedErrs != nil {
assert.Equal(t, test.expectedErrs, errs)
}
Expand All @@ -73,56 +74,62 @@ func TestToMap(t *testing.T) {

}

func TestToLabels(t *testing.T) {
func TestLabelsFromTags(t *testing.T) {
t.Parallel()

const invalidLabelErrMsg = "a valid label must be an empty string or consist of alphanumeric characters, '-', '_' or '.', and must start and end with an alphanumeric character (e.g. 'MyValue', or 'my_value', or '12345', regex used for validation is '(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])?')"

for i, test := range []struct {
for _, test := range []struct {
name string
agentTags []string
expectedLabels map[string]string
expectedErrs []error
}{
{
name: "empty tags",
agentTags: []string{},
expectedLabels: map[string]string{},
},
{
name: "valid queue",
agentTags: []string{"queue=kubernetes"},
expectedLabels: map[string]string{
"tag.buildkite.com/queue": "kubernetes",
},
},
{
name: "valid queue and arch",
agentTags: []string{"queue=kubernetes", "arch=arm64"},
expectedLabels: map[string]string{
"tag.buildkite.com/queue": "kubernetes",
"tag.buildkite.com/arch": "arm64",
},
},
{
name: "valid queue and arch (swapped order)",
agentTags: []string{"arch=arm64", "queue=kubernetes"},
expectedLabels: map[string]string{
"tag.buildkite.com/queue": "kubernetes",
"tag.buildkite.com/arch": "arm64",
},
},
{
name: "k8s rejects value",
agentTags: []string{"queue=kubernetes=2"},
expectedLabels: map[string]string{},
expectedErrs: []error{errors.New(invalidLabelErrMsg)},
},
{
name: "empty value",
agentTags: []string{"queue="},
expectedLabels: map[string]string{
"tag.buildkite.com/queue": "",
},
},
} {
test := test
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
t.Parallel()
labels, errs := agenttags.ToLabels(test.agentTags)
labels, errs := agenttags.LabelsFromTags(test.agentTags)
if test.expectedErrs != nil {
assert.Equal(t, test.expectedErrs, errs)
}
Expand Down Expand Up @@ -199,7 +206,7 @@ func TestJobTagsMatchAgentTags(t *testing.T) {

t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
t.Parallel()
actualResult := agenttags.JobTagsMatchAgentTags(test.jobTags, test.agentTags)
actualResult := agenttags.JobTagsMatchAgentTags(maps.All(test.jobTags), test.agentTags)
assert.Equal(
t,
test.expectedResult,
Expand Down
2 changes: 1 addition & 1 deletion internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func NewInformerFactory(
namespace string,
tags []string,
) (informers.SharedInformerFactory, error) {
labelsFromTags, errs := agenttags.ToLabels(tags)
labelsFromTags, errs := agenttags.LabelsFromTags(tags)
if len(errs) != 0 {
return nil, errors.Join(errs...)
}
Expand Down
21 changes: 10 additions & 11 deletions internal/controller/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/base64"
"errors"
"fmt"
"maps"
"math/rand/v2"
"reflect"
"sync"
Expand Down Expand Up @@ -117,19 +118,14 @@ func (m *Monitor) getScheduledCommandJobs(ctx context.Context, queue string) (jo
return clusteredJobResp(*resp), err
}

// toMapAndLogErrors converts tags to a map using agenttags.ToMap. Any errors
// reported by the conversion are logged together as a single warning on logger
// rather than returned; the map is returned in either case.
func toMapAndLogErrors(logger *zap.Logger, tags []string) map[string]string {
	tagMap, errs := agenttags.ToMap(tags)
	if len(errs) > 0 {
		logger.Warn("making a map of agent tags", zap.Errors("err", errs))
	}
	return tagMap
}

func (m *Monitor) Start(ctx context.Context, handler model.JobHandler) <-chan error {
logger := m.logger.With(zap.String("org", m.cfg.Org))
errs := make(chan error, 1)

agentTags := toMapAndLogErrors(logger, m.cfg.Tags)
agentTags, tagErrs := agenttags.TagMapFromTags(m.cfg.Tags)
if len(tagErrs) != 0 {
logger.Warn("making a map of agent tags", zap.Errors("err", tagErrs))
}

var queue string
var ok bool
Expand Down Expand Up @@ -242,11 +238,14 @@ func jobHandlerWorker(ctx, staleCtx context.Context, logger *zap.Logger, handler
if j == nil {
return
}
jobTags := toMapAndLogErrors(logger, j.AgentQueryRules)
jobTags, tagErrs := agenttags.TagMapFromTags(j.AgentQueryRules)
if len(tagErrs) != 0 {
logger.Warn("making a map of job tags", zap.Errors("err", tagErrs))
}

// The api returns jobs that match ANY agent tags (the agent query rules)
// However, we can only acquire jobs that match ALL agent tags
if !agenttags.JobTagsMatchAgentTags(jobTags, agentTags) {
if !agenttags.JobTagsMatchAgentTags(maps.All(jobTags), agentTags) {
logger.Debug("skipping job because it did not match all tags", zap.Any("job", j))
continue
}
Expand Down
2 changes: 1 addition & 1 deletion internal/controller/scheduler/fail_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func failJob(
}, options...)

// queue is required for acquire! maybe more
ctr, err := agentcore.NewController(ctx, agentToken, kjobName(jobUUID), tags, opts...)
ctr, err := agentcore.NewController(ctx, agentToken, k8sJobName(jobUUID), tags, opts...)
if err != nil {
zapLogger.Error("registering or connecting ephemeral agent", zap.Error(err))
return fmt.Errorf("registering or connecting ephemeral agent: %w", err)
Expand Down
27 changes: 18 additions & 9 deletions internal/controller/scheduler/pod_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/buildkite/agent-stack-k8s/v2/api"
"github.com/buildkite/agent-stack-k8s/v2/internal/controller/agenttags"
"github.com/buildkite/agent-stack-k8s/v2/internal/controller/config"

agentcore "github.com/buildkite/agent/v3/core"
Expand Down Expand Up @@ -54,6 +55,8 @@ type podWatcher struct {
// library outside of our control is a carve-out from the usual rule.)
// The context is needed to ensure job cancel checkers are cleaned up.
resourceEventHandlerCtx context.Context

agentTags map[string]string
}

// NewPodWatcher creates an informer that does various things with pods and
Expand All @@ -75,6 +78,12 @@ func NewPodWatcher(logger *zap.Logger, k8s kubernetes.Interface, cfg *config.Con
if jobCancelCheckerInterval <= 0 {
jobCancelCheckerInterval = config.DefaultJobCancelCheckerPollInterval
}

agentTags, errs := agenttags.TagMapFromTags(cfg.Tags)
if len(errs) > 0 {
logger.Warn("parsing agent tags", zap.Errors("errors", errs))
}

return &podWatcher{
logger: logger,
k8s: k8s,
Expand All @@ -84,6 +93,7 @@ func NewPodWatcher(logger *zap.Logger, k8s kubernetes.Interface, cfg *config.Con
jobCancelCheckerInterval: jobCancelCheckerInterval,
ignoreJobs: make(map[uuid.UUID]struct{}),
cancelCheckerChs: make(map[uuid.UUID]*onceChan),
agentTags: agentTags,
}
}

Expand Down Expand Up @@ -168,6 +178,13 @@ func (w *podWatcher) jobUUIDAndLogger(pod *corev1.Pod) (uuid.UUID, *zap.Logger,

log = log.With(zap.String("jobUUID", jobUUID.String()))

// Check that tags match - there may be pods around that were created by
// another controller using different tags.
if !agenttags.JobTagsMatchAgentTags(agenttags.ScanLabels(pod.Labels), w.agentTags) {
log.Debug("Pod labels do not match agent tags for this controller. Skipping.")
return uuid.UUID{}, log, errors.New("pod labels do not match agent tags for this controller")
}

w.ignoreJobsMu.RLock()
defer w.ignoreJobsMu.RUnlock()

Expand Down Expand Up @@ -283,15 +300,7 @@ func (w *podWatcher) failJob(ctx context.Context, log *zap.Logger, pod *corev1.P
}

// Tags are required in order to connect the agent.
var tags []string
for key, value := range pod.Labels {
k, has := strings.CutPrefix(key, "tag.buildkite.com/")
if !has {
continue
}
tags = append(tags, fmt.Sprintf("%s=%s", k, value))
}

tags := agenttags.TagsFromLabels(pod.Labels)
opts := w.cfg.AgentConfig.ControllerOptions()

if err := failJob(ctx, w.logger, agentToken, jobUUID.String(), tags, message.String(), opts...); err != nil {
Expand Down
Loading

0 comments on commit 0fa2554

Please sign in to comment.