fix: watch and enqueue node events when scheduler enabled
Signed-off-by: Rohan CJ <[email protected]>
rohantmp committed Nov 13, 2023
1 parent d354403 commit e23edb0
Showing 3 changed files with 80 additions and 16 deletions.
5 changes: 5 additions & 0 deletions pkg/constants/indices.go
@@ -14,6 +14,11 @@ const (
IndexByHostName = "IndexByHostName"

IndexByClusterIP = "IndexByClusterIP"

// IndexRunningNonVclusterPodsByNode is only used when the vcluster scheduler is enabled.
// It indexes running non-vcluster pods by the name of the node they run on, so that the
// node syncer can calculate the allocatable resources remaining on that node.
IndexRunningNonVclusterPodsByNode = "IndexRunningNonVclusterPodsByNode"
)

const DefaultCacheSyncTimeout = time.Minute * 15
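
For context, the read side of this index goes through client.MatchingFields, as the change to translate.go below shows. A minimal sketch of such a lookup, assuming a cache-backed client.Reader (the helper name is illustrative, not part of this commit):

import (
	"context"

	"github.com/loft-sh/vcluster/pkg/constants"
	corev1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// listRunningPodsOnNode returns the running non-vcluster pods that the cache
// has indexed under the given node name.
func listRunningPodsOnNode(ctx context.Context, reader client.Reader, nodeName string) (*corev1.PodList, error) {
	podList := &corev1.PodList{}
	err := reader.List(ctx, podList, client.MatchingFields{
		constants.IndexRunningNonVclusterPodsByNode: nodeName,
	})
	return podList, err
}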
89 changes: 74 additions & 15 deletions pkg/controllers/resources/nodes/syncer.go
Expand Up @@ -2,9 +2,12 @@ package nodes

import (
"context"
"fmt"

"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"

"github.com/loft-sh/vcluster/pkg/constants"
"github.com/loft-sh/vcluster/pkg/controllers/resources/nodes/nodeservice"
@@ -16,7 +19,9 @@ import (
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
@@ -26,10 +31,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/source"
)

var (
indexPodByRunningNonVClusterNode = "indexpodbyrunningnonvclusternode"
)

func NewSyncer(ctx *synccontext.RegisterContext, nodeServiceProvider nodeservice.Provider) (syncertypes.Object, error) {
var err error
var nodeSelector labels.Selector
@@ -82,7 +83,7 @@ type nodeSyncer struct {
physicalClient client.Client
virtualClient client.Client

podCache client.Reader
unmanagedPodCache client.Reader
nodeServiceProvider nodeservice.Provider
enforcedTolerations []*corev1.Toleration
}
@@ -97,25 +98,31 @@ func (s *nodeSyncer) Name() string {

var _ syncertypes.ControllerModifier = &nodeSyncer{}

func (s *nodeSyncer) ModifyController(ctx *synccontext.RegisterContext, builder *builder.Builder) (*builder.Builder, error) {
func (s *nodeSyncer) ModifyController(ctx *synccontext.RegisterContext, bld *builder.Builder) (*builder.Builder, error) {
if s.enableScheduler {
// create a global pod cache for calculating the correct node resources
notManagedSelector, err := labels.NewRequirement(translate.MarkerLabel, selection.NotEquals, []string{translate.Suffix})
if err != nil {
return bld, fmt.Errorf("constructing label selector for non-vcluster pods: %w", err)
}
// create a pod cache containing pods from all namespaces for calculating the correct node resources
podCache, err := cache.New(ctx.PhysicalManager.GetConfig(), cache.Options{
Scheme: ctx.PhysicalManager.GetScheme(),
Mapper: ctx.PhysicalManager.GetRESTMapper(),
// omits pods managed by the vcluster
DefaultLabelSelector: labels.NewSelector().Add(*notManagedSelector),
// omits pods that are pending as they do not consume resources
DefaultFieldSelector: fields.OneTermNotEqualSelector("status.phase", string(corev1.PodPending)),
})
if err != nil {
return nil, errors.Wrap(err, "create cache")
}
// add index for pod by node
err = podCache.IndexField(ctx.Context, &corev1.Pod{}, indexPodByRunningNonVClusterNode, func(object client.Object) []string {
err = podCache.IndexField(ctx.Context, &corev1.Pod{}, constants.IndexRunningNonVclusterPodsByNode, func(object client.Object) []string {
pPod := object.(*corev1.Pod)
// we ignore all non-running pods and the ones that are part of the current vcluster
// to later calculate the status.allocatable part of the nodes correctly
if pPod.Status.Phase == corev1.PodSucceeded || pPod.Status.Phase == corev1.PodFailed {
return []string{}
} else if translate.Default.IsManaged(pPod) {
return []string{}
} else if pPod.Spec.NodeName == "" {
return []string{}
}
@@ -132,17 +139,65 @@ func (s *nodeSyncer) ModifyController(ctx *synccontext.RegisterContext, builder
}
}()
podCache.WaitForCacheSync(ctx.Context)
s.podCache = podCache
s.unmanagedPodCache = podCache

// enqueue the pod's node whenever a non-vcluster pod changes phase, so that the
// node syncer recalculates the virtual node's allocatable resources
bld.WatchesRawSource(
source.Kind(podCache, &corev1.Pod{}),
handler.EnqueueRequestsFromMapFunc(func(_ context.Context, object client.Object) []reconcile.Request {
pPod := object.(*corev1.Pod)
return []reconcile.Request{
{
NamespacedName: types.NamespacedName{
Name: pPod.Spec.NodeName,
},
},
}
}),
builder.WithPredicates(
predicate.NewPredicateFuncs(func(object client.Object) bool {
pPod, ok := object.(*corev1.Pod)
if !ok {
klog.Info("object failed to resolve into pod")
return false
}
// skip pending pods and pods that have not been assigned to a node yet
return pPod.Status.Phase != corev1.PodPending && pPod.Spec.NodeName != ""
}),
predicate.Funcs{
UpdateFunc: func(ue event.UpdateEvent) bool {
oldPod, ok := ue.ObjectOld.(*corev1.Pod)
if !ok {
klog.Info("object failed to resolve into pod")
return false
}
newPod, ok := ue.ObjectNew.(*corev1.Pod)
if !ok {
klog.Info("object failed to resolve into pod")
return false
}
// only enqueue when the phase changes; a pod's resource requests are immutable,
// so no other update can change the node's allocatable resources
return oldPod.Status.Phase != newPod.Status.Phase
},
},
),
)
}
return modifyController(ctx, s.nodeServiceProvider, builder)
return modifyController(ctx, s.nodeServiceProvider, bld)
}

func modifyController(ctx *synccontext.RegisterContext, nodeServiceProvider nodeservice.Provider, builder *builder.Builder) (*builder.Builder, error) {
// this is split out because it is shared with the fake syncer
func modifyController(ctx *synccontext.RegisterContext, nodeServiceProvider nodeservice.Provider, bld *builder.Builder) (*builder.Builder, error) {
go func() {
nodeServiceProvider.Start(ctx.Context)
}()

return builder.WatchesRawSource(source.Kind(ctx.PhysicalManager.GetCache(), &corev1.Pod{}), handler.EnqueueRequestsFromMapFunc(func(_ context.Context, object client.Object) []reconcile.Request {
bld = bld.WatchesRawSource(source.Kind(ctx.PhysicalManager.GetCache(), &corev1.Pod{}), handler.EnqueueRequestsFromMapFunc(func(_ context.Context, object client.Object) []reconcile.Request {
pod, ok := object.(*corev1.Pod)
if !ok || pod == nil || !translate.Default.IsManaged(pod) || pod.Spec.NodeName == "" {
return []reconcile.Request{}
@@ -168,7 +223,11 @@ func modifyController(ctx *synccontext.RegisterContext, nodeServiceProvider node
},
},
}
})), nil
}))

return bld, nil
}

var _ syncertypes.IndicesRegisterer = &nodeSyncer{}
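For readers skimming the diff: the scheduler-enabled path above boils down to watching the filtered pod cache as a raw source, mapping each pod event to a reconcile request for the pod's node, and filtering events with predicates. A condensed, self-contained sketch of that pattern (the wrapper function is illustrative, not part of this commit):

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/builder"
	"sigs.k8s.io/controller-runtime/pkg/cache"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	"sigs.k8s.io/controller-runtime/pkg/predicate"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
	"sigs.k8s.io/controller-runtime/pkg/source"
)

// watchPodsEnqueueNodes wires a watch on the filtered pod cache that enqueues
// the node a pod runs on, mirroring the ModifyController change above.
func watchPodsEnqueueNodes(bld *builder.Builder, podCache cache.Cache) *builder.Builder {
	return bld.WatchesRawSource(
		source.Kind(podCache, &corev1.Pod{}),
		handler.EnqueueRequestsFromMapFunc(func(_ context.Context, object client.Object) []reconcile.Request {
			pod := object.(*corev1.Pod)
			// reconcile the node this pod is scheduled on
			return []reconcile.Request{{NamespacedName: types.NamespacedName{Name: pod.Spec.NodeName}}}
		}),
		builder.WithPredicates(predicate.NewPredicateFuncs(func(object client.Object) bool {
			pod, ok := object.(*corev1.Pod)
			// ignore non-pods, pending pods, and pods not assigned to a node
			return ok && pod.Status.Phase != corev1.PodPending && pod.Spec.NodeName != ""
		})),
	)
}

The additional predicate.Funcs update filter in the commit drops updates whose phase did not change, which is safe because a pod's resource requests are immutable after admission, so only phase transitions can affect a node's remaining resources.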
2 changes: 1 addition & 1 deletion pkg/controllers/resources/nodes/translate.go
@@ -181,7 +181,7 @@ func (s *nodeSyncer) translateUpdateStatus(ctx *synccontext.SyncContext, pNode *

var nonVClusterPods int64
podList := &corev1.PodList{}
err := s.podCache.List(ctx.Context, podList, client.MatchingFields{indexPodByRunningNonVClusterNode: pNode.Name})
err := s.unmanagedPodCache.List(ctx.Context, podList, client.MatchingFields{constants.IndexRunningNonVclusterPodsByNode: pNode.Name})
if err != nil {
klog.Errorf("Error listing pods: %v", err)
} else {
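To see why the node syncer needs this list: when the vcluster scheduler is enabled, the synced node's status.allocatable must be reduced by whatever the non-vcluster pods on that host already request. A simplified sketch of that subtraction (illustrative helper; the actual calculation lives in translateUpdateStatus, and init containers and pod overhead are ignored here):

import (
	corev1 "k8s.io/api/core/v1"
)

// subtractPodRequests returns allocatable minus the CPU and memory requests
// of the given pods.
func subtractPodRequests(allocatable corev1.ResourceList, pods []corev1.Pod) corev1.ResourceList {
	cpu := allocatable.Cpu().DeepCopy()
	mem := allocatable.Memory().DeepCopy()
	for _, pod := range pods {
		for _, container := range pod.Spec.Containers {
			cpu.Sub(*container.Resources.Requests.Cpu())
			mem.Sub(*container.Resources.Requests.Memory())
		}
	}
	return corev1.ResourceList{
		corev1.ResourceCPU:    cpu,
		corev1.ResourceMemory: mem,
	}
}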
