From 1c13c8af400354a51fa365a8a954c89d1f4dd168 Mon Sep 17 00:00:00 2001
From: Sharath MK
Date: Thu, 28 Sep 2023 15:28:29 +0000
Subject: [PATCH] Fix CI

---
 cmd/node-labels-resources/doc.go         |  1 +
 cmd/node-labels-resources/main.go        |  2 +-
 pkg/node-labels-resources/doc.go         |  1 +
 pkg/node-labels-resources/grpc_server.go | 37 ++++++++++++++++--------
 pkg/utils/args/resources.go              |  2 ++
 pkg/utils/clients/doc.go                 |  1 +
 pkg/utils/clients/kubernetes_core.go     |  5 ++--
 7 files changed, 34 insertions(+), 15 deletions(-)

diff --git a/cmd/node-labels-resources/doc.go b/cmd/node-labels-resources/doc.go
index 06ab7d0..9c5552c 100644
--- a/cmd/node-labels-resources/doc.go
+++ b/cmd/node-labels-resources/doc.go
@@ -1 +1,2 @@
+// Package main contains the main function for the node-label-resource-plugin.
 package main
diff --git a/cmd/node-labels-resources/main.go b/cmd/node-labels-resources/main.go
index 9398f06..cca8f6c 100644
--- a/cmd/node-labels-resources/main.go
+++ b/cmd/node-labels-resources/main.go
@@ -32,7 +32,7 @@ func main() {
 		},
 	}
 
-	rootCmd.Flags().Var(&nodeLabels, "node-label", "set a node label having format name=value. e.g.: --node-label=dedicated=liqo --node-label=dedicated=volcano.")
+	rootCmd.Flags().Var(&nodeLabels, "node-label", "set a node label having format name=value. e.g.: --node-label=label1=v1 --node-label=label2=v2.")
 	rootCmd.PersistentFlags().IntVar(&port, "port", 6001, "set port where the server will listen on.")
 
 	rootCmd.Flags().AddGoFlagSet(fs)
diff --git a/pkg/node-labels-resources/doc.go b/pkg/node-labels-resources/doc.go
index 5b1c926..e4a9b5f 100644
--- a/pkg/node-labels-resources/doc.go
+++ b/pkg/node-labels-resources/doc.go
@@ -1 +1,2 @@
+// Package nodeselectorresources contains the logic for the node-label-resource-plugin.
 package nodeselectorresources
diff --git a/pkg/node-labels-resources/grpc_server.go b/pkg/node-labels-resources/grpc_server.go
index ffb3e23..73cd40c 100644
--- a/pkg/node-labels-resources/grpc_server.go
+++ b/pkg/node-labels-resources/grpc_server.go
@@ -30,13 +30,14 @@ import (
 	"github.com/liqotech/liqo/pkg/utils"
 )
 
+// NodeDetails stores details of the Node.
 type NodeDetails struct {
 	Schedulable bool
 	Allocatable corev1.ResourceList
 	Pods map[string]corev1.ResourceList
 }
 
-type NodeLabelsMonitor struct {
+type nodeLabelsMonitor struct {
 	Server *grpc.Server
 	resourcemonitors.ResourceReaderServer
 	subscribers sync.Map
@@ -49,6 +50,7 @@ type NodeLabelsMonitor struct {
 	resourceLists map[string]NodeDetails
 }
 
+// ListenAndServeGRPCServer creates the gRPC server and makes it listen on the given port.
 func ListenAndServeGRPCServer(port int, nodeLabels map[string]string, clientset *kubernetes.Clientset) error {
 	lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
 	resyncPeriod := 10 * time.Hour
@@ -88,7 +90,7 @@ func ListenAndServeGRPCServer(port int, nodeLabels map[string]string, clientset
 		clientset, resyncPeriod, informers.WithTweakListOptions(noShadowPodsFilter),
 	)
 	podInformer := podFactory.Core().V1().Pods().Informer()
-	s := NodeLabelsMonitor{
+	s := nodeLabelsMonitor{
 		Server: grpc.NewServer(),
 		nodeLabels: nodeLabels,
 		allocatable: corev1.ResourceList{},
@@ -121,7 +123,7 @@ func ListenAndServeGRPCServer(port int, nodeLabels map[string]string, clientset
 }
 
 // react to a Node Creation/First informer run.
-func (nlm *NodeLabelsMonitor) onNodeAdd(obj interface{}) {
+func (nlm *nodeLabelsMonitor) onNodeAdd(obj interface{}) {
 	node := obj.(*corev1.Node)
 	toAdd := &node.Status.Allocatable
 	klog.V(4).Infof("Adding Node %s", node.Name)
@@ -136,13 +138,14 @@ func (nlm *NodeLabelsMonitor) onNodeAdd(obj interface{}) {
 		return
 	}
 	for _, pod := range pods.Items {
+		pod := pod
 		nlm.onPodAdd(&pod)
 	}
 	nlm.writeClusterResources()
 }
 
 // react to a Node Update.
-func (nlm *NodeLabelsMonitor) onNodeUpdate(oldObj, newObj interface{}) {
+func (nlm *nodeLabelsMonitor) onNodeUpdate(oldObj, newObj interface{}) {
 	oldNode := oldObj.(*corev1.Node)
 	newNode := newObj.(*corev1.Node)
 	newNodeResources := newNode.Status.Allocatable
@@ -171,7 +174,7 @@
 }
 
 // react to a Node Delete.
-func (nlm *NodeLabelsMonitor) onNodeDelete(obj interface{}) {
+func (nlm *nodeLabelsMonitor) onNodeDelete(obj interface{}) {
 	node := obj.(*corev1.Node)
 	toDelete := &node.Status.Allocatable
 	nodeResourceList, ok := nlm.resourceLists[node.Name]
@@ -185,7 +188,7 @@
 	nlm.writeClusterResources()
 }
 
-func (nlm *NodeLabelsMonitor) onPodAdd(obj interface{}) {
+func (nlm *nodeLabelsMonitor) onPodAdd(obj interface{}) {
 	// Thanks to the filters at the informer level, add events are received only when pods running on physical nodes turn running.
 	podAdded, ok := obj.(*corev1.Pod)
 	if !ok {
@@ -207,7 +210,7 @@
 	nlm.writeClusterResources()
 }
 
-func (nlm *NodeLabelsMonitor) onPodDelete(obj interface{}) {
+func (nlm *nodeLabelsMonitor) onPodDelete(obj interface{}) {
 	// Thanks to the filters at the informer level, delete events are received only when
 	// pods previously running on a physical node are no longer running.
 	podDeleted, ok := obj.(*corev1.Pod)
@@ -226,7 +229,7 @@
 	nlm.writeClusterResources()
 }
 
-func (nlm *NodeLabelsMonitor) writeClusterResources() {
+func (nlm *nodeLabelsMonitor) writeClusterResources() {
 	podResourceUsage := corev1.ResourceList{}
 	nodeAllocatable := corev1.ResourceList{}
 	for _, nodeDetail := range nlm.resourceLists {
@@ -254,7 +257,10 @@
 	return resourcesToExtract
 }
 
-func (nlm *NodeLabelsMonitor) ReadResources(ctx context.Context, req *resourcemonitors.ClusterIdentity) (*resourcemonitors.PoolResourceList, error) {
+// ReadResources receives a clusterID and returns the resources for that specific clusterID. In this version of the resource plugin,
+// the clusterID is ignored and the same resources are returned for every clusterID received. Since this method could be called multiple
+// times, it has to be idempotent.
+func (nlm *nodeLabelsMonitor) ReadResources(ctx context.Context, req *resourcemonitors.ClusterIdentity) (*resourcemonitors.PoolResourceList, error) {
 	klog.V(4).Infof("info: reading resources for cluster %s", req.ClusterID)
 	nlm.nodeMutex.RLock()
 	defer nlm.nodeMutex.RUnlock()
@@ -269,7 +275,8 @@
 	return &resourcemonitors.PoolResourceList{ResourceLists: resourceList}, nil
 }
 
-func (nlm *NodeLabelsMonitor) Subscribe(req *resourcemonitors.Empty, srv resourcemonitors.ResourceReader_SubscribeServer) error {
+// Subscribe is quite standard in this implementation: the only thing it does is notify liqo immediately.
+func (nlm *nodeLabelsMonitor) Subscribe(req *resourcemonitors.Empty, srv resourcemonitors.ResourceReader_SubscribeServer) error {
 	klog.V(1).Infof("info: liqo controller manager subscribed")
 
 	// Store the stream. Using req as key since each request will have a different req object.
@@ -289,7 +296,10 @@
 	return nil
 }
 
-func (nlm *NodeLabelsMonitor) NotifyChange(ctx context.Context, req *resourcemonitors.ClusterIdentity) error {
+// NotifyChange uses the cached streams to notify liqo that some resources have changed. This method receives a clusterID inside req,
+// which can be a real clusterID or resourcemonitors.AllClusterIDs, which tells liqo to refresh the resources
+// of all the peered clusters.
+func (nlm *nodeLabelsMonitor) NotifyChange(ctx context.Context, req *resourcemonitors.ClusterIdentity) error {
 	klog.V(1).Infof("info: sending notification to liqo controller manager for cluster %q", req.ClusterID)
 
 	var err error
@@ -311,7 +321,10 @@
 	return err
 }
 
-func (nlm *NodeLabelsMonitor) RemoveCluster(ctx context.Context, req *resourcemonitors.ClusterIdentity) (*resourcemonitors.Empty, error) {
+// RemoveCluster is useful to clean up a cluster's information, if present, when that cluster is unpeered. This method receives
+// a clusterID which identifies the cluster that has been removed. We believe that this method is useful in custom
+// implementations, for example where a database is involved.
+func (nlm *nodeLabelsMonitor) RemoveCluster(ctx context.Context, req *resourcemonitors.ClusterIdentity) (*resourcemonitors.Empty, error) {
 	klog.V(1).Infof("info: removing cluster %s", req.ClusterID)
 	return &resourcemonitors.Empty{}, nil
 }
diff --git a/pkg/utils/args/resources.go b/pkg/utils/args/resources.go
index ca503c2..373b241 100644
--- a/pkg/utils/args/resources.go
+++ b/pkg/utils/args/resources.go
@@ -79,11 +79,13 @@ func parseQuantity(str string) (string, *resource.Quantity, error) {
 	return res[0], &quantity, nil
 }
 
+// NodeLabelsMap contains the node labels passed via command-line flags.
 type NodeLabelsMap struct {
 	StringValues liqoargs.StringList
 	NodeLabels map[string]string
 }
 
+// Set parses a name=value string and stores the corresponding node label.
 func (n *NodeLabelsMap) Set(str string) error {
 	if n.NodeLabels == nil {
 		n.NodeLabels = make(map[string]string)
diff --git a/pkg/utils/clients/doc.go b/pkg/utils/clients/doc.go
index da13c8e..549f1b6 100644
--- a/pkg/utils/clients/doc.go
+++ b/pkg/utils/clients/doc.go
@@ -1 +1,2 @@
+// Package client contains the Kubernetes-related functions.
 package client
diff --git a/pkg/utils/clients/kubernetes_core.go b/pkg/utils/clients/kubernetes_core.go
index d5a47f9..f631d03 100644
--- a/pkg/utils/clients/kubernetes_core.go
+++ b/pkg/utils/clients/kubernetes_core.go
@@ -11,6 +11,7 @@ import (
 	"k8s.io/client-go/util/homedir"
 )
 
+// CreateKubernetesCore creates a Kubernetes clientset.
 func CreateKubernetesCore() (*kubernetes.Clientset, error) {
 	restConfig, err := rest.InClusterConfig()
 	if err != nil {
@@ -21,13 +22,13 @@ func CreateKubernetesCore() (*kubernetes.Clientset, error) {
 		restConfig, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
 	}
 	if err != nil {
-		return nil, errors.Wrap(err, "Failed to initialize the RestConfig")
+		return nil, errors.Wrap(err, "failed to initialize the RestConfig")
 	}
 	restConfig.QPS = float32(50)
 	restConfig.Burst = 50
 	clientSet, err := kubernetes.NewForConfig(restConfig)
 	if err != nil {
-		return nil, errors.Wrap(err, "Failed to initialize kubernetes client set.")
+		return nil, errors.Wrap(err, "failed to initialize kubernetes client set")
 	}
 	return clientSet, nil
 }