From 9176464a711d28e76d3ab2489dc1d55937ab461a Mon Sep 17 00:00:00 2001 From: Sri Saran Balaji Vellore Rajakumar Date: Tue, 16 Feb 2021 18:15:30 -0800 Subject: [PATCH 1/2] Add ENI support for nodes(for Fargate nodes) --- pkg/providers/v1/aws.go | 100 +++++++++++++++++++++++++++++ pkg/providers/v1/aws_fakes.go | 27 ++++++++ pkg/providers/v1/aws_test.go | 82 +++++++++++++++++++++++ pkg/providers/v1/instances.go | 12 ++-- pkg/providers/v1/instances_test.go | 8 +++ 5 files changed, 222 insertions(+), 7 deletions(-) diff --git a/pkg/providers/v1/aws.go b/pkg/providers/v1/aws.go index b08a7b308d..b8fb43a81e 100644 --- a/pkg/providers/v1/aws.go +++ b/pkg/providers/v1/aws.go @@ -272,6 +272,12 @@ const ( // Number of node names that can be added to a filter. The AWS limit is 200 // but we are using a lower limit on purpose filterNodeLimit = 150 + + // fargateNodeNamePrefix string is added to awsInstance nodeName and providerID of Fargate nodes. + fargateNodeNamePrefix = "fargate-" + + // privateDNSNamePrefix is the prefix added to ENI Private DNS Name. + privateDNSNamePrefix = "ip-" ) const ( @@ -357,6 +363,8 @@ type EC2 interface { ModifyInstanceAttribute(request *ec2.ModifyInstanceAttributeInput) (*ec2.ModifyInstanceAttributeOutput, error) DescribeVpcs(input *ec2.DescribeVpcsInput) (*ec2.DescribeVpcsOutput, error) + + DescribeNetworkInterfaces(input *ec2.DescribeNetworkInterfacesInput) (*ec2.DescribeNetworkInterfacesOutput, error) } // ELB is a simple pass-through of AWS' ELB client interface, which allows for testing @@ -963,6 +971,15 @@ func (s *awsSdkEC2) DescribeInstances(request *ec2.DescribeInstancesInput) ([]*e return results, nil } +// DescribeNetworkInterfaces describes network interface provided in the input. 
+func (s *awsSdkEC2) DescribeNetworkInterfaces(input *ec2.DescribeNetworkInterfacesInput) (*ec2.DescribeNetworkInterfacesOutput, error) { + requestTime := time.Now() + resp, err := s.ec2.DescribeNetworkInterfaces(input) + timeTaken := time.Since(requestTime).Seconds() + recordAWSMetric("describe_network_interfaces", timeTaken, err) + return resp, err +} + // Implements EC2.DescribeSecurityGroups func (s *awsSdkEC2) DescribeSecurityGroups(request *ec2.DescribeSecurityGroupsInput) ([]*ec2.SecurityGroup, error) { // Security groups are paged @@ -1621,6 +1638,16 @@ func extractNodeAddresses(instance *ec2.Instance) ([]v1.NodeAddress, error) { return addresses, nil } +// getNodeAddressesForFargateNode generates list of Node addresses for Fargate node. +func getNodeAddressesForFargateNode(privateDNSName, privateIP string) []v1.NodeAddress { + addresses := []v1.NodeAddress{} + addresses = append(addresses, v1.NodeAddress{Type: v1.NodeInternalIP, Address: privateIP}) + if privateDNSName != "" { + addresses = append(addresses, v1.NodeAddress{Type: v1.NodeInternalDNS, Address: privateDNSName}) + } + return addresses +} + // NodeAddressesByProviderID returns the node addresses of an instances with the specified unique providerID // This method will not be called from the node that is requesting this ID. i.e. 
metadata service // and other local methods cannot be used here @@ -1630,6 +1657,14 @@ func (c *Cloud) NodeAddressesByProviderID(ctx context.Context, providerID string return nil, err } + if isFargateNode(string(instanceID)) { + eni, err := c.describeNetworkInterfaces(string(instanceID)) + if eni == nil || err != nil { + return nil, err + } + return getNodeAddressesForFargateNode(aws.StringValue(eni.PrivateDnsName), aws.StringValue(eni.PrivateIpAddress)), nil + } + instance, err := describeInstance(c.ec2, instanceID) if err != nil { return nil, err @@ -1646,6 +1681,11 @@ func (c *Cloud) InstanceExistsByProviderID(ctx context.Context, providerID strin return false, err } + if isFargateNode(string(instanceID)) { + eni, err := c.describeNetworkInterfaces(string(instanceID)) + return eni != nil, err + } + request := &ec2.DescribeInstancesInput{ InstanceIds: []*string{instanceID.awsString()}, } @@ -1681,6 +1721,11 @@ func (c *Cloud) InstanceShutdownByProviderID(ctx context.Context, providerID str return false, err } + if isFargateNode(string(instanceID)) { + eni, err := c.describeNetworkInterfaces(string(instanceID)) + return eni != nil, err + } + request := &ec2.DescribeInstancesInput{ InstanceIds: []*string{instanceID.awsString()}, } @@ -1737,6 +1782,10 @@ func (c *Cloud) InstanceTypeByProviderID(ctx context.Context, providerID string) return "", err } + if isFargateNode(string(instanceID)) { + return "", nil + } + instance, err := describeInstance(c.ec2, instanceID) if err != nil { return "", err @@ -1841,6 +1890,18 @@ func (c *Cloud) GetZoneByProviderID(ctx context.Context, providerID string) (clo if err != nil { return cloudprovider.Zone{}, err } + + if isFargateNode(string(instanceID)) { + eni, err := c.describeNetworkInterfaces(string(instanceID)) + if eni == nil || err != nil { + return cloudprovider.Zone{}, err + } + return cloudprovider.Zone{ + FailureDomain: aws.StringValue(eni.AvailabilityZone), + Region: c.region, + }, nil + } + instance, err := 
c.getInstanceByID(string(instanceID)) if err != nil { return cloudprovider.Zone{}, err @@ -4877,6 +4938,11 @@ func (c *Cloud) getFullInstance(nodeName types.NodeName) (*awsInstance, *ec2.Ins return awsInstance, instance, err } +// isFargateNode returns true if given node runs on Fargate compute +func isFargateNode(nodeName string) bool { + return strings.HasPrefix(nodeName, fargateNodeNamePrefix) +} + func (c *Cloud) nodeNameToProviderID(nodeName types.NodeName) (InstanceID, error) { if len(nodeName) == 0 { return "", fmt.Errorf("no nodeName provided") @@ -4930,3 +4996,37 @@ func getInitialAttachDetachDelay(status string) time.Duration { } return volumeAttachmentStatusInitialDelay } + +// describeNetworkInterfaces returns network interface information for the given DNS name. +func (c *Cloud) describeNetworkInterfaces(nodeName string) (*ec2.NetworkInterface, error) { + eniEndpoint := strings.TrimPrefix(nodeName, fargateNodeNamePrefix) + + filters := []*ec2.Filter{ + newEc2Filter("attachment.status", "attached"), + newEc2Filter("vpc-id", c.vpcID), + } + + // when enableDnsSupport is set to false in a VPC, interface will not have private DNS names. 
+ if strings.HasPrefix(eniEndpoint, privateDNSNamePrefix) { + filters = append(filters, newEc2Filter("private-dns-name", eniEndpoint)) + } else { + filters = append(filters, newEc2Filter("private-ip-address", eniEndpoint)) + } + + request := &ec2.DescribeNetworkInterfacesInput{ + Filters: filters, + } + + eni, err := c.ec2.DescribeNetworkInterfaces(request) + if err != nil { + return nil, err + } + if len(eni.NetworkInterfaces) == 0 { + return nil, nil + } + if len(eni.NetworkInterfaces) != 1 { + // This should not be possible - ids should be unique + return nil, fmt.Errorf("multiple interfaces found with same id %q", eni.NetworkInterfaces) + } + return eni.NetworkInterfaces[0], nil +} diff --git a/pkg/providers/v1/aws_fakes.go b/pkg/providers/v1/aws_fakes.go index 89d541790f..8465789b06 100644 --- a/pkg/providers/v1/aws_fakes.go +++ b/pkg/providers/v1/aws_fakes.go @@ -701,3 +701,30 @@ func contains(haystack []*string, needle string) bool { } return false } + +// DescribeNetworkInterfaces returns list of ENIs for testing +func (ec2i *FakeEC2Impl) DescribeNetworkInterfaces(input *ec2.DescribeNetworkInterfacesInput) (*ec2.DescribeNetworkInterfacesOutput, error) { + networkInterface := []*ec2.NetworkInterface{ + { + PrivateIpAddress: aws.String("1.2.3.4"), + AvailabilityZone: aws.String("us-west-2c"), + }, + } + for _, filter := range input.Filters { + if strings.HasPrefix(*filter.Values[0], fargateNodeNamePrefix) { + // verify filter doesn't have fargate prefix + panic(fmt.Sprintf("invalid endpoint specified for DescribeNetworkInterface call %s", *filter.Values[0])) + } else if strings.HasPrefix(*filter.Values[0], "not-found") { + // for negative testing + return &ec2.DescribeNetworkInterfacesOutput{}, nil + } + + if *filter.Name == "private-dns-name" { + networkInterface[0].PrivateDnsName = aws.String("ip-1-2-3-4.compute.amazon.com") + } + } + + return &ec2.DescribeNetworkInterfacesOutput{ + NetworkInterfaces: networkInterface, + }, nil +} diff --git 
a/pkg/providers/v1/aws_test.go b/pkg/providers/v1/aws_test.go index a599009f8b..5da45733c4 100644 --- a/pkg/providers/v1/aws_test.go +++ b/pkg/providers/v1/aws_test.go @@ -3065,3 +3065,85 @@ func TestCloud_buildNLBHealthCheckConfiguration(t *testing.T) { }) } } + +func TestNodeAddressesForFargate(t *testing.T) { + awsServices := newMockedFakeAWSServices(TestClusterID) + c, _ := newAWSCloud(CloudConfig{}, awsServices) + + nodeAddresses, _ := c.NodeAddressesByProviderID(context.TODO(), "aws:///us-west-2c/1abc-2def/fargate-ip-192.168.164.88") + verifyNodeAddressesForFargate(t, true, nodeAddresses) +} + +func TestNodeAddressesForFargatePrivateIP(t *testing.T) { + awsServices := newMockedFakeAWSServices(TestClusterID) + c, _ := newAWSCloud(CloudConfig{}, awsServices) + + nodeAddresses, _ := c.NodeAddressesByProviderID(context.TODO(), "aws:///us-west-2c/1abc-2def/fargate-192.168.164.88") + verifyNodeAddressesForFargate(t, false, nodeAddresses) +} + +func verifyNodeAddressesForFargate(t *testing.T, verifyPublicIP bool, nodeAddresses []v1.NodeAddress) { + if verifyPublicIP { + assert.Equal(t, 2, len(nodeAddresses)) + assert.Equal(t, "ip-1-2-3-4.compute.amazon.com", nodeAddresses[1].Address) + assert.Equal(t, v1.NodeInternalDNS, nodeAddresses[1].Type) + } else { + assert.Equal(t, 1, len(nodeAddresses)) + } + assert.Equal(t, "1.2.3.4", nodeAddresses[0].Address) + assert.Equal(t, v1.NodeInternalIP, nodeAddresses[0].Type) +} + +func TestInstanceExistsByProviderIDForFargate(t *testing.T) { + awsServices := newMockedFakeAWSServices(TestClusterID) + c, _ := newAWSCloud(CloudConfig{}, awsServices) + + instanceExist, err := c.InstanceExistsByProviderID(context.TODO(), "aws:///us-west-2c/1abc-2def/fargate-192.168.164.88") + assert.Nil(t, err) + assert.True(t, instanceExist) +} + +func TestInstanceNotExistsByProviderIDForFargate(t *testing.T) { + awsServices := newMockedFakeAWSServices(TestClusterID) + c, _ := newAWSCloud(CloudConfig{}, awsServices) + + instanceExist, err := 
c.InstanceExistsByProviderID(context.TODO(), "aws:///us-west-2c/1abc-2def/fargate-not-found") + assert.Nil(t, err) + assert.False(t, instanceExist) +} + +func TestInstanceShutdownByProviderIDForFargate(t *testing.T) { + awsServices := newMockedFakeAWSServices(TestClusterID) + c, _ := newAWSCloud(CloudConfig{}, awsServices) + + instanceExist, err := c.InstanceShutdownByProviderID(context.TODO(), "aws:///us-west-2c/1abc-2def/fargate-192.168.164.88") + assert.Nil(t, err) + assert.True(t, instanceExist) +} + +func TestInstanceShutdownNotExistsByProviderIDForFargate(t *testing.T) { + awsServices := newMockedFakeAWSServices(TestClusterID) + c, _ := newAWSCloud(CloudConfig{}, awsServices) + + instanceExist, err := c.InstanceShutdownByProviderID(context.TODO(), "aws:///us-west-2c/1abc-2def/fargate-not-found") + assert.Nil(t, err) + assert.False(t, instanceExist) +} + +func TestInstanceTypeByProviderIDForFargate(t *testing.T) { + awsServices := newMockedFakeAWSServices(TestClusterID) + c, _ := newAWSCloud(CloudConfig{}, awsServices) + + instanceType, err := c.InstanceTypeByProviderID(context.TODO(), "aws:///us-west-2c/1abc-2def/fargate-not-found") + assert.Nil(t, err) + assert.Equal(t, "", instanceType) +} + +func TestGetZoneByProviderIDForFargate(t *testing.T) { + awsServices := newMockedFakeAWSServices(TestClusterID) + c, _ := newAWSCloud(CloudConfig{}, awsServices) + + zoneDetails, err := c.GetZoneByProviderID(context.TODO(), "aws:///us-west-2c/1abc-2def/fargate-192.168.164.88") + assert.Nil(t, err) + assert.Equal(t, "us-west-2c", zoneDetails.FailureDomain) +} diff --git a/pkg/providers/v1/instances.go b/pkg/providers/v1/instances.go index 7768deb3bb..4a055ff5e8 100644 --- a/pkg/providers/v1/instances.go +++ b/pkg/providers/v1/instances.go @@ -52,6 +52,7 @@ func (i InstanceID) awsString() *string { // the following form // * aws://// // * aws://// +// * aws:////fargate- // * type KubernetesInstanceID string @@ -74,17 +75,14 @@ func (name KubernetesInstanceID) 
MapToAWSInstanceID() (InstanceID, error) { awsID := "" tokens := strings.Split(strings.Trim(url.Path, "/"), "/") - if len(tokens) == 1 { - // instanceId - awsID = tokens[0] - } else if len(tokens) == 2 { - // az/instanceId - awsID = tokens[1] + // last token in the providerID is the aws resource ID for both EC2 and Fargate nodes + if len(tokens) > 0 { + awsID = tokens[len(tokens)-1] } // We sanity check the resulting volume; the two known formats are // i-12345678 and i-12345678abcdef01 - if awsID == "" || !awsInstanceRegMatch.MatchString(awsID) { + if awsID == "" || !(awsInstanceRegMatch.MatchString(awsID) || isFargateNode(awsID)) { return "", fmt.Errorf("Invalid format for AWS instance (%s)", name) } diff --git a/pkg/providers/v1/instances_test.go b/pkg/providers/v1/instances_test.go index fddac2bbe5..280b9c8658 100644 --- a/pkg/providers/v1/instances_test.go +++ b/pkg/providers/v1/instances_test.go @@ -80,6 +80,14 @@ func TestMapToAWSInstanceIDs(t *testing.T) { Kubernetes: "", ExpectError: true, }, + { + Kubernetes: "aws:///us-west-2c/1abc-2def/fargate-ip-192-168-164-88.internal", + Aws: "fargate-ip-192-168-164-88.internal", + }, + { + Kubernetes: "aws:///us-west-2c/1abc-2def/fargate-192.168.164.88", + Aws: "fargate-192.168.164.88", + }, } for _, test := range tests { From c80abd5853ae1f5ae5a312842ea01de48167b683 Mon Sep 17 00:00:00 2001 From: Saurav Agarwalla Date: Sat, 21 May 2022 04:30:54 +0000 Subject: [PATCH 2/2] Fix issues in tagging controller * Don't tag Fargate nodes * Label nodes to prevent retagging them again * Increase bucket size for latency metrics --- pkg/controllers/tagging/metrics.go | 1 + pkg/controllers/tagging/tagging_controller.go | 95 ++++++++++++++++--- .../tagging/tagging_controller_test.go | 52 +++++++++- pkg/providers/v1/aws.go | 14 +-- pkg/providers/v1/instances.go | 2 +- 5 files changed, 141 insertions(+), 23 deletions(-) diff --git a/pkg/controllers/tagging/metrics.go b/pkg/controllers/tagging/metrics.go index 5a263086f2..6ac3a9db2f 
100644 --- a/pkg/controllers/tagging/metrics.go +++ b/pkg/controllers/tagging/metrics.go @@ -27,6 +27,7 @@ var ( Name: "cloudprovider_aws_tagging_controller_work_item_duration_seconds", Help: "workitem latency of workitem being in the queue and time it takes to process", StabilityLevel: metrics.ALPHA, + Buckets: metrics.ExponentialBuckets(0.5, 1.5, 20), }, []string{"latency_type"}) diff --git a/pkg/controllers/tagging/tagging_controller.go b/pkg/controllers/tagging/tagging_controller.go index 9e56cb6337..d20a9defbb 100644 --- a/pkg/controllers/tagging/tagging_controller.go +++ b/pkg/controllers/tagging/tagging_controller.go @@ -14,6 +14,7 @@ limitations under the License. package tagging import ( + "crypto/md5" "fmt" v1 "k8s.io/api/core/v1" utilruntime "k8s.io/apimachinery/pkg/util/runtime" @@ -25,7 +26,10 @@ import ( cloudprovider "k8s.io/cloud-provider" opt "k8s.io/cloud-provider-aws/pkg/controllers/options" awsv1 "k8s.io/cloud-provider-aws/pkg/providers/v1" + nodehelpers "k8s.io/cloud-provider/node/helpers" "k8s.io/klog/v2" + "sort" + "strings" "time" ) @@ -37,7 +41,13 @@ type workItem struct { enqueueTime time.Time } +func (w workItem) String() string { + return fmt.Sprintf("[Node: %s, RequeuingCount: %d, EnqueueTime: %s]", w.node.GetName(), w.requeuingCount, w.enqueueTime) +} + const ( + taggingControllerLabelKey = "k8s.io/cloud-provider-aws" + maxRequeuingCount = 9 // The label for depicting total number of errors a work item encounter and succeed @@ -105,9 +115,27 @@ func NewTaggingController( // Use shared informer to listen to add/update/delete of nodes. 
Note that any nodes // that exist before tagging controller starts will show up in the update method tc.nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { tc.enqueueNode(obj, tc.tagNodesResources) }, - UpdateFunc: func(oldObj, newObj interface{}) { tc.enqueueNode(newObj, tc.tagNodesResources) }, - DeleteFunc: func(obj interface{}) { tc.enqueueNode(obj, tc.untagNodeResources) }, + AddFunc: func(obj interface{}) { + node := obj.(*v1.Node) + tc.enqueueNode(node, tc.tagNodesResources) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + node := newObj.(*v1.Node) + // Check if tagging is required by inspecting the labels. This check here prevents us from putting a tagged node into the + // work queue. We check this again before tagging the node to make sure that between when a node was put in the work queue + // and when it gets tagged, there might be another event which put the same item in the work queue + // (since the node won't have the labels yet) and hence prevents us from making an unnecessary EC2 call. 
+ if !tc.isTaggingRequired(node) { + klog.Infof("Skip putting node %s in work queue since it was already tagged earlier.", node.GetName()) + return + } + + tc.enqueueNode(node, tc.tagNodesResources) + }, + DeleteFunc: func(obj interface{}) { + node := obj.(*v1.Node) + tc.enqueueNode(node, tc.untagNodeResources) + }, }) return tc, nil @@ -147,7 +175,7 @@ func (tc *Controller) process() bool { return false } - klog.Infof("Starting to process %v", obj) + klog.Infof("Starting to process %s", obj) err := func(obj interface{}) error { defer tc.workqueue.Done(obj) @@ -155,13 +183,14 @@ func (tc *Controller) process() bool { workItem, ok := obj.(*workItem) if !ok { tc.workqueue.Forget(obj) - err := fmt.Errorf("expected workItem in workqueue but got %#v", obj) + err := fmt.Errorf("expected workItem in workqueue but got %v", obj) utilruntime.HandleError(err) return nil } timeTaken := time.Since(workItem.enqueueTime).Seconds() recordWorkItemLatencyMetrics(workItemDequeuingTimeWorkItemMetric, timeTaken) + klog.Infof("Dequeuing latency %f", timeTaken) instanceID, err := awsv1.KubernetesInstanceID(workItem.node.Spec.ProviderID).MapToAWSInstanceID() if err != nil { @@ -169,6 +198,13 @@ func (tc *Controller) process() bool { utilruntime.HandleError(err) return nil } + klog.Infof("Instance ID of work item %s is %s", workItem, instanceID) + + if awsv1.IsFargateNode(string(instanceID)) { + klog.Infof("Skip processing the node %s since it is a Fargate node", instanceID) + tc.workqueue.Forget(obj) + return nil + } err = workItem.action(workItem.node) @@ -182,12 +218,13 @@ func (tc *Controller) process() bool { return fmt.Errorf("error processing work item '%v': %s, requeuing count %d", workItem, err.Error(), workItem.requeuingCount) } - klog.Errorf("error processing work item '%v': %s, requeuing count exceeded", workItem, err.Error()) + klog.Errorf("error processing work item %s: %s, requeuing count exceeded", workItem, err.Error()) 
recordWorkItemErrorMetrics(errorsAfterRetriesExhaustedWorkItemErrorMetric, string(instanceID)) } else { - klog.Infof("Finished processing %v", workItem) + klog.Infof("Finished processing %s", workItem) timeTaken = time.Since(workItem.enqueueTime).Seconds() recordWorkItemLatencyMetrics(workItemProcessingTimeWorkItemMetric, timeTaken) + klog.Infof("Processing latency %f", timeTaken) } tc.workqueue.Forget(obj) @@ -195,7 +232,7 @@ func (tc *Controller) process() bool { }(obj) if err != nil { - klog.Errorf("Error occurred while processing %v", obj) + klog.Errorf("Error occurred while processing %s", obj) utilruntime.HandleError(err) } @@ -221,16 +258,28 @@ func (tc *Controller) tagNodesResources(node *v1.Node) error { // tagEc2Instances applies the provided tags to each EC2 instance in // the cluster. func (tc *Controller) tagEc2Instance(node *v1.Node) error { + if !tc.isTaggingRequired(node) { + klog.Infof("Skip tagging node %s since it was already tagged earlier.", node.GetName()) + return nil + } + instanceID, _ := awsv1.KubernetesInstanceID(node.Spec.ProviderID).MapToAWSInstanceID() err := tc.cloud.TagResource(string(instanceID), tc.tags) if err != nil { - klog.Errorf("Error in tagging EC2 instance for node %s, error: %v", node.GetName(), err) + klog.Errorf("Error in tagging EC2 instance %s for node %s, error: %v", instanceID, node.GetName(), err) return err } - klog.Infof("Successfully tagged %s with %v", instanceID, tc.tags) + labels := map[string]string{taggingControllerLabelKey: tc.getChecksumOfTags()} + klog.Infof("Successfully tagged %s with %v. 
Labeling the nodes with tagging controller labels now.", instanceID, tc.tags) + if !nodehelpers.AddOrUpdateLabelsOnNode(tc.kubeClient, labels, node) { + klog.Errorf("Couldn't apply labels %s to node %s.", labels, node.GetName()) + return fmt.Errorf("couldn't apply labels %s to node %s", labels, node.GetName()) + } + + klog.Infof("Successfully labeled node %s with %v.", node.GetName(), labels) return nil } @@ -259,7 +308,7 @@ func (tc *Controller) untagEc2Instance(node *v1.Node) error { err := tc.cloud.UntagResource(string(instanceID), tc.tags) if err != nil { - klog.Errorf("Error in untagging EC2 instance for node %s, error: %v", node.GetName(), err) + klog.Errorf("Error in untagging EC2 instance %s for node %s, error: %v", instanceID, node.GetName(), err) return err } @@ -270,8 +319,7 @@ func (tc *Controller) untagEc2Instance(node *v1.Node) error { // enqueueNode takes in the object and an // action for the object for a workitem and enqueue to the workqueue -func (tc *Controller) enqueueNode(obj interface{}, action func(node *v1.Node) error) { - node := obj.(*v1.Node) +func (tc *Controller) enqueueNode(node *v1.Node, action func(node *v1.Node) error) { item := &workItem{ node: node, action: action, @@ -281,3 +329,24 @@ func (tc *Controller) enqueueNode(obj interface{}, action func(node *v1.Node) er tc.workqueue.Add(item) klog.Infof("Added %s to the workqueue", item) } + +func (tc *Controller) isTaggingRequired(node *v1.Node) bool { + if node.Labels == nil { + return true + } + + if labelValue, ok := node.Labels[taggingControllerLabelKey]; !ok || labelValue != tc.getChecksumOfTags() { + return true + } + + return false +} + +func (tc *Controller) getChecksumOfTags() string { + tags := []string{} + for key, value := range tc.tags { + tags = append(tags, key+"="+value) + } + sort.Strings(tags) + return fmt.Sprintf("%x", md5.Sum([]byte(strings.Join(tags, ",")))) +} diff --git a/pkg/controllers/tagging/tagging_controller_test.go 
b/pkg/controllers/tagging/tagging_controller_test.go index abee028e46..29ff076635 100644 --- a/pkg/controllers/tagging/tagging_controller_test.go +++ b/pkg/controllers/tagging/tagging_controller_test.go @@ -69,6 +69,54 @@ func Test_NodesJoiningAndLeaving(t *testing.T) { toBeTagged: true, expectedMessages: []string{"Successfully tagged i-0001"}, }, + { + name: "node0 joins the cluster and was tagged earlier with different tags.", + currNode: &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node0", + CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + Labels: map[string]string{ + taggingControllerLabelKey: "9767c4972ba72e87ab553bad2afde741", // MD5 for key1=value1 + }, + }, + Spec: v1.NodeSpec{ + ProviderID: "i-0001", + }, + }, + toBeTagged: true, + expectedMessages: []string{"Successfully tagged i-0001"}, + }, + { + name: "node0 joins the cluster but isn't tagged because it was already tagged earlier.", + currNode: &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node0", + CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + Labels: map[string]string{ + taggingControllerLabelKey: "c812faa65d1d5e5aefa6b069b3da39df", // MD5 for key1=value1,key2=value2 + }, + }, + Spec: v1.NodeSpec{ + ProviderID: "i-0001", + }, + }, + toBeTagged: true, + expectedMessages: []string{"Skip tagging node node0 since it was already tagged earlier."}, + }, + { + name: "fargate node joins the cluster.", + currNode: &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "fargatenode0", + CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + }, + Spec: v1.NodeSpec{ + ProviderID: "aws:///us-west-2a/2ea696a557-9e55466d21eb4f83a99a9aa396bbd134/fargate-ip-10-0-55-27.us-west-2.compute.internal", + }, + }, + toBeTagged: true, + expectedMessages: []string{"Skip processing the node fargate-ip-10-0-55-27.us-west-2.compute.internal since it is a Fargate node"}, + }, { name: "node0 leaves the cluster, failed to untag.", currNode: &v1.Node{ @@ -81,7 +129,7 @@ 
func Test_NodesJoiningAndLeaving(t *testing.T) { }, }, toBeTagged: false, - expectedMessages: []string{"Error in untagging EC2 instance for node node0"}, + expectedMessages: []string{"Error in untagging EC2 instance i-error for node node0"}, }, { name: "node0 leaves the cluster.", @@ -124,7 +172,7 @@ func Test_NodesJoiningAndLeaving(t *testing.T) { kubeClient: clientset, cloud: fakeAws, nodeMonitorPeriod: 1 * time.Second, - tags: map[string]string{"key": "value"}, + tags: map[string]string{"key2": "value2", "key1": "value1"}, resources: []string{"instance"}, workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Tagging"), } diff --git a/pkg/providers/v1/aws.go b/pkg/providers/v1/aws.go index b8fb43a81e..7c57aa4f9a 100644 --- a/pkg/providers/v1/aws.go +++ b/pkg/providers/v1/aws.go @@ -1657,7 +1657,7 @@ func (c *Cloud) NodeAddressesByProviderID(ctx context.Context, providerID string return nil, err } - if isFargateNode(string(instanceID)) { + if IsFargateNode(string(instanceID)) { eni, err := c.describeNetworkInterfaces(string(instanceID)) if eni == nil || err != nil { return nil, err @@ -1681,7 +1681,7 @@ func (c *Cloud) InstanceExistsByProviderID(ctx context.Context, providerID strin return false, err } - if isFargateNode(string(instanceID)) { + if IsFargateNode(string(instanceID)) { eni, err := c.describeNetworkInterfaces(string(instanceID)) return eni != nil, err } @@ -1721,7 +1721,7 @@ func (c *Cloud) InstanceShutdownByProviderID(ctx context.Context, providerID str return false, err } - if isFargateNode(string(instanceID)) { + if IsFargateNode(string(instanceID)) { eni, err := c.describeNetworkInterfaces(string(instanceID)) return eni != nil, err } @@ -1782,7 +1782,7 @@ func (c *Cloud) InstanceTypeByProviderID(ctx context.Context, providerID string) return "", err } - if isFargateNode(string(instanceID)) { + if IsFargateNode(string(instanceID)) { return "", nil } @@ -1891,7 +1891,7 @@ func (c *Cloud) GetZoneByProviderID(ctx 
context.Context, providerID string) (clo return cloudprovider.Zone{}, err } - if isFargateNode(string(instanceID)) { + if IsFargateNode(string(instanceID)) { eni, err := c.describeNetworkInterfaces(string(instanceID)) if eni == nil || err != nil { return cloudprovider.Zone{}, err @@ -4938,8 +4938,8 @@ func (c *Cloud) getFullInstance(nodeName types.NodeName) (*awsInstance, *ec2.Ins return awsInstance, instance, err } -// isFargateNode returns true if given node runs on Fargate compute -func isFargateNode(nodeName string) bool { +// IsFargateNode returns true if given node runs on Fargate compute +func IsFargateNode(nodeName string) bool { return strings.HasPrefix(nodeName, fargateNodeNamePrefix) } diff --git a/pkg/providers/v1/instances.go b/pkg/providers/v1/instances.go index 4a055ff5e8..dcee69353b 100644 --- a/pkg/providers/v1/instances.go +++ b/pkg/providers/v1/instances.go @@ -82,7 +82,7 @@ func (name KubernetesInstanceID) MapToAWSInstanceID() (InstanceID, error) { // We sanity check the resulting volume; the two known formats are // i-12345678 and i-12345678abcdef01 - if awsID == "" || !(awsInstanceRegMatch.MatchString(awsID) || isFargateNode(awsID)) { + if awsID == "" || !(awsInstanceRegMatch.MatchString(awsID) || IsFargateNode(awsID)) { return "", fmt.Errorf("Invalid format for AWS instance (%s)", name) }