Skip to content

Commit

Permalink
Fix issues in tagging controller * Don't tag Fargate nodes * Label no…
Browse files Browse the repository at this point in the history
…des to prevent retagging them again * Increase bucket size for latency metrics
  • Loading branch information
saurav-agarwalla committed May 25, 2022
1 parent 9176464 commit c80abd5
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 23 deletions.
1 change: 1 addition & 0 deletions pkg/controllers/tagging/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ var (
Name: "cloudprovider_aws_tagging_controller_work_item_duration_seconds",
Help: "workitem latency of workitem being in the queue and time it takes to process",
StabilityLevel: metrics.ALPHA,
Buckets: metrics.ExponentialBuckets(0.5, 1.5, 20),
},
[]string{"latency_type"})

Expand Down
95 changes: 82 additions & 13 deletions pkg/controllers/tagging/tagging_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ limitations under the License.
package tagging

import (
"crypto/md5"
"fmt"
v1 "k8s.io/api/core/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
Expand All @@ -25,7 +26,10 @@ import (
cloudprovider "k8s.io/cloud-provider"
opt "k8s.io/cloud-provider-aws/pkg/controllers/options"
awsv1 "k8s.io/cloud-provider-aws/pkg/providers/v1"
nodehelpers "k8s.io/cloud-provider/node/helpers"
"k8s.io/klog/v2"
"sort"
"strings"
"time"
)

Expand All @@ -37,7 +41,13 @@ type workItem struct {
enqueueTime time.Time
}

// String implements fmt.Stringer so that a work item can be logged with %s.
// The rendering contains the node name, the requeuing count and the time the
// item was enqueued.
func (w workItem) String() string {
	const layout = "[Node: %s, RequeuingCount: %d, EnqueueTime: %s]"

	nodeName := w.node.GetName()
	return fmt.Sprintf(layout, nodeName, w.requeuingCount, w.enqueueTime)
}

const (
taggingControllerLabelKey = "k8s.io/cloud-provider-aws"

maxRequeuingCount = 9

// The label for depicting total number of errors a work item encounter and succeed
Expand Down Expand Up @@ -105,9 +115,27 @@ func NewTaggingController(
// Use shared informer to listen to add/update/delete of nodes. Note that any nodes
// that exist before tagging controller starts will show up in the update method
tc.nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { tc.enqueueNode(obj, tc.tagNodesResources) },
UpdateFunc: func(oldObj, newObj interface{}) { tc.enqueueNode(newObj, tc.tagNodesResources) },
DeleteFunc: func(obj interface{}) { tc.enqueueNode(obj, tc.untagNodeResources) },
AddFunc: func(obj interface{}) {
node := obj.(*v1.Node)
tc.enqueueNode(node, tc.tagNodesResources)
},
UpdateFunc: func(oldObj, newObj interface{}) {
node := newObj.(*v1.Node)
// Check whether tagging is required by inspecting the labels. This check keeps an already-tagged node out of the
// work queue. The same check runs again right before the node is tagged because, between the time a node is put in
// the work queue and the time it gets tagged, another event may enqueue the same node (it won't have the labels yet);
// re-checking at that point prevents us from making an unnecessary EC2 call.
if !tc.isTaggingRequired(node) {
klog.Infof("Skip putting node %s in work queue since it was already tagged earlier.", node.GetName())
return
}

tc.enqueueNode(node, tc.tagNodesResources)
},
DeleteFunc: func(obj interface{}) {
node := obj.(*v1.Node)
tc.enqueueNode(node, tc.untagNodeResources)
},
})

return tc, nil
Expand Down Expand Up @@ -147,28 +175,36 @@ func (tc *Controller) process() bool {
return false
}

klog.Infof("Starting to process %v", obj)
klog.Infof("Starting to process %s", obj)

err := func(obj interface{}) error {
defer tc.workqueue.Done(obj)

workItem, ok := obj.(*workItem)
if !ok {
tc.workqueue.Forget(obj)
err := fmt.Errorf("expected workItem in workqueue but got %#v", obj)
err := fmt.Errorf("expected workItem in workqueue but got %s", obj)
utilruntime.HandleError(err)
return nil
}

timeTaken := time.Since(workItem.enqueueTime).Seconds()
recordWorkItemLatencyMetrics(workItemDequeuingTimeWorkItemMetric, timeTaken)
klog.Infof("Dequeuing latency %s", timeTaken)

instanceID, err := awsv1.KubernetesInstanceID(workItem.node.Spec.ProviderID).MapToAWSInstanceID()
if err != nil {
err = fmt.Errorf("Error in getting instanceID for node %s, error: %v", workItem.node.GetName(), err)
utilruntime.HandleError(err)
return nil
}
klog.Infof("Instance ID of work item %s is %s", workItem, instanceID)

if awsv1.IsFargateNode(string(instanceID)) {
klog.Infof("Skip processing the node %s since it is a Fargate node", instanceID)
tc.workqueue.Forget(obj)
return nil
}

err = workItem.action(workItem.node)

Expand All @@ -182,20 +218,21 @@ func (tc *Controller) process() bool {
return fmt.Errorf("error processing work item '%v': %s, requeuing count %d", workItem, err.Error(), workItem.requeuingCount)
}

klog.Errorf("error processing work item '%v': %s, requeuing count exceeded", workItem, err.Error())
klog.Errorf("error processing work item %s: %s, requeuing count exceeded", workItem, err.Error())
recordWorkItemErrorMetrics(errorsAfterRetriesExhaustedWorkItemErrorMetric, string(instanceID))
} else {
klog.Infof("Finished processing %v", workItem)
klog.Infof("Finished processing %s", workItem)
timeTaken = time.Since(workItem.enqueueTime).Seconds()
recordWorkItemLatencyMetrics(workItemProcessingTimeWorkItemMetric, timeTaken)
klog.Infof("Processing latency %s", timeTaken)
}

tc.workqueue.Forget(obj)
return nil
}(obj)

if err != nil {
klog.Errorf("Error occurred while processing %v", obj)
klog.Errorf("Error occurred while processing %s", obj)
utilruntime.HandleError(err)
}

Expand All @@ -221,16 +258,28 @@ func (tc *Controller) tagNodesResources(node *v1.Node) error {
// tagEc2Instance applies the controller's configured tags to the EC2 instance
// backing the given node and then labels the node with a checksum of those
// tags, which isTaggingRequired compares against to decide whether a node
// still needs tagging.
//
// It returns nil without calling EC2 when isTaggingRequired reports that the
// node is already labeled with the current checksum. It returns an error when
// the node's provider ID cannot be mapped to an AWS instance ID, when the
// tagging call fails, or when the label cannot be applied to the node.
func (tc *Controller) tagEc2Instance(node *v1.Node) error {
	if !tc.isTaggingRequired(node) {
		klog.Infof("Skip tagging node %s since it was already tagged earlier.", node.GetName())
		return nil
	}

	// Do not discard the mapping error: an unparsable provider ID yields an
	// empty instance ID, which must not be passed on to the tagging call.
	instanceID, err := awsv1.KubernetesInstanceID(node.Spec.ProviderID).MapToAWSInstanceID()
	if err != nil {
		return fmt.Errorf("error in getting instanceID for node %s, error: %v", node.GetName(), err)
	}

	if err = tc.cloud.TagResource(string(instanceID), tc.tags); err != nil {
		klog.Errorf("Error in tagging EC2 instance %s for node %s, error: %v", instanceID, node.GetName(), err)
		return err
	}

	// Record the checksum of the applied tags on the node so that a change in
	// the configured tags is detected as "tagging required" again.
	labels := map[string]string{taggingControllerLabelKey: tc.getChecksumOfTags()}
	klog.Infof("Successfully tagged %s with %v. Labeling the nodes with tagging controller labels now.", instanceID, tc.tags)
	if !nodehelpers.AddOrUpdateLabelsOnNode(tc.kubeClient, labels, node) {
		klog.Errorf("Couldn't apply labels %s to node %s.", labels, node.GetName())
		return fmt.Errorf("couldn't apply labels %s to node %s", labels, node.GetName())
	}

	klog.Infof("Successfully labeled node %s with %v.", node.GetName(), labels)

	return nil
}
Expand Down Expand Up @@ -259,7 +308,7 @@ func (tc *Controller) untagEc2Instance(node *v1.Node) error {
err := tc.cloud.UntagResource(string(instanceID), tc.tags)

if err != nil {
klog.Errorf("Error in untagging EC2 instance for node %s, error: %v", node.GetName(), err)
klog.Errorf("Error in untagging EC2 instance %s for node %s, error: %v", instanceID, node.GetName(), err)
return err
}

Expand All @@ -270,8 +319,7 @@ func (tc *Controller) untagEc2Instance(node *v1.Node) error {

// enqueueNode takes in the object and an
// action for the object for a workitem and enqueue to the workqueue
func (tc *Controller) enqueueNode(obj interface{}, action func(node *v1.Node) error) {
node := obj.(*v1.Node)
func (tc *Controller) enqueueNode(node *v1.Node, action func(node *v1.Node) error) {
item := &workItem{
node: node,
action: action,
Expand All @@ -281,3 +329,24 @@ func (tc *Controller) enqueueNode(obj interface{}, action func(node *v1.Node) er
tc.workqueue.Add(item)
klog.Infof("Added %s to the workqueue", item)
}

// isTaggingRequired reports whether the node still needs to be tagged. It
// returns true when the node has no tagging controller label, or when the
// label value differs from the checksum of the currently configured tags.
func (tc *Controller) isTaggingRequired(node *v1.Node) bool {
	// Indexing a nil map is safe in Go and reports the key as absent, so a
	// node without any labels takes the "not found" path as well.
	recorded, found := node.Labels[taggingControllerLabelKey]
	if !found {
		return true
	}

	return recorded != tc.getChecksumOfTags()
}

// getChecksumOfTags returns the hex-encoded MD5 digest of the controller's
// tags. The tags are rendered as "key=value" pairs, sorted, and joined with
// commas before hashing, so the result does not depend on map iteration order.
func (tc *Controller) getChecksumOfTags() string {
	pairs := make([]string, 0, len(tc.tags))
	for name, val := range tc.tags {
		pairs = append(pairs, name+"="+val)
	}

	// Sort so that the same tag set always serializes identically.
	sort.Strings(pairs)

	serialized := strings.Join(pairs, ",")
	digest := md5.Sum([]byte(serialized))
	return fmt.Sprintf("%x", digest)
}
52 changes: 50 additions & 2 deletions pkg/controllers/tagging/tagging_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,54 @@ func Test_NodesJoiningAndLeaving(t *testing.T) {
toBeTagged: true,
expectedMessages: []string{"Successfully tagged i-0001"},
},
{
name: "node0 joins the cluster and was tagged earlier with different tags.",
currNode: &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node0",
CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
Labels: map[string]string{
taggingControllerLabelKey: "9767c4972ba72e87ab553bad2afde741", // MD5 for key1=value1
},
},
Spec: v1.NodeSpec{
ProviderID: "i-0001",
},
},
toBeTagged: true,
expectedMessages: []string{"Successfully tagged i-0001"},
},
{
name: "node0 joins the cluster but isn't tagged because it was already tagged earlier.",
currNode: &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node0",
CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
Labels: map[string]string{
taggingControllerLabelKey: "c812faa65d1d5e5aefa6b069b3da39df", // MD5 for key1=value1,key2=value2
},
},
Spec: v1.NodeSpec{
ProviderID: "i-0001",
},
},
toBeTagged: true,
expectedMessages: []string{"Skip tagging node node0 since it was already tagged earlier."},
},
{
name: "fargate node joins the cluster.",
currNode: &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "fargatenode0",
CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
Spec: v1.NodeSpec{
ProviderID: "aws:///us-west-2a/2ea696a557-9e55466d21eb4f83a99a9aa396bbd134/fargate-ip-10-0-55-27.us-west-2.compute.internal",
},
},
toBeTagged: true,
expectedMessages: []string{"Skip processing the node fargate-ip-10-0-55-27.us-west-2.compute.internal since it is a Fargate node"},
},
{
name: "node0 leaves the cluster, failed to untag.",
currNode: &v1.Node{
Expand All @@ -81,7 +129,7 @@ func Test_NodesJoiningAndLeaving(t *testing.T) {
},
},
toBeTagged: false,
expectedMessages: []string{"Error in untagging EC2 instance for node node0"},
expectedMessages: []string{"Error in untagging EC2 instance i-error for node node0"},
},
{
name: "node0 leaves the cluster.",
Expand Down Expand Up @@ -124,7 +172,7 @@ func Test_NodesJoiningAndLeaving(t *testing.T) {
kubeClient: clientset,
cloud: fakeAws,
nodeMonitorPeriod: 1 * time.Second,
tags: map[string]string{"key": "value"},
tags: map[string]string{"key2": "value2", "key1": "value1"},
resources: []string{"instance"},
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Tagging"),
}
Expand Down
14 changes: 7 additions & 7 deletions pkg/providers/v1/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -1657,7 +1657,7 @@ func (c *Cloud) NodeAddressesByProviderID(ctx context.Context, providerID string
return nil, err
}

if isFargateNode(string(instanceID)) {
if IsFargateNode(string(instanceID)) {
eni, err := c.describeNetworkInterfaces(string(instanceID))
if eni == nil || err != nil {
return nil, err
Expand All @@ -1681,7 +1681,7 @@ func (c *Cloud) InstanceExistsByProviderID(ctx context.Context, providerID strin
return false, err
}

if isFargateNode(string(instanceID)) {
if IsFargateNode(string(instanceID)) {
eni, err := c.describeNetworkInterfaces(string(instanceID))
return eni != nil, err
}
Expand Down Expand Up @@ -1721,7 +1721,7 @@ func (c *Cloud) InstanceShutdownByProviderID(ctx context.Context, providerID str
return false, err
}

if isFargateNode(string(instanceID)) {
if IsFargateNode(string(instanceID)) {
eni, err := c.describeNetworkInterfaces(string(instanceID))
return eni != nil, err
}
Expand Down Expand Up @@ -1782,7 +1782,7 @@ func (c *Cloud) InstanceTypeByProviderID(ctx context.Context, providerID string)
return "", err
}

if isFargateNode(string(instanceID)) {
if IsFargateNode(string(instanceID)) {
return "", nil
}

Expand Down Expand Up @@ -1891,7 +1891,7 @@ func (c *Cloud) GetZoneByProviderID(ctx context.Context, providerID string) (clo
return cloudprovider.Zone{}, err
}

if isFargateNode(string(instanceID)) {
if IsFargateNode(string(instanceID)) {
eni, err := c.describeNetworkInterfaces(string(instanceID))
if eni == nil || err != nil {
return cloudprovider.Zone{}, err
Expand Down Expand Up @@ -4938,8 +4938,8 @@ func (c *Cloud) getFullInstance(nodeName types.NodeName) (*awsInstance, *ec2.Ins
return awsInstance, instance, err
}

// isFargateNode returns true if given node runs on Fargate compute
func isFargateNode(nodeName string) bool {
// IsFargateNode returns true if given node runs on Fargate compute
func IsFargateNode(nodeName string) bool {
return strings.HasPrefix(nodeName, fargateNodeNamePrefix)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/providers/v1/instances.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (name KubernetesInstanceID) MapToAWSInstanceID() (InstanceID, error) {

// We sanity check the resulting volume; the two known formats are
// i-12345678 and i-12345678abcdef01
if awsID == "" || !(awsInstanceRegMatch.MatchString(awsID) || isFargateNode(awsID)) {
if awsID == "" || !(awsInstanceRegMatch.MatchString(awsID) || IsFargateNode(awsID)) {
return "", fmt.Errorf("Invalid format for AWS instance (%s)", name)
}

Expand Down

0 comments on commit c80abd5

Please sign in to comment.