fix: watch and enqueue node events when scheduler enabled #1352

Merged
merged 1 commit into from Nov 15, 2023
5 changes: 5 additions & 0 deletions pkg/constants/indices.go
@@ -14,6 +14,11 @@ const (
 	IndexByHostName = "IndexByHostName"
 
 	IndexByClusterIP = "IndexByClusterIP"
+
+	// IndexRunningNonVClusterPodsByNode is only used when the vcluster scheduler is enabled.
+	// It maps non-vcluster pods on the node to the node name, so that the node syncer may
+	// calculate the allocatable resources on the node.
+	IndexRunningNonVClusterPodsByNode = "IndexRunningNonVClusterPodsByNode"
 )
 
 const DefaultCacheSyncTimeout = time.Minute * 15
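For context (not part of the diff): a controller-runtime field index like this one is registered on a cache and then queried through client.MatchingFields. A minimal sketch under assumed names (the function listPodsOnNode and its parameters are illustrative); the real registration and query appear in syncer.go and translate.go below, and in practice the index is registered once at setup, not per call:

// Sketch only: register the node-name index on a cache, then list the
// pods that the index maps to a single node. Assumes the cache has been
// started and synced before List is called.
func listPodsOnNode(ctx context.Context, c cache.Cache, nodeName string) (*corev1.PodList, error) {
	err := c.IndexField(ctx, &corev1.Pod{}, constants.IndexRunningNonVClusterPodsByNode, func(obj client.Object) []string {
		pod := obj.(*corev1.Pod)
		if pod.Spec.NodeName == "" {
			return nil
		}
		return []string{pod.Spec.NodeName}
	})
	if err != nil {
		return nil, err
	}

	podList := &corev1.PodList{}
	err = c.List(ctx, podList, client.MatchingFields{constants.IndexRunningNonVClusterPodsByNode: nodeName})
	return podList, err
}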
83 changes: 68 additions & 15 deletions pkg/controllers/resources/nodes/syncer.go
@@ -2,9 +2,12 @@ package nodes

 import (
 	"context"
+	"fmt"
 
+	"k8s.io/client-go/util/workqueue"
 	"k8s.io/klog/v2"
 	"sigs.k8s.io/controller-runtime/pkg/cache"
+	"sigs.k8s.io/controller-runtime/pkg/event"
 
 	"github.com/loft-sh/vcluster/pkg/constants"
 	"github.com/loft-sh/vcluster/pkg/controllers/resources/nodes/nodeservice"
@@ -17,6 +20,7 @@ import (
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/apimachinery/pkg/selection"
 	"k8s.io/apimachinery/pkg/types"
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/builder"
@@ -26,10 +30,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/source"
)

var (
indexPodByRunningNonVClusterNode = "indexpodbyrunningnonvclusternode"
)

func NewSyncer(ctx *synccontext.RegisterContext, nodeServiceProvider nodeservice.Provider) (syncertypes.Object, error) {
var err error
var nodeSelector labels.Selector
@@ -82,7 +82,7 @@ type nodeSyncer struct {
 	physicalClient client.Client
 	virtualClient client.Client
 
-	podCache            client.Reader
+	unmanagedPodCache   client.Reader
 	nodeServiceProvider nodeservice.Provider
 	enforcedTolerations []*corev1.Toleration
 }
@@ -97,25 +97,29 @@ func (s *nodeSyncer) Name() string {

 var _ syncertypes.ControllerModifier = &nodeSyncer{}
 
-func (s *nodeSyncer) ModifyController(ctx *synccontext.RegisterContext, builder *builder.Builder) (*builder.Builder, error) {
+func (s *nodeSyncer) ModifyController(ctx *synccontext.RegisterContext, bld *builder.Builder) (*builder.Builder, error) {
 	if s.enableScheduler {
-		// create a global pod cache for calculating the correct node resources
+		notManagedSelector, err := labels.NewRequirement(translate.MarkerLabel, selection.NotEquals, []string{translate.Suffix})
+		if err != nil {
+			return bld, fmt.Errorf("constructing label selector for non-vcluster pods: %w", err)
+		}
+		// create a pod cache containing pods from all namespaces for calculating the correct node resources
 		podCache, err := cache.New(ctx.PhysicalManager.GetConfig(), cache.Options{
 			Scheme: ctx.PhysicalManager.GetScheme(),
 			Mapper: ctx.PhysicalManager.GetRESTMapper(),
+			// omits pods managed by the vcluster
+			DefaultLabelSelector: labels.NewSelector().Add(*notManagedSelector),
 		})
 		if err != nil {
 			return nil, errors.Wrap(err, "create cache")
 		}
 		// add index for pod by node
-		err = podCache.IndexField(ctx.Context, &corev1.Pod{}, indexPodByRunningNonVClusterNode, func(object client.Object) []string {
+		err = podCache.IndexField(ctx.Context, &corev1.Pod{}, constants.IndexRunningNonVClusterPodsByNode, func(object client.Object) []string {
 			pPod := object.(*corev1.Pod)
 			// we ignore all non-running pods and the ones that are part of the current vcluster
 			// to later calculate the status.allocatable part of the nodes correctly
 			if pPod.Status.Phase == corev1.PodSucceeded || pPod.Status.Phase == corev1.PodFailed {
 				return []string{}
-			} else if translate.Default.IsManaged(pPod) {
-				return []string{}
 			} else if pPod.Spec.NodeName == "" {
 				return []string{}
 			}
@@ -131,18 +135,65 @@ func (s *nodeSyncer) ModifyController(ctx *synccontext.RegisterContext, builder
 				klog.Fatalf("error starting pod cache: %v", err)
 			}
 		}()
+
 		podCache.WaitForCacheSync(ctx.Context)
-		s.podCache = podCache
+		s.unmanagedPodCache = podCache
+
+		// enqueues nodes based on pod phase changes if the scheduler is enabled
+		// the syncer is configured to update virtual node's .status.allocatable fields by summing the consumption of these pods
+		bld.WatchesRawSource(
+			source.Kind(podCache, &corev1.Pod{}),
+			handler.Funcs{
+				GenericFunc: func(ctx context.Context, ev event.GenericEvent, q workqueue.RateLimitingInterface) {
+					enqueueNonVclusterPod(nil, ev.Object, q)
+				},
+				CreateFunc: func(ctx context.Context, ev event.CreateEvent, q workqueue.RateLimitingInterface) {
+					enqueueNonVclusterPod(nil, ev.Object, q)
+				},
+				UpdateFunc: func(ctx context.Context, ue event.UpdateEvent, q workqueue.RateLimitingInterface) {
+					enqueueNonVclusterPod(ue.ObjectOld, ue.ObjectNew, q)
+				},
+				DeleteFunc: func(ctx context.Context, ev event.DeleteEvent, q workqueue.RateLimitingInterface) {
+					enqueueNonVclusterPod(nil, ev.Object, q)
+				},
+			},
+		)
 	}
-	return modifyController(ctx, s.nodeServiceProvider, builder)
+	return modifyController(ctx, s.nodeServiceProvider, bld)
 }
 
-func modifyController(ctx *synccontext.RegisterContext, nodeServiceProvider nodeservice.Provider, builder *builder.Builder) (*builder.Builder, error) {
+// only used when scheduler is enabled
+func enqueueNonVclusterPod(old, new client.Object, q workqueue.RateLimitingInterface) {
+	pod, ok := new.(*corev1.Pod)
+	if !ok {
+		klog.Errorf("invalid type passed to pod handler: %T", new)
+		return
+	}
+	// skip if node name missing
+	if pod.Spec.NodeName == "" {
+		return
+	}
+	if old != nil {
+		oldPod, ok := old.(*corev1.Pod)
+		if !ok {
+			klog.Errorf("invalid type passed to pod handler: %T", old)
+			return
+		}
+		// skip if running status not updated
+		if oldPod.Status.Phase == pod.Status.Phase {
+			return
+		}
+	}
+	q.Add(reconcile.Request{NamespacedName: types.NamespacedName{Name: pod.Spec.NodeName}})
+}
+
+// this is split out because it is shared with the fake syncer
+func modifyController(ctx *synccontext.RegisterContext, nodeServiceProvider nodeservice.Provider, bld *builder.Builder) (*builder.Builder, error) {
 	go func() {
 		nodeServiceProvider.Start(ctx.Context)
 	}()
 
-	return builder.WatchesRawSource(source.Kind(ctx.PhysicalManager.GetCache(), &corev1.Pod{}), handler.EnqueueRequestsFromMapFunc(func(_ context.Context, object client.Object) []reconcile.Request {
+	bld = bld.WatchesRawSource(source.Kind(ctx.PhysicalManager.GetCache(), &corev1.Pod{}), handler.EnqueueRequestsFromMapFunc(func(_ context.Context, object client.Object) []reconcile.Request {
 		pod, ok := object.(*corev1.Pod)
 		if !ok || pod == nil || !translate.Default.IsManaged(pod) || pod.Spec.NodeName == "" {
 			return []reconcile.Request{}
@@ -168,7 +219,9 @@ func modifyController(ctx *synccontext.RegisterContext, nodeServiceProvider node
 				},
 			},
 		}
-	})), nil
+	}))
+
+	return bld, nil
 }
 
 var _ syncertypes.IndicesRegisterer = &nodeSyncer{}
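A quick illustration (not part of the PR) of the handler's contract: enqueueNonVclusterPod only enqueues a reconcile request for the pod's node when the pod is scheduled, and for update events only when the pod phase actually changed. Assuming the function above and a client-go rate-limiting workqueue, with hypothetical pod and node names:

// Sketch only: exercise enqueueNonVclusterPod directly.
q := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

oldPod := &corev1.Pod{
	Spec:   corev1.PodSpec{NodeName: "node-1"},
	Status: corev1.PodStatus{Phase: corev1.PodPending},
}
newPod := oldPod.DeepCopy()
newPod.Status.Phase = corev1.PodRunning

enqueueNonVclusterPod(nil, newPod, q)    // create event: enqueues node "node-1"
enqueueNonVclusterPod(oldPod, newPod, q) // update, phase changed: enqueues node "node-1"
enqueueNonVclusterPod(newPod, newPod, q) // update, phase unchanged: no-op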
2 changes: 1 addition & 1 deletion pkg/controllers/resources/nodes/translate.go
@@ -181,7 +181,7 @@ func (s *nodeSyncer) translateUpdateStatus(ctx *synccontext.SyncContext, pNode *

 	var nonVClusterPods int64
 	podList := &corev1.PodList{}
-	err := s.podCache.List(ctx.Context, podList, client.MatchingFields{indexPodByRunningNonVClusterNode: pNode.Name})
+	err := s.unmanagedPodCache.List(ctx.Context, podList, client.MatchingFields{constants.IndexRunningNonVClusterPodsByNode: pNode.Name})
 	if err != nil {
 		klog.Errorf("Error listing pods: %v", err)
 	} else {
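For intuition (a rough sketch under assumptions, not the PR's actual code): with the non-vcluster pods for a node listed through the index, the syncer can derive the virtual node's allocatable by subtracting those pods' requests from the host node's allocatable. The helper below is hypothetical and, for brevity, ignores init containers and pod overhead:

// Hypothetical helper: subtract the container resource requests of the
// listed non-vcluster pods from a copy of the host node's allocatable.
func subtractPodRequests(allocatable corev1.ResourceList, pods []corev1.Pod) corev1.ResourceList {
	out := allocatable.DeepCopy()
	for _, pod := range pods {
		for _, c := range pod.Spec.Containers {
			for name, req := range c.Resources.Requests {
				if remaining, ok := out[name]; ok {
					remaining.Sub(req)
					out[name] = remaining
				}
			}
		}
	}
	return out
}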