This repository has been archived by the owner on Jul 22, 2024. It is now read-only.

Commit

Fix CI
Sharathmk99 committed Sep 28, 2023
1 parent 3c197f3 commit 1c13c8a
Showing 7 changed files with 34 additions and 15 deletions.
1 change: 1 addition & 0 deletions cmd/node-labels-resources/doc.go
@@ -1 +1,2 @@
// Package main contains the main function for the node-label-resource-plugin.
package main
2 changes: 1 addition & 1 deletion cmd/node-labels-resources/main.go
@@ -32,7 +32,7 @@ func main() {
},
}

rootCmd.Flags().Var(&nodeLabels, "node-label", "set a node label having format name=value. e.g.: --node-label=dedicated=liqo --node-label=dedicated=volcano.")
rootCmd.Flags().Var(&nodeLabels, "node-label", "set a node label having format name=value. e.g.: --node-label=label1=v1 --node-label=label2=v2.")
rootCmd.PersistentFlags().IntVar(&port, "port", 6001, "set port where the server will listen on.")
rootCmd.Flags().AddGoFlagSet(fs)

1 change: 1 addition & 0 deletions pkg/node-labels-resources/doc.go
@@ -1 +1,2 @@
// Package nodeselectorresources contains the logic for the node-label-resource-plugin.
package nodeselectorresources
37 changes: 25 additions & 12 deletions pkg/node-labels-resources/grpc_server.go
@@ -30,13 +30,14 @@ import (
"github.com/liqotech/liqo/pkg/utils"
)

// NodeDetails stores details of the Node.
type NodeDetails struct {
Schedulable bool
Allocatable corev1.ResourceList
Pods map[string]corev1.ResourceList
}

type NodeLabelsMonitor struct {
type nodeLabelsMonitor struct {
Server *grpc.Server
resourcemonitors.ResourceReaderServer
subscribers sync.Map
@@ -49,6 +50,7 @@ type NodeLabelsMonitor {
resourceLists map[string]NodeDetails
}

// ListenAndServeGRPCServer creates the gRPC server and makes it listen on the given port.
func ListenAndServeGRPCServer(port int, nodeLabels map[string]string, clientset *kubernetes.Clientset) error {
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
resyncPeriod := 10 * time.Hour
@@ -88,7 +90,7 @@ func ListenAndServeGRPCServer(port int, nodeLabels map[string]string, clientset
clientset, resyncPeriod, informers.WithTweakListOptions(noShadowPodsFilter),
)
podInformer := podFactory.Core().V1().Pods().Informer()
s := NodeLabelsMonitor{
s := nodeLabelsMonitor{
Server: grpc.NewServer(),
nodeLabels: nodeLabels,
allocatable: corev1.ResourceList{},
@@ -121,7 +123,7 @@ func ListenAndServeGRPCServer(port int, nodeLabels map[string]string, clientset
}

// react to a Node Creation/First informer run.
func (nlm *NodeLabelsMonitor) onNodeAdd(obj interface{}) {
func (nlm *nodeLabelsMonitor) onNodeAdd(obj interface{}) {
node := obj.(*corev1.Node)
toAdd := &node.Status.Allocatable
klog.V(4).Infof("Adding Node %s", node.Name)
@@ -136,13 +138,14 @@ func (nlm *NodeLabelsMonitor) onNodeAdd(obj interface{}) {
return
}
for _, pod := range pods.Items {

Check failure on line 140 in pkg/node-labels-resources/grpc_server.go (GitHub Actions / Lint golang files): rangeValCopy: each iteration copies 1064 bytes (consider pointers or indexing) (gocritic)
pod := pod
nlm.onPodAdd(&pod)
}
nlm.writeClusterResources()
}
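
The rangeValCopy warning reported above notes that ranging over pods.Items by value copies each Pod struct on every iteration. A minimal sketch of an index-based variant of the loop above, reusing the pods and nlm names from the hunk (this variant is not part of the commit):

    // Index into the slice instead of copying each corev1.Pod per iteration.
    for i := range pods.Items {
        nlm.onPodAdd(&pods.Items[i])
    }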

// react to a Node Update.
func (nlm *NodeLabelsMonitor) onNodeUpdate(oldObj, newObj interface{}) {
func (nlm *nodeLabelsMonitor) onNodeUpdate(oldObj, newObj interface{}) {
oldNode := oldObj.(*corev1.Node)
newNode := newObj.(*corev1.Node)
newNodeResources := newNode.Status.Allocatable
@@ -171,7 +174,7 @@ func (nlm *NodeLabelsMonitor) onNodeUpdate(oldObj, newObj interface{}) {
}

// react to a Node Delete.
func (nlm *NodeLabelsMonitor) onNodeDelete(obj interface{}) {
func (nlm *nodeLabelsMonitor) onNodeDelete(obj interface{}) {
node := obj.(*corev1.Node)
toDelete := &node.Status.Allocatable
nodeResourceList, ok := nlm.resourceLists[node.Name]
@@ -185,7 +188,7 @@ func (nlm *NodeLabelsMonitor) onNodeDelete(obj interface{}) {
nlm.writeClusterResources()
}

func (nlm *NodeLabelsMonitor) onPodAdd(obj interface{}) {
func (nlm *nodeLabelsMonitor) onPodAdd(obj interface{}) {
// Thanks to the filters at the informer level, add events are received only when pods running on physical nodes turn running.
podAdded, ok := obj.(*corev1.Pod)
if !ok {
@@ -207,7 +210,7 @@ func (nlm *NodeLabelsMonitor) onPodAdd(obj interface{}) {
nlm.writeClusterResources()
}

func (nlm *NodeLabelsMonitor) onPodDelete(obj interface{}) {
func (nlm *nodeLabelsMonitor) onPodDelete(obj interface{}) {
// Thanks to the filters at the informer level, delete events are received only when
// pods previously running on a physical node are no longer running.
podDeleted, ok := obj.(*corev1.Pod)
@@ -226,7 +229,7 @@ func (nlm *NodeLabelsMonitor) onPodDelete(obj interface{}) {
nlm.writeClusterResources()
}

func (nlm *NodeLabelsMonitor) writeClusterResources() {
func (nlm *nodeLabelsMonitor) writeClusterResources() {
podResourceUsage := corev1.ResourceList{}
nodeAllocatable := corev1.ResourceList{}
for _, nodeDetail := range nlm.resourceLists {
@@ -254,7 +257,10 @@ func extractPodResources(podToExtract *corev1.Pod) corev1.ResourceList {
return resourcesToExtract
}

func (nlm *NodeLabelsMonitor) ReadResources(ctx context.Context, req *resourcemonitors.ClusterIdentity) (*resourcemonitors.PoolResourceList, error) {
// ReadResources receives a clusterID and returns the resources for that specific clusterID. In this version of the resource plugin,
// the clusterID is ignored and the same resources are returned for every clusterID received. Since this method can be called multiple
// times, it has to be idempotent.
func (nlm *nodeLabelsMonitor) ReadResources(ctx context.Context, req *resourcemonitors.ClusterIdentity) (*resourcemonitors.PoolResourceList, error) {
klog.V(4).Infof("info: reading resources for cluster %s", req.ClusterID)
nlm.nodeMutex.RLock()
defer nlm.nodeMutex.RUnlock()
@@ -269,7 +275,8 @@ func (nlm *NodeLabelsMonitor) ReadResources(ctx context.Context, req *resourcemo
return &resourcemonitors.PoolResourceList{ResourceLists: resourceList}, nil
}

func (nlm *NodeLabelsMonitor) Subscribe(req *resourcemonitors.Empty, srv resourcemonitors.ResourceReader_SubscribeServer) error {
// Subscribe is standard in this implementation: the only thing it does is notify liqo immediately.
func (nlm *nodeLabelsMonitor) Subscribe(req *resourcemonitors.Empty, srv resourcemonitors.ResourceReader_SubscribeServer) error {
klog.V(1).Infof("info: liqo controller manager subscribed")

// Store the stream. Using req as key since each request will have a different req object.
@@ -289,7 +296,10 @@ func (nlm *NodeLabelsMonitor) Subscribe(req *resourcemonitors.Empty, srv resourc
return nil
}

func (nlm *NodeLabelsMonitor) NotifyChange(ctx context.Context, req *resourcemonitors.ClusterIdentity) error {
// NotifyChange uses the cached streams to notify liqo that some resources changed. This method receives a clusterID inside req,
// which can be a real clusterID or resourcemonitors.AllClusterIDs, which tells liqo to refresh the resources
// of all the peered clusters.
func (nlm *nodeLabelsMonitor) NotifyChange(ctx context.Context, req *resourcemonitors.ClusterIdentity) error {
klog.V(1).Infof("info: sending notification to liqo controller manager for cluster %q", req.ClusterID)
var err error

@@ -311,7 +321,10 @@ func (nlm *NodeLabelsMonitor) NotifyChange(ctx context.Context, req *resourcemon
return err
}

func (nlm *NodeLabelsMonitor) RemoveCluster(ctx context.Context, req *resourcemonitors.ClusterIdentity) (*resourcemonitors.Empty, error) {
// RemoveCluster cleans up a cluster's information, if present, when that cluster is unpeered. This method receives
// a clusterID which identifies the cluster that has been removed. It is mainly useful in custom
// implementations, for example where a database is involved.
func (nlm *nodeLabelsMonitor) RemoveCluster(ctx context.Context, req *resourcemonitors.ClusterIdentity) (*resourcemonitors.Empty, error) {
klog.V(1).Infof("info: removing cluster %s", req.ClusterID)
return &resourcemonitors.Empty{}, nil
}
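The Subscribe and NotifyChange comments above describe a simple fan-out pattern: each subscriber stream is cached in a sync.Map keyed by its request object, and a notification is sent on every cached stream. A self-contained sketch of that pattern with hypothetical stream and message types (the real liqo resourcemonitors types are not reproduced here):

    package notify

    import "sync"

    // stream is a stand-in for a gRPC server-streaming interface.
    type stream interface {
        Send(msg string) error
    }

    // notifier caches subscriber streams and fans notifications out to them.
    type notifier struct {
        subscribers sync.Map // key: per-subscription request, value: stream
    }

    // subscribe stores the stream; each subscription uses a distinct key.
    func (n *notifier) subscribe(key interface{}, s stream) {
        n.subscribers.Store(key, s)
    }

    // notifyAll sends msg on every cached stream and returns the last error seen.
    func (n *notifier) notifyAll(msg string) error {
        var err error
        n.subscribers.Range(func(_, value interface{}) bool {
            if sendErr := value.(stream).Send(msg); sendErr != nil {
                err = sendErr
            }
            return true // keep iterating over the remaining subscribers
        })
        return err
    }
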
2 changes: 2 additions & 0 deletions pkg/utils/args/resources.go
@@ -79,11 +79,13 @@ func parseQuantity(str string) (string, *resource.Quantity, error) {
return res[0], &quantity, nil
}

// NodeLabelsMap contains the node labels provided on the command line.

type NodeLabelsMap struct {
StringValues liqoargs.StringList
NodeLabels map[string]string
}

// Set parses a label in name=value format and adds it to the map.

func (n *NodeLabelsMap) Set(str string) error {
if n.NodeLabels == nil {
n.NodeLabels = make(map[string]string)
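NodeLabelsMap is registered via rootCmd.Flags().Var in main.go, so it implements the flag Value interface and Set is called once per --node-label occurrence. The body of Set is cut off in the hunk above; a rough sketch of how the remaining parsing presumably looks, splitting on the first '=' (the exact implementation and error wording are assumptions):

    // Set parses a single "name=value" argument and stores it in the map.
    // Assumes "fmt" and "strings" are imported.
    func (n *NodeLabelsMap) Set(str string) error {
        if n.NodeLabels == nil {
            n.NodeLabels = make(map[string]string)
        }
        parts := strings.SplitN(str, "=", 2)
        if len(parts) != 2 {
            return fmt.Errorf("invalid node label %q, expected name=value", str)
        }
        n.NodeLabels[parts[0]] = parts[1]
        return nil
    }
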
1 change: 1 addition & 0 deletions pkg/utils/clients/doc.go
@@ -1 +1,2 @@
// Package client contains the Kubernetes-related helper functions.
package client
5 changes: 3 additions & 2 deletions pkg/utils/clients/kubernetes_core.go
@@ -11,6 +11,7 @@ import (
"k8s.io/client-go/util/homedir"
)

// CreateKubernetesCore creates a Kubernetes clientset.

func CreateKubernetesCore() (*kubernetes.Clientset, error) {
restConfig, err := rest.InClusterConfig()
if err != nil {
@@ -21,13 +22,13 @@ func CreateKubernetesCore() (*kubernetes.Clientset, error) {
restConfig, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
}
if err != nil {
return nil, errors.Wrap(err, "Failed to initialize the RestConfig")
return nil, errors.Wrap(err, "failed to initialize the RestConfig")
}
restConfig.QPS = float32(50)
restConfig.Burst = 50
clientSet, err := kubernetes.NewForConfig(restConfig)
if err != nil {
return nil, errors.Wrap(err, "Failed to initialize kubernetes client set.")
return nil, errors.Wrap(err, "failed to initialize kubernetes client set")
}
return clientSet, nil
}
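
CreateKubernetesCore first tries the in-cluster config and only then falls back to the kubeconfig in the home directory before building the clientset. A hedged sketch of how main.go presumably wires it into the gRPC server (names taken from the hunks above; the error handling is illustrative and not part of this commit):

    clientset, err := client.CreateKubernetesCore()
    if err != nil {
        klog.Fatalf("failed to create the Kubernetes client: %v", err)
    }
    if err := nodeselectorresources.ListenAndServeGRPCServer(port, nodeLabels.NodeLabels, clientset); err != nil {
        klog.Fatalf("gRPC server terminated: %v", err)
    }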
