diff --git a/pkg/constants/indices.go b/pkg/constants/indices.go
index 72ea53175..6ba9f39b9 100644
--- a/pkg/constants/indices.go
+++ b/pkg/constants/indices.go
@@ -14,6 +14,11 @@ const (
 	IndexByHostName  = "IndexByHostName"
 	IndexByClusterIP = "IndexByClusterIP"
+
+	// IndexRunningNonVClusterPodsByNode is only used when the vcluster scheduler is enabled.
+	// It maps running non-vcluster pods to the name of the node they run on, so that the
+	// node syncer can calculate the node's allocatable resources.
+	IndexRunningNonVClusterPodsByNode = "IndexRunningNonVClusterPodsByNode"
 )
 
 const DefaultCacheSyncTimeout = time.Minute * 15
diff --git a/pkg/controllers/resources/nodes/syncer.go b/pkg/controllers/resources/nodes/syncer.go
index 57ec91a13..3bbcf0901 100644
--- a/pkg/controllers/resources/nodes/syncer.go
+++ b/pkg/controllers/resources/nodes/syncer.go
@@ -2,9 +2,12 @@ package nodes
 
 import (
 	"context"
+	"fmt"
 
+	"k8s.io/client-go/util/workqueue"
 	"k8s.io/klog/v2"
 	"sigs.k8s.io/controller-runtime/pkg/cache"
+	"sigs.k8s.io/controller-runtime/pkg/event"
 
 	"github.com/loft-sh/vcluster/pkg/constants"
 	"github.com/loft-sh/vcluster/pkg/controllers/resources/nodes/nodeservice"
@@ -17,6 +20,7 @@ import (
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/apimachinery/pkg/selection"
 	"k8s.io/apimachinery/pkg/types"
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/builder"
@@ -26,10 +30,6 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/source"
 )
 
-var (
-	indexPodByRunningNonVClusterNode = "indexpodbyrunningnonvclusternode"
-)
-
 func NewSyncer(ctx *synccontext.RegisterContext, nodeServiceProvider nodeservice.Provider) (syncertypes.Object, error) {
 	var err error
 	var nodeSelector labels.Selector
@@ -82,7 +82,7 @@ type nodeSyncer struct {
 	physicalClient client.Client
 	virtualClient  client.Client
 
-	podCache            client.Reader
+	unmanagedPodCache   client.Reader
 	nodeServiceProvider nodeservice.Provider
 	enforcedTolerations []*corev1.Toleration
 }
@@ -97,25 +97,29 @@ func (s *nodeSyncer) Name() string {
 
 var _ syncertypes.ControllerModifier = &nodeSyncer{}
 
-func (s *nodeSyncer) ModifyController(ctx *synccontext.RegisterContext, builder *builder.Builder) (*builder.Builder, error) {
+func (s *nodeSyncer) ModifyController(ctx *synccontext.RegisterContext, bld *builder.Builder) (*builder.Builder, error) {
 	if s.enableScheduler {
-		// create a global pod cache for calculating the correct node resources
+		notManagedSelector, err := labels.NewRequirement(translate.MarkerLabel, selection.NotEquals, []string{translate.Suffix})
+		if err != nil {
+			return bld, fmt.Errorf("constructing label selector for non-vcluster pods: %w", err)
+		}
+		// create a pod cache containing pods from all namespaces for calculating the correct node resources
 		podCache, err := cache.New(ctx.PhysicalManager.GetConfig(), cache.Options{
 			Scheme: ctx.PhysicalManager.GetScheme(),
 			Mapper: ctx.PhysicalManager.GetRESTMapper(),
+			// omits pods managed by the vcluster
+			DefaultLabelSelector: labels.NewSelector().Add(*notManagedSelector),
 		})
 		if err != nil {
 			return nil, errors.Wrap(err, "create cache")
 		}
 		// add index for pod by node
-		err = podCache.IndexField(ctx.Context, &corev1.Pod{}, indexPodByRunningNonVClusterNode, func(object client.Object) []string {
+		err = podCache.IndexField(ctx.Context, &corev1.Pod{}, constants.IndexRunningNonVClusterPodsByNode, func(object client.Object) []string {
 			pPod := object.(*corev1.Pod)
 			// we ignore all non-running pods and the ones that are part of the current vcluster
 			// to later calculate the status.allocatable part of the nodes correctly
 			if pPod.Status.Phase == corev1.PodSucceeded || pPod.Status.Phase == corev1.PodFailed {
 				return []string{}
-			} else if translate.Default.IsManaged(pPod) {
-				return []string{}
 			} else if pPod.Spec.NodeName == "" {
 				return []string{}
 			}
@@ -131,18 +135,65 @@ func (s *nodeSyncer) ModifyController(ctx *synccontext.RegisterContext, builder
 				klog.Fatalf("error starting pod cache: %v", err)
 			}
 		}()
+
 		podCache.WaitForCacheSync(ctx.Context)
-		s.podCache = podCache
+		s.unmanagedPodCache = podCache
+
+		// enqueue node reconcile requests on pod phase changes when the scheduler is enabled;
+		// the syncer updates the virtual node's .status.allocatable by summing the consumption of these pods
+		bld.WatchesRawSource(
+			source.Kind(podCache, &corev1.Pod{}),
+			handler.Funcs{
+				GenericFunc: func(ctx context.Context, ev event.GenericEvent, q workqueue.RateLimitingInterface) {
+					enqueueNonVclusterPod(nil, ev.Object, q)
+				},
+				CreateFunc: func(ctx context.Context, ev event.CreateEvent, q workqueue.RateLimitingInterface) {
+					enqueueNonVclusterPod(nil, ev.Object, q)
+				},
+				UpdateFunc: func(ctx context.Context, ue event.UpdateEvent, q workqueue.RateLimitingInterface) {
+					enqueueNonVclusterPod(ue.ObjectOld, ue.ObjectNew, q)
+				},
+				DeleteFunc: func(ctx context.Context, ev event.DeleteEvent, q workqueue.RateLimitingInterface) {
+					enqueueNonVclusterPod(nil, ev.Object, q)
+				},
+			},
+		)
 	}
-	return modifyController(ctx, s.nodeServiceProvider, builder)
+	return modifyController(ctx, s.nodeServiceProvider, bld)
 }
 
-func modifyController(ctx *synccontext.RegisterContext, nodeServiceProvider nodeservice.Provider, builder *builder.Builder) (*builder.Builder, error) {
+// only used when the scheduler is enabled
+func enqueueNonVclusterPod(old, newObj client.Object, q workqueue.RateLimitingInterface) {
+	pod, ok := newObj.(*corev1.Pod)
+	if !ok {
+		klog.Errorf("invalid type passed to pod handler: %T", newObj)
+		return
+	}
+	// skip pods that are not scheduled to a node yet
+	if pod.Spec.NodeName == "" {
+		return
+	}
+	if old != nil {
+		oldPod, ok := old.(*corev1.Pod)
+		if !ok {
+			klog.Errorf("invalid type passed to pod handler: %T", old)
+			return
+		}
+		// skip if the pod phase did not change
+		if oldPod.Status.Phase == pod.Status.Phase {
+			return
+		}
+	}
+	q.Add(reconcile.Request{NamespacedName: types.NamespacedName{Name: pod.Spec.NodeName}})
+}
+
+// this is split out because it is shared with the fake syncer
+func modifyController(ctx *synccontext.RegisterContext, nodeServiceProvider nodeservice.Provider, bld *builder.Builder) (*builder.Builder, error) {
 	go func() {
 		nodeServiceProvider.Start(ctx.Context)
 	}()
 
-	return builder.WatchesRawSource(source.Kind(ctx.PhysicalManager.GetCache(), &corev1.Pod{}), handler.EnqueueRequestsFromMapFunc(func(_ context.Context, object client.Object) []reconcile.Request {
+	bld = bld.WatchesRawSource(source.Kind(ctx.PhysicalManager.GetCache(), &corev1.Pod{}), handler.EnqueueRequestsFromMapFunc(func(_ context.Context, object client.Object) []reconcile.Request {
 		pod, ok := object.(*corev1.Pod)
 		if !ok || pod == nil || !translate.Default.IsManaged(pod) || pod.Spec.NodeName == "" {
 			return []reconcile.Request{}
@@ -168,7 +219,9 @@ func modifyController(ctx *synccontext.RegisterContext, nodeServiceProvider node
 			},
 		},
 	}
-	})), nil
+	}))
+
+	return bld, nil
 }
 
 var _ syncertypes.IndicesRegisterer = &nodeSyncer{}
diff --git a/pkg/controllers/resources/nodes/translate.go b/pkg/controllers/resources/nodes/translate.go
index 87860cd40..ab44516d1 100644
--- a/pkg/controllers/resources/nodes/translate.go
+++ b/pkg/controllers/resources/nodes/translate.go
@@ -181,7 +181,7 @@ func (s *nodeSyncer) translateUpdateStatus(ctx *synccontext.SyncContext, pNode *
 
 		var nonVClusterPods int64
 		podList := &corev1.PodList{}
-		err := s.podCache.List(ctx.Context, podList, client.MatchingFields{indexPodByRunningNonVClusterNode: pNode.Name})
+		err := s.unmanagedPodCache.List(ctx.Context, podList, client.MatchingFields{constants.IndexRunningNonVClusterPodsByNode: pNode.Name})
 		if err != nil {
 			klog.Errorf("Error listing pods: %v", err)
 		} else {
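
For reviewers, a minimal standalone sketch of the arithmetic the new index enables, outside the diff itself: list the running non-vcluster pods on a host node via constants.IndexRunningNonVClusterPodsByNode, total their container resource requests (one plausible measure of the "consumption" the comment refers to), and subtract that total from the host node's allocatable before writing the virtual node's status. The helper names sumPodRequests and adjustedAllocatable are illustrative assumptions, not code from this PR; the real computation lives in translateUpdateStatus in pkg/controllers/resources/nodes/translate.go.

// Standalone sketch (not from this PR): turning the indexed pod list into an
// adjusted allocatable value for the virtual node.
package nodes

import (
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

// sumPodRequests totals the CPU and memory requests of all containers in the
// given pods, e.g. a list fetched with
// client.MatchingFields{constants.IndexRunningNonVClusterPodsByNode: nodeName}.
func sumPodRequests(pods []corev1.Pod) corev1.ResourceList {
	var cpu, mem resource.Quantity
	for _, pod := range pods {
		for _, c := range pod.Spec.Containers {
			cpu.Add(*c.Resources.Requests.Cpu())
			mem.Add(*c.Resources.Requests.Memory())
		}
	}
	return corev1.ResourceList{corev1.ResourceCPU: cpu, corev1.ResourceMemory: mem}
}

// adjustedAllocatable subtracts the summed requests from the host node's
// allocatable, clamping at zero so the virtual node never reports a negative
// quantity.
func adjustedAllocatable(hostAllocatable, used corev1.ResourceList) corev1.ResourceList {
	out := corev1.ResourceList{}
	for name, avail := range hostAllocatable {
		quantity := avail.DeepCopy()
		if u, ok := used[name]; ok {
			quantity.Sub(u)
			if quantity.Sign() < 0 {
				quantity = resource.Quantity{}
			}
		}
		out[name] = quantity
	}
	return out
}

Because the cache is constructed with a DefaultLabelSelector that excludes objects carrying the vcluster marker label, the indexed list only ever contains pods the vcluster does not manage, which is why the index function in this diff no longer needs the translate.Default.IsManaged check it previously performed.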