diff --git a/go.mod b/go.mod index 04532f53..08d5b4e1 100644 --- a/go.mod +++ b/go.mod @@ -1,8 +1,6 @@ module github.com/buildkite/agent-stack-k8s/v2 -go 1.22.6 - -toolchain go1.22.7 +go 1.23.3 require ( github.com/Khan/genqlient v0.7.0 diff --git a/internal/controller/agenttags/tags.go b/internal/controller/agenttags/tags.go index 07d7cd80..fc51c13b 100644 --- a/internal/controller/agenttags/tags.go +++ b/internal/controller/agenttags/tags.go @@ -3,32 +3,35 @@ package agenttags import ( "errors" "fmt" + "iter" "strings" "k8s.io/apimachinery/pkg/util/validation" ) -// ToMap converts a slice of strings of the form `k=v` to a map where the +// TagMapFromTags converts a slice of strings of the form `k=v` to a map where the // key is `k` and the value is `v`. If any element of the slice does not // have that form, it will not be inserted into the map and instead generate // an error which will be appended to the second return value. -func ToMap(tags []string) (map[string]string, []error) { - m := map[string]string{} - errs := []error{} +func TagMapFromTags(tags []string) (map[string]string, []error) { + m := make(map[string]string, len(tags)) + var errs []error for _, tag := range tags { - parts := strings.SplitN(tag, "=", 2) - if len(parts) != 2 { + k, v, has := strings.Cut(tag, "=") + if !has { errs = append(errs, fmt.Errorf("invalid agent tag: %q", tag)) continue } - m[parts[0]] = parts[1] + m[k] = v } return m, errs } -func mapToLabels(m map[string]string) (map[string]string, []error) { - labels := map[string]string{} - errs := []error{} +// labelsFromTagMap converts map[key->value] to map[tag.buildkite.com/key->value], +// with k8s compatibility checks +func labelsFromTagMap(m map[string]string) (map[string]string, []error) { + labels := make(map[string]string, len(m)) + var errs []error for k, v := range m { namespacedKey := "tag.buildkite.com/" + k if errMsgs := validation.IsQualifiedName(namespacedKey); len(errMsgs) > 0 { @@ -50,7 +53,7 @@ func mapToLabels(m map[string]string) (map[string]string, []error) { return labels, errs } -// ToLabels converts a slice of strings of the form `k=v` to a map where the +// LabelsFromTags converts a slice of strings of the form `k=v` to a map where the // key is `k` and the value is `v`. If any element of the slice does not // have that form or if `k` is not a valid kubernetes label name or if `v` // is not a valid kubernetes label value, it will not be inserted into the @@ -58,25 +61,51 @@ func mapToLabels(m map[string]string) (map[string]string, []error) { // return value. // // See https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#syntax-and-character-set -func ToLabels(tags []string) (map[string]string, []error) { - m, errs1 := ToMap(tags) - labels, errs2 := mapToLabels(m) +func LabelsFromTags(tags []string) (map[string]string, []error) { + m, errs1 := TagMapFromTags(tags) + labels, errs2 := labelsFromTagMap(m) return labels, append(errs1, errs2...) } -// JobTagsMatchAgentTags returns true if and only if, for each tag key in -// `jobTags`: either the tag key is also present in `agentTags`, and the tag -// value in `jobTags` is "*" or the same as the tag value in `agentTags` +// JobTagsMatchAgentTags reports whether each tag key in `jobTags` is also +// present in `agentTags`, and the tag value in `jobTags` is either "*" or the +// same as the tag value in `agentTags`. // // In the future, this may be expanded to: if the tag value `agentTags` is in some // set of strings defined by the tag value in `jobTags` (eg a glob or regex) // See https://buildkite.com/docs/agent/v3/cli-start#agent-targeting -func JobTagsMatchAgentTags(jobTags, agentTags map[string]string) bool { +func JobTagsMatchAgentTags(jobTags iter.Seq2[string, string], agentTags map[string]string) bool { for k, v := range jobTags { agentTagValue, exists := agentTags[k] - if !exists || (v != "*" && v != agentTagValue) { + if !exists { + return false + } + if v != "*" && v != agentTagValue { return false } } return true } + +// ScanLabels returns an iterator over all labels that are tags. +func ScanLabels(labels map[string]string) iter.Seq2[string, string] { + return func(yield func(string, string) bool) { + for key, value := range labels { + k, has := strings.CutPrefix(key, "tag.buildkite.com/") + if !has { + continue + } + if !yield(k, value) { + return + } + } + } +} + +// TagsFromLabels converts job or pod labels into a slice of agent/job tags. +func TagsFromLabels(labels map[string]string) (tags []string) { + for key, value := range ScanLabels(labels) { + tags = append(tags, key+"="+value) + } + return tags +} diff --git a/internal/controller/agenttags/tags_test.go b/internal/controller/agenttags/tags_test.go index 668364ec..ddc54b85 100644 --- a/internal/controller/agenttags/tags_test.go +++ b/internal/controller/agenttags/tags_test.go @@ -3,13 +3,14 @@ package agenttags_test import ( "errors" "fmt" + "maps" "testing" "github.com/buildkite/agent-stack-k8s/v2/internal/controller/agenttags" "github.com/stretchr/testify/assert" ) -func TestToMap(t *testing.T) { +func TestMapFromTags(t *testing.T) { t.Parallel() for i, test := range []struct { @@ -63,7 +64,7 @@ func TestToMap(t *testing.T) { test := test t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { t.Parallel() - m, errs := agenttags.ToMap(test.agentTags) + m, errs := agenttags.TagMapFromTags(test.agentTags) if test.expectedErrs != nil { assert.Equal(t, test.expectedErrs, errs) } @@ -73,27 +74,31 @@ func TestToMap(t *testing.T) { } -func TestToLabels(t *testing.T) { +func TestLabelsFromTags(t *testing.T) { t.Parallel() const invalidLabelErrMsg = "a valid label must be an empty string or consist of alphanumeric characters, '-', '_' or '.', and must start and end with an alphanumeric character (e.g. 'MyValue', or 'my_value', or '12345', regex used for validation is '(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])?')" - for i, test := range []struct { + for _, test := range []struct { + name string agentTags []string expectedLabels map[string]string expectedErrs []error }{ { + name: "empty tags", agentTags: []string{}, expectedLabels: map[string]string{}, }, { + name: "valid queue", agentTags: []string{"queue=kubernetes"}, expectedLabels: map[string]string{ "tag.buildkite.com/queue": "kubernetes", }, }, { + name: "valid queue and arch", agentTags: []string{"queue=kubernetes", "arch=arm64"}, expectedLabels: map[string]string{ "tag.buildkite.com/queue": "kubernetes", @@ -101,6 +106,7 @@ func TestToLabels(t *testing.T) { }, }, { + name: "valid queue and arch (swapped order)", agentTags: []string{"arch=arm64", "queue=kubernetes"}, expectedLabels: map[string]string{ "tag.buildkite.com/queue": "kubernetes", @@ -108,21 +114,22 @@ func TestToLabels(t *testing.T) { }, }, { + name: "k8s rejects value", agentTags: []string{"queue=kubernetes=2"}, expectedLabels: map[string]string{}, expectedErrs: []error{errors.New(invalidLabelErrMsg)}, }, { + name: "empty value", agentTags: []string{"queue="}, expectedLabels: map[string]string{ "tag.buildkite.com/queue": "", }, }, } { - test := test - t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { + t.Run(test.name, func(t *testing.T) { t.Parallel() - labels, errs := agenttags.ToLabels(test.agentTags) + labels, errs := agenttags.LabelsFromTags(test.agentTags) if test.expectedErrs != nil { assert.Equal(t, test.expectedErrs, errs) } @@ -199,7 +206,7 @@ func TestJobTagsMatchAgentTags(t *testing.T) { t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { t.Parallel() - actualResult := agenttags.JobTagsMatchAgentTags(test.jobTags, test.agentTags) + actualResult := agenttags.JobTagsMatchAgentTags(maps.All(test.jobTags), test.agentTags) assert.Equal( t, test.expectedResult, diff --git a/internal/controller/controller.go b/internal/controller/controller.go index 61ca485b..23b702db 100644 --- a/internal/controller/controller.go +++ b/internal/controller/controller.go @@ -138,7 +138,7 @@ func NewInformerFactory( namespace string, tags []string, ) (informers.SharedInformerFactory, error) { - labelsFromTags, errs := agenttags.ToLabels(tags) + labelsFromTags, errs := agenttags.LabelsFromTags(tags) if len(errs) != 0 { return nil, errors.Join(errs...) } diff --git a/internal/controller/monitor/monitor.go b/internal/controller/monitor/monitor.go index 1aa629c3..2a19d8fa 100644 --- a/internal/controller/monitor/monitor.go +++ b/internal/controller/monitor/monitor.go @@ -5,6 +5,7 @@ import ( "encoding/base64" "errors" "fmt" + "maps" "math/rand/v2" "reflect" "sync" @@ -117,19 +118,14 @@ func (m *Monitor) getScheduledCommandJobs(ctx context.Context, queue string) (jo return clusteredJobResp(*resp), err } -func toMapAndLogErrors(logger *zap.Logger, tags []string) map[string]string { - agentTags, tagErrs := agenttags.ToMap(tags) - if len(tagErrs) != 0 { - logger.Warn("making a map of agent tags", zap.Errors("err", tagErrs)) - } - return agentTags -} - func (m *Monitor) Start(ctx context.Context, handler model.JobHandler) <-chan error { logger := m.logger.With(zap.String("org", m.cfg.Org)) errs := make(chan error, 1) - agentTags := toMapAndLogErrors(logger, m.cfg.Tags) + agentTags, tagErrs := agenttags.TagMapFromTags(m.cfg.Tags) + if len(tagErrs) != 0 { + logger.Warn("making a map of agent tags", zap.Errors("err", tagErrs)) + } var queue string var ok bool @@ -242,11 +238,14 @@ func jobHandlerWorker(ctx, staleCtx context.Context, logger *zap.Logger, handler if j == nil { return } - jobTags := toMapAndLogErrors(logger, j.AgentQueryRules) + jobTags, tagErrs := agenttags.TagMapFromTags(j.AgentQueryRules) + if len(tagErrs) != 0 { + logger.Warn("making a map of job tags", zap.Errors("err", tagErrs)) + } // The api returns jobs that match ANY agent tags (the agent query rules) // However, we can only acquire jobs that match ALL agent tags - if !agenttags.JobTagsMatchAgentTags(jobTags, agentTags) { + if !agenttags.JobTagsMatchAgentTags(maps.All(jobTags), agentTags) { logger.Debug("skipping job because it did not match all tags", zap.Any("job", j)) continue } diff --git a/internal/controller/scheduler/fail_job.go b/internal/controller/scheduler/fail_job.go index 6e94164a..da8b5995 100644 --- a/internal/controller/scheduler/fail_job.go +++ b/internal/controller/scheduler/fail_job.go @@ -33,7 +33,7 @@ func failJob( }, options...) // queue is required for acquire! maybe more - ctr, err := agentcore.NewController(ctx, agentToken, kjobName(jobUUID), tags, opts...) + ctr, err := agentcore.NewController(ctx, agentToken, k8sJobName(jobUUID), tags, opts...) if err != nil { zapLogger.Error("registering or connecting ephemeral agent", zap.Error(err)) return fmt.Errorf("registering or connecting ephemeral agent: %w", err) diff --git a/internal/controller/scheduler/pod_watcher.go b/internal/controller/scheduler/pod_watcher.go index 3744c551..a8033139 100644 --- a/internal/controller/scheduler/pod_watcher.go +++ b/internal/controller/scheduler/pod_watcher.go @@ -11,6 +11,7 @@ import ( "time" "github.com/buildkite/agent-stack-k8s/v2/api" + "github.com/buildkite/agent-stack-k8s/v2/internal/controller/agenttags" "github.com/buildkite/agent-stack-k8s/v2/internal/controller/config" agentcore "github.com/buildkite/agent/v3/core" @@ -54,6 +55,8 @@ type podWatcher struct { // library outside of our control is a carve-out from the usual rule.) // The context is needed to ensure job cancel checkers are cleaned up. resourceEventHandlerCtx context.Context + + agentTags map[string]string } // NewPodWatcher creates an informer that does various things with pods and @@ -75,6 +78,12 @@ func NewPodWatcher(logger *zap.Logger, k8s kubernetes.Interface, cfg *config.Con if jobCancelCheckerInterval <= 0 { jobCancelCheckerInterval = config.DefaultJobCancelCheckerPollInterval } + + agentTags, errs := agenttags.TagMapFromTags(cfg.Tags) + if len(errs) > 0 { + logger.Warn("parsing agent tags", zap.Errors("errors", errs)) + } + return &podWatcher{ logger: logger, k8s: k8s, @@ -84,6 +93,7 @@ func NewPodWatcher(logger *zap.Logger, k8s kubernetes.Interface, cfg *config.Con jobCancelCheckerInterval: jobCancelCheckerInterval, ignoreJobs: make(map[uuid.UUID]struct{}), cancelCheckerChs: make(map[uuid.UUID]*onceChan), + agentTags: agentTags, } } @@ -168,6 +178,13 @@ func (w *podWatcher) jobUUIDAndLogger(pod *corev1.Pod) (uuid.UUID, *zap.Logger, log = log.With(zap.String("jobUUID", jobUUID.String())) + // Check that tags match - there may be pods around that were created by + // another controller using different tags. + if !agenttags.JobTagsMatchAgentTags(agenttags.ScanLabels(pod.Labels), w.agentTags) { + log.Debug("Pod labels do not match agent tags for this controller. Skipping.") + return uuid.UUID{}, log, errors.New("pod labels do not match agent tags for this controller") + } + w.ignoreJobsMu.RLock() defer w.ignoreJobsMu.RUnlock() @@ -283,15 +300,7 @@ func (w *podWatcher) failJob(ctx context.Context, log *zap.Logger, pod *corev1.P } // Tags are required order to connect the agent. - var tags []string - for key, value := range pod.Labels { - k, has := strings.CutPrefix(key, "tag.buildkite.com/") - if !has { - continue - } - tags = append(tags, fmt.Sprintf("%s=%s", k, value)) - } - + tags := agenttags.TagsFromLabels(pod.Labels) opts := w.cfg.AgentConfig.ControllerOptions() if err := failJob(ctx, w.logger, agentToken, jobUUID.String(), tags, message.String(), opts...); err != nil { diff --git a/internal/controller/scheduler/scheduler.go b/internal/controller/scheduler/scheduler.go index 47c470c7..008a02c5 100644 --- a/internal/controller/scheduler/scheduler.go +++ b/internal/controller/scheduler/scheduler.go @@ -203,7 +203,7 @@ func (w *worker) Build(podSpec *corev1.PodSpec, skipCheckout bool, inputs buildI kjob := &batchv1.Job{ ObjectMeta: metav1.ObjectMeta{ - Name: kjobName(inputs.uuid), + Name: k8sJobName(inputs.uuid), Labels: make(map[string]string), Annotations: make(map[string]string), }, @@ -217,9 +217,21 @@ func (w *worker) Build(podSpec *corev1.PodSpec, skipCheckout bool, inputs buildI } kjob.Labels[config.UUIDLabel] = inputs.uuid - w.labelWithAgentTags(kjob.Labels, inputs.agentQueryRules) - kjob.Annotations[config.BuildURLAnnotation] = inputs.envMap["BUILDKITE_BUILD_URL"] - w.annotateWithJobURL(kjob.Annotations, inputs.uuid, inputs.envMap) + tagLabels, errs := agenttags.LabelsFromTags(inputs.agentQueryRules) + if len(errs) > 0 { + w.logger.Warn("converting all tags to labels", zap.Errors("errs", errs)) + } + maps.Copy(kjob.Labels, tagLabels) + + buildURL := inputs.envMap["BUILDKITE_BUILD_URL"] + kjob.Annotations[config.BuildURLAnnotation] = buildURL + jobURL, err := w.jobURL(inputs.uuid, buildURL) + if err != nil { + w.logger.Warn("could not parse BuildURL when annotating with JobURL", zap.String("buildURL", buildURL)) + } + if jobURL != "" { + kjob.Annotations[config.JobURLAnnotation] = jobURL + } // Prevent k8s cluster autoscaler from terminating the job before it finishes to scale down cluster kjob.Annotations["cluster-autoscaler.kubernetes.io/safe-to-evict"] = "false" @@ -452,18 +464,15 @@ func (w *worker) Build(podSpec *corev1.PodSpec, skipCheckout bool, inputs buildI } } - agentTags := []agentTag{ - { - Name: "k8s:agent-stack-version", - Value: version.Version(), - }, + agentTags := map[string]string{ + "k8s:agent-stack-version": version.Version(), } - if tags, err := agentTagsFromJob(inputs.agentQueryRules); err != nil { - w.logger.Warn("error parsing job tags", zap.String("job", inputs.uuid)) - } else { - agentTags = append(agentTags, tags...) + tags, errs := agenttags.TagMapFromTags(inputs.agentQueryRules) + if len(errs) > 0 { + w.logger.Warn("errors parsing job tags", zap.String("job", inputs.uuid), zap.Errors("errors", errs)) } + maps.Copy(agentTags, tags) // Agent server container // This runs the "upper layer" of the agent that is responsible for talking @@ -864,62 +873,24 @@ func (w *worker) failJob(ctx context.Context, inputs buildInputs, message string return failJob(ctx, w.logger, agentToken, inputs.uuid, inputs.agentQueryRules, message, opts...) } -func (w *worker) labelWithAgentTags(dstLabels map[string]string, agentQueryRules []string) { - ls, errs := agenttags.ToLabels(agentQueryRules) - if len(errs) != 0 { - w.logger.Warn("converting all tags to labels", zap.Errors("errs", errs)) - } - - for k, v := range ls { - dstLabels[k] = v - } -} - -func (w *worker) annotateWithJobURL(dstAnnotations map[string]string, jobUUID string, envMap map[string]string) { - buildURL := envMap["BUILDKITE_BUILD_URL"] +func (w *worker) jobURL(jobUUID string, buildURL string) (string, error) { u, err := url.Parse(buildURL) if err != nil { - w.logger.Warn( - "could not parse BuildURL when annotating with JobURL", - zap.String("buildURL", buildURL), - ) - return + return "", err } u.Fragment = jobUUID - dstAnnotations[config.JobURLAnnotation] = u.String() + return u.String(), nil } -func kjobName(jobUUID string) string { +func k8sJobName(jobUUID string) string { return fmt.Sprintf("buildkite-%s", jobUUID) } -type agentTag struct { - Name string - Value string -} - -func agentTagsFromJob(agentQueryRules []string) ([]agentTag, error) { - agentTags := make([]agentTag, 0, len(agentQueryRules)) - for _, tag := range agentQueryRules { - k, v, found := strings.Cut(tag, "=") - if !found { - return nil, fmt.Errorf("could not parse tag: %q", tag) - } - agentTags = append(agentTags, agentTag{Name: k, Value: v}) - } - - return agentTags, nil -} - -func createAgentTagString(tags []agentTag) string { - var sb strings.Builder - for i, t := range tags { - sb.WriteString(t.Name) - sb.WriteString("=") - sb.WriteString(t.Value) - if i < len(tags)-1 { - sb.WriteString(",") - } +// Format each agentTag as key=value and join with , +func createAgentTagString(tags map[string]string) string { + ts := make([]string, 0, len(tags)) + for k, v := range tags { + ts = append(ts, k+"="+v) } - return sb.String() + return strings.Join(ts, ",") }