From e23edb01995e8108d96aad6c021217e2d0f8ae66 Mon Sep 17 00:00:00 2001
From: Rohan CJ
Date: Fri, 10 Nov 2023 17:56:43 +0530
Subject: [PATCH] fix: watch and enqueue node events when scheduler enabled

Signed-off-by: Rohan CJ
---
 pkg/constants/indices.go                     |  5 ++
 pkg/controllers/resources/nodes/syncer.go    | 83 +++++++++++++++----
 pkg/controllers/resources/nodes/translate.go |  2 +-
 3 files changed, 74 insertions(+), 16 deletions(-)

diff --git a/pkg/constants/indices.go b/pkg/constants/indices.go
index 72ea53175..f247d2cd5 100644
--- a/pkg/constants/indices.go
+++ b/pkg/constants/indices.go
@@ -14,6 +14,11 @@ const (
 	IndexByHostName  = "IndexByHostName"
 	IndexByClusterIP = "IndexByClusterIP"
+
+	// IndexRunningNonVclusterPodsByNode is only used when the vcluster scheduler is enabled.
+	// It maps non-vcluster pods on the node to the node name, so that the node syncer may
+	// calculate the allocatable resources on the node.
+	IndexRunningNonVclusterPodsByNode = "IndexRunningNonVclusterPodsByNode"
 )
 
 const DefaultCacheSyncTimeout = time.Minute * 15
diff --git a/pkg/controllers/resources/nodes/syncer.go b/pkg/controllers/resources/nodes/syncer.go
index 57ec91a13..b9da894d7 100644
--- a/pkg/controllers/resources/nodes/syncer.go
+++ b/pkg/controllers/resources/nodes/syncer.go
@@ -2,9 +2,12 @@ package nodes
 
 import (
 	"context"
+	"fmt"
 
 	"k8s.io/klog/v2"
 	"sigs.k8s.io/controller-runtime/pkg/cache"
+	"sigs.k8s.io/controller-runtime/pkg/event"
+	"sigs.k8s.io/controller-runtime/pkg/predicate"
 
 	"github.com/loft-sh/vcluster/pkg/constants"
 	"github.com/loft-sh/vcluster/pkg/controllers/resources/nodes/nodeservice"
@@ -16,7 +19,9 @@ import (
 	"github.com/pkg/errors"
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/apimachinery/pkg/selection"
 	"k8s.io/apimachinery/pkg/types"
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/builder"
@@ -26,10 +31,6 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/source"
 )
 
-var (
-	indexPodByRunningNonVClusterNode = "indexpodbyrunningnonvclusternode"
-)
-
 func NewSyncer(ctx *synccontext.RegisterContext, nodeServiceProvider nodeservice.Provider) (syncertypes.Object, error) {
 	var err error
 	var nodeSelector labels.Selector
@@ -82,7 +83,7 @@ type nodeSyncer struct {
 	physicalClient client.Client
 	virtualClient  client.Client
 
-	podCache            client.Reader
+	unmanagedPodCache   client.Reader
 	nodeServiceProvider nodeservice.Provider
 	enforcedTolerations []*corev1.Toleration
 }
@@ -97,25 +98,31 @@ func (s *nodeSyncer) Name() string {
 
 var _ syncertypes.ControllerModifier = &nodeSyncer{}
 
-func (s *nodeSyncer) ModifyController(ctx *synccontext.RegisterContext, builder *builder.Builder) (*builder.Builder, error) {
+func (s *nodeSyncer) ModifyController(ctx *synccontext.RegisterContext, bld *builder.Builder) (*builder.Builder, error) {
 	if s.enableScheduler {
-		// create a global pod cache for calculating the correct node resources
+		notManagedSelector, err := labels.NewRequirement(translate.MarkerLabel, selection.NotEquals, []string{translate.Suffix})
+		if err != nil {
+			return bld, fmt.Errorf("constructing label selector for non-vcluster pods: %w", err)
+		}
+		// create a pod cache containing pods from all namespaces for calculating the correct node resources
		podCache, err := cache.New(ctx.PhysicalManager.GetConfig(), cache.Options{
 			Scheme: ctx.PhysicalManager.GetScheme(),
 			Mapper: ctx.PhysicalManager.GetRESTMapper(),
+			// omits pods managed by the vcluster
+			DefaultLabelSelector: labels.NewSelector().Add(*notManagedSelector),
+			// omits pods that are pending as they do not consume resources
+			DefaultFieldSelector: fields.OneTermNotEqualSelector("status.phase", string(corev1.PodPending)),
 		})
 		if err != nil {
 			return nil, errors.Wrap(err, "create cache")
 		}
 		// add index for pod by node
-		err = podCache.IndexField(ctx.Context, &corev1.Pod{}, indexPodByRunningNonVClusterNode, func(object client.Object) []string {
+		err = podCache.IndexField(ctx.Context, &corev1.Pod{}, constants.IndexRunningNonVclusterPodsByNode, func(object client.Object) []string {
 			pPod := object.(*corev1.Pod)
 			// we ignore all non-running pods and the ones that are part of the current vcluster
 			// to later calculate the status.allocatable part of the nodes correctly
 			if pPod.Status.Phase == corev1.PodSucceeded || pPod.Status.Phase == corev1.PodFailed {
 				return []string{}
-			} else if translate.Default.IsManaged(pPod) {
-				return []string{}
 			} else if pPod.Spec.NodeName == "" {
 				return []string{}
 			}
@@ -132,17 +139,61 @@
 			}
 		}()
 		podCache.WaitForCacheSync(ctx.Context)
-		s.podCache = podCache
+		s.unmanagedPodCache = podCache
+
+		// enqueue the node whenever the phase of a pod assigned to it changes,
+		// so that the node's allocatable resources are recalculated
+		bld.WatchesRawSource(
+			source.Kind(podCache, &corev1.Pod{}),
+			handler.EnqueueRequestsFromMapFunc(func(_ context.Context, object client.Object) []reconcile.Request {
+				pPod := object.(*corev1.Pod)
+				return []reconcile.Request{
+					{
+						NamespacedName: types.NamespacedName{
+							Name: pPod.Spec.NodeName,
+						},
+					},
+				}
+			}),
+			builder.WithPredicates(
+				predicate.NewPredicateFuncs(func(object client.Object) bool {
+					pPod, ok := object.(*corev1.Pod)
+					if !ok {
+						klog.Info("object failed to resolve into pod")
+						return false
+					}
+					// skip pending pods and unassigned pods
+					return pPod.Status.Phase != corev1.PodPending && pPod.Spec.NodeName != ""
+				}),
+				predicate.Funcs{
+					UpdateFunc: func(ue event.UpdateEvent) bool {
+						oldPod, ok := ue.ObjectOld.(*corev1.Pod)
+						if !ok {
+							klog.Info("object failed to resolve into pod")
+							return false
+						}
+						newPod, ok := ue.ObjectNew.(*corev1.Pod)
+						if !ok {
+							klog.Info("object failed to resolve into pod")
+							return false
+						}
+						// only react if the phase changed (a pod's resources are immutable; pending pods are already filtered out above)
+						return oldPod.Status.Phase != newPod.Status.Phase
+					},
+				},
+			),
+		)
 	}
-	return modifyController(ctx, s.nodeServiceProvider, builder)
+	return modifyController(ctx, s.nodeServiceProvider, bld)
 }
 
-func modifyController(ctx *synccontext.RegisterContext, nodeServiceProvider nodeservice.Provider, builder *builder.Builder) (*builder.Builder, error) {
+// this is split out because it is shared with the fake syncer
+func modifyController(ctx *synccontext.RegisterContext, nodeServiceProvider nodeservice.Provider, bld *builder.Builder) (*builder.Builder, error) {
 	go func() {
 		nodeServiceProvider.Start(ctx.Context)
 	}()
-	return builder.WatchesRawSource(source.Kind(ctx.PhysicalManager.GetCache(), &corev1.Pod{}), handler.EnqueueRequestsFromMapFunc(func(_ context.Context, object client.Object) []reconcile.Request {
+	bld = bld.WatchesRawSource(source.Kind(ctx.PhysicalManager.GetCache(), &corev1.Pod{}), handler.EnqueueRequestsFromMapFunc(func(_ context.Context, object client.Object) []reconcile.Request {
 		pod, ok := object.(*corev1.Pod)
 		if !ok || pod == nil || !translate.Default.IsManaged(pod) || pod.Spec.NodeName == "" {
pod.Spec.NodeName == "" { return []reconcile.Request{} @@ -168,7 +223,11 @@ func modifyController(ctx *synccontext.RegisterContext, nodeServiceProvider node }, }, } - })), nil + })) + + // if s.EnableScheduler + + return bld, nil } var _ syncertypes.IndicesRegisterer = &nodeSyncer{} diff --git a/pkg/controllers/resources/nodes/translate.go b/pkg/controllers/resources/nodes/translate.go index 87860cd40..78b1af70c 100644 --- a/pkg/controllers/resources/nodes/translate.go +++ b/pkg/controllers/resources/nodes/translate.go @@ -181,7 +181,7 @@ func (s *nodeSyncer) translateUpdateStatus(ctx *synccontext.SyncContext, pNode * var nonVClusterPods int64 podList := &corev1.PodList{} - err := s.podCache.List(ctx.Context, podList, client.MatchingFields{indexPodByRunningNonVClusterNode: pNode.Name}) + err := s.unmamangedPodCache.List(ctx.Context, podList, client.MatchingFields{constants.IndexRunningNonVclusterPodsByNode: pNode.Name}) if err != nil { klog.Errorf("Error listing pods: %v", err) } else {