diff --git a/examples/pod-group-jobs/job1.yaml b/examples/pod-group-jobs/job1.yaml new file mode 100644 index 0000000..e0ebba0 --- /dev/null +++ b/examples/pod-group-jobs/job1.yaml @@ -0,0 +1,59 @@ +apiVersion: v1 +kind: Service +metadata: + name: s0 +spec: + clusterIP: None + selector: + job-name: job-0 +--- +apiVersion: batch/v1 +kind: Job +metadata: + # name will be derived based on iteration + name: job-0 +spec: + completions: 4 + parallelism: 4 + completionMode: Indexed + template: + metadata: + labels: + app: job-0 + spec: + subdomain: s0 + schedulerName: fluence + restartPolicy: Never + containers: + - name: example-workload + image: bash:latest + resources: + limits: + cpu: "3" + requests: + cpu: "3" + command: + - bash + - -c + - | + if [ $JOB_COMPLETION_INDEX -ne "0" ] + then + sleep infinity + fi + echo "START: $(date +%s)" + for i in 0 1 2 3 + do + gotStatus="-1" + wantStatus="0" + while [ $gotStatus -ne $wantStatus ] + do + ping -c 1 job-0-${i}.s0 > /dev/null 2>&1 + gotStatus=$? + if [ $gotStatus -ne $wantStatus ]; then + echo "Failed to ping pod job-0-${i}.s0, retrying in 1 second..." + sleep 1 + fi + done + echo "Successfully pinged pod: job-0-${i}.s0" + done + echo "DONE: $(date +%s)" \ No newline at end of file diff --git a/examples/pod-group-jobs/job2.yaml b/examples/pod-group-jobs/job2.yaml new file mode 100644 index 0000000..c39820b --- /dev/null +++ b/examples/pod-group-jobs/job2.yaml @@ -0,0 +1,59 @@ +apiVersion: v1 +kind: Service +metadata: + name: s1 +spec: + clusterIP: None + selector: + job-name: job-1 +--- +apiVersion: batch/v1 +kind: Job +metadata: + # name will be derived based on iteration + name: job-1 +spec: + completions: 4 + parallelism: 4 + completionMode: Indexed + template: + metadata: + labels: + app: job-1 + spec: + subdomain: s1 + schedulerName: fluence + restartPolicy: Never + containers: + - name: example-workload + image: bash:latest + resources: + limits: + cpu: "3" + requests: + cpu: "3" + command: + - bash + - -c + - | + if [ $JOB_COMPLETION_INDEX -ne "0" ] + then + sleep infinity + fi + echo "START: $(date +%s)" + for i in 0 1 2 3 + do + gotStatus="-1" + wantStatus="0" + while [ $gotStatus -ne $wantStatus ] + do + ping -c 1 job-0-${i}.s1 > /dev/null 2>&1 + gotStatus=$? + if [ $gotStatus -ne $wantStatus ]; then + echo "Failed to ping pod job-0-${i}.s1, retrying in 1 second..." + sleep 1 + fi + done + echo "Successfully pinged pod: job-0-${i}.s1" + done + echo "DONE: $(date +%s)" \ No newline at end of file diff --git a/sig-scheduler-plugins/pkg/fluence/core/core.go b/sig-scheduler-plugins/pkg/fluence/core/core.go index 1e75814..ea300ce 100644 --- a/sig-scheduler-plugins/pkg/fluence/core/core.go +++ b/sig-scheduler-plugins/pkg/fluence/core/core.go @@ -39,11 +39,33 @@ import ( "sigs.k8s.io/scheduler-plugins/pkg/util" ) +type Status string + +const ( + // PodGroupNotSpecified denotes no PodGroup is specified in the Pod spec. + PodGroupNotSpecified Status = "PodGroup not specified" + // PodGroupNotFound denotes the specified PodGroup in the Pod spec is + // not found in API server. + PodGroupNotFound Status = "PodGroup not found" + Success Status = "Success" + Wait Status = "Wait" + + permitStateKey = "PermitFluence" +) + // TODO should eventually store group name here to reassociate on reload type FluxStateData struct { NodeName string } +type PermitState struct { + Activate bool +} + +func (s *PermitState) Clone() framework.StateData { + return &PermitState{Activate: s.Activate} +} + func (s *FluxStateData) Clone() framework.StateData { clone := &FluxStateData{ NodeName: s.NodeName, @@ -58,6 +80,10 @@ type Manager interface { GetPodGroup(context.Context, *corev1.Pod) (string, *v1alpha1.PodGroup) GetCreationTimestamp(*corev1.Pod, time.Time) time.Time DeletePermittedPodGroup(string) + Permit(context.Context, *framework.CycleState, *corev1.Pod) Status + CalculateAssignedPods(string, string) int + ActivateSiblings(pod *corev1.Pod, state *framework.CycleState) + BackoffPodGroup(string, time.Duration) } // PodGroupManager defines the scheduling operation called @@ -110,26 +136,69 @@ func NewPodGroupManager( return pgMgr } +func (pgMgr *PodGroupManager) BackoffPodGroup(pgName string, backoff time.Duration) { + if backoff == time.Duration(0) { + return + } + pgMgr.backedOffPG.Add(pgName, nil, backoff) +} + +// ActivateSiblings stashes the pods belonging to the same PodGroup of the given pod +// in the given state, with a reserved key "kubernetes.io/pods-to-activate". +func (pgMgr *PodGroupManager) ActivateSiblings(pod *corev1.Pod, state *framework.CycleState) { + pgName := util.GetPodGroupLabel(pod) + if pgName == "" { + return + } + + // Only proceed if it's explicitly requested to activate sibling pods. + if c, err := state.Read(permitStateKey); err != nil { + return + } else if s, ok := c.(*PermitState); !ok || !s.Activate { + return + } + + pods, err := pgMgr.podLister.Pods(pod.Namespace).List( + labels.SelectorFromSet(labels.Set{v1alpha1.PodGroupLabel: pgName}), + ) + if err != nil { + klog.ErrorS(err, "Failed to obtain pods belong to a PodGroup", "podGroup", pgName) + return + } + + for i := range pods { + if pods[i].UID == pod.UID { + pods = append(pods[:i], pods[i+1:]...) + break + } + } + + if len(pods) != 0 { + if c, err := state.Read(framework.PodsToActivateKey); err == nil { + if s, ok := c.(*framework.PodsToActivate); ok { + s.Lock() + for _, pod := range pods { + namespacedName := GetNamespacedName(pod) + s.Map[namespacedName] = pod + } + s.Unlock() + } + } + } +} + // GetStatuses string (of all pods) to show for debugging purposes -// Since we loop here, we also determine if the first pod is the one -// we are considering -func (pgMgr *PodGroupManager) GetStatusesAndIndex( +func (pgMgr *PodGroupManager) GetStatuses( pods []*corev1.Pod, pod *corev1.Pod, -) (string, bool, int) { +) string { statuses := "" // We need to distinguish 0 from the default and not finding anything - foundIndex := false - index := 0 - for i, p := range pods { - if p.Name == pod.Name { - foundIndex = true - index = i - } + for _, p := range pods { statuses += " " + fmt.Sprintf("%s", p.Status.Phase) } - return statuses, foundIndex, index + return statuses } // GetPodNode is a quick lookup to see if we have a node @@ -138,6 +207,39 @@ func (pgMgr *PodGroupManager) GetPodNode(pod *corev1.Pod) string { return node } +// Permit permits a pod to run, if the minMember match, it would send a signal to chan. +func (pgMgr *PodGroupManager) Permit(ctx context.Context, state *framework.CycleState, pod *corev1.Pod) Status { + pgFullName, pg := pgMgr.GetPodGroup(ctx, pod) + if pgFullName == "" { + return PodGroupNotSpecified + } + if pg == nil { + // A Pod with a podGroup name but without a PodGroup found is denied. + return PodGroupNotFound + } + + assigned := pgMgr.CalculateAssignedPods(pg.Name, pg.Namespace) + // The number of pods that have been assigned nodes is calculated from the snapshot. + // The current pod in not included in the snapshot during the current scheduling cycle. + if int32(assigned)+1 >= pg.Spec.MinMember { + return Success + } + + if assigned == 0 { + // Given we've reached Permit(), it's mean all PreFilter checks (minMember & minResource) + // already pass through, so if assigned == 0, it could be due to: + // - minResource get satisfied + // - new pods added + // In either case, we should and only should use this 0-th pod to trigger activating + // its siblings. + // It'd be in-efficient if we trigger activating siblings unconditionally. + // See https://github.com/kubernetes-sigs/scheduler-plugins/issues/682 + state.Write(permitStateKey, &PermitState{Activate: true}) + } + + return Wait +} + // PreFilter filters out a pod if // 1. it belongs to a podgroup that was recently denied or // 2. the total number of pods in the podgroup is less than the minimum number of pods @@ -169,7 +271,7 @@ func (pgMgr *PodGroupManager) PreFilter( // Only allow scheduling the first in the group so the others come after // Get statuses to show for debugging - statuses, found, idx := pgMgr.GetStatusesAndIndex(pods, pod) + statuses := pgMgr.GetStatuses(pods, pod) // This shows us the number of pods we have in the set and their states pgMgr.log.Info("[PodGroup PreFilter] group: %s pods: %s MinMember: %d Size: %d", pgFullName, statuses, pg.Spec.MinMember, len(pods)) @@ -178,18 +280,6 @@ func (pgMgr *PodGroupManager) PreFilter( "current pods number: %v, minMember of group: %v", pod.Name, len(pods), pg.Spec.MinMember) } - if !found { - return fmt.Errorf("pod %s was not found in group - this should not happen", pod.Name) - } - - // We only will AskFlux for the first pod - // This makes an assumption that the order listed is the order in the queue, I'm not - // sure that is true in practice. This is the one case with retry. This design - // probably needs thinking and work. - if idx != 0 { - return fmt.Errorf("pod %s is not first in the list, will wait to schedule", pod.Name) - } - // TODO we likely can take advantage of these resources or other custom // attributes we add. For now ignore and calculate based on pod needs (above) // if pg.Spec.MinResources == nil { @@ -233,7 +323,9 @@ func (pgMgr *PodGroupManager) PreFilter( stateData := FluxStateData{NodeName: node} state.Write(framework.StateKey(pod.Name), &stateData) // Also save to the podToNode lookup + pgMgr.mutex.Lock() pgMgr.podToNode[pod.Name] = node + pgMgr.mutex.Unlock() } pgMgr.permittedPG.Add(pgFullName, pgFullName, *pgMgr.scheduleTimeout) return nil @@ -252,6 +344,25 @@ func (pgMgr *PodGroupManager) GetCreationTimestamp(pod *corev1.Pod, ts time.Time return pg.CreationTimestamp.Time } +// CalculateAssignedPods returns the number of pods that has been assigned nodes: assumed or bound. +func (pgMgr *PodGroupManager) CalculateAssignedPods(podGroupName, namespace string) int { + nodeInfos, err := pgMgr.snapshotSharedLister.NodeInfos().List() + if err != nil { + pgMgr.log.Error("Cannot get nodeInfos from frameworkHandle: %s", err) + return 0 + } + var count int + for _, nodeInfo := range nodeInfos { + for _, podInfo := range nodeInfo.Pods { + pod := podInfo.Pod + if util.GetPodGroupLabel(pod) == podGroupName && pod.Namespace == namespace && pod.Spec.NodeName != "" { + count++ + } + } + } + return count +} + // DeletePermittedPodGroup deletes a podGroup that passes Pre-Filter but reaches PostFilter. func (pgMgr *PodGroupManager) DeletePermittedPodGroup(pgFullName string) { pgMgr.permittedPG.Delete(pgFullName) diff --git a/sig-scheduler-plugins/pkg/fluence/fluence.go b/sig-scheduler-plugins/pkg/fluence/fluence.go index 84f3e95..099d2f3 100644 --- a/sig-scheduler-plugins/pkg/fluence/fluence.go +++ b/sig-scheduler-plugins/pkg/fluence/fluence.go @@ -22,8 +22,8 @@ import ( "sync" "time" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/sets" - klog "k8s.io/klog/v2" "sigs.k8s.io/scheduler-plugins/pkg/logger" @@ -33,12 +33,12 @@ import ( "k8s.io/client-go/tools/cache" fgroup "sigs.k8s.io/scheduler-plugins/pkg/fluence/group" + flabel "sigs.k8s.io/scheduler-plugins/pkg/fluence/labels" corev1helpers "k8s.io/component-helpers/scheduling/corev1" "k8s.io/kubernetes/pkg/scheduler/framework" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/scheduler-plugins/apis/config" "sigs.k8s.io/scheduler-plugins/apis/scheduling" "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1" fcore "sigs.k8s.io/scheduler-plugins/pkg/fluence/core" @@ -52,6 +52,7 @@ type Fluence struct { frameworkHandler framework.Handle pgMgr fcore.Manager scheduleTimeout *time.Duration + pgBackoff *time.Duration log *logger.DebugLogger } @@ -59,6 +60,15 @@ var ( _ framework.QueueSortPlugin = &Fluence{} _ framework.PreFilterPlugin = &Fluence{} _ framework.FilterPlugin = &Fluence{} + + _ framework.PostFilterPlugin = &Fluence{} + _ framework.PermitPlugin = &Fluence{} + _ framework.ReservePlugin = &Fluence{} + + _ framework.EnqueueExtensions = &Fluence{} + + permitWaitingTimeSeconds int64 = 60 + podGroupBackoffSeconds int64 = 0 ) const ( @@ -69,14 +79,12 @@ const ( // Initialize and return a new Fluence Custom Scheduler Plugin func New(obj runtime.Object, handle framework.Handle) (framework.Plugin, error) { - // Keep these empty for now, use defaults - args := config.CoschedulingArgs{} ctx := context.TODO() // Make fluence his own little logger! // This can eventually be a flag, but just going to set for now // It shall be a very chonky file. Oh lawd he comin! - l := logger.NewDebugLogger(logger.LevelError, "/tmp/fluence.log") + l := logger.NewDebugLogger(logger.LevelDebug, "/tmp/fluence.log") scheme := runtime.NewScheme() _ = clientscheme.AddToScheme(scheme) @@ -93,7 +101,7 @@ func New(obj runtime.Object, handle framework.Handle) (framework.Plugin, error) fluxPodsInformer.AddIndexers(cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) // PermitWaitingTimeSeconds is the waiting timeout in seconds. - scheduleTimeDuration := time.Duration(args.PermitWaitingTimeSeconds) * time.Second + scheduleTimeDuration := time.Duration(permitWaitingTimeSeconds) * time.Second pgMgr := fcore.NewPodGroupManager( client, handle.SnapshotSharedLister(), @@ -110,11 +118,13 @@ func New(obj runtime.Object, handle framework.Handle) (framework.Plugin, error) }) go fluxPodsInformer.Run(ctx.Done()) + backoffSeconds := time.Duration(podGroupBackoffSeconds) * time.Second plugin := &Fluence{ frameworkHandler: handle, pgMgr: pgMgr, scheduleTimeout: &scheduleTimeDuration, log: l, + pgBackoff: &backoffSeconds, } // TODO this is not supported yet @@ -219,16 +229,131 @@ func (f *Fluence) PreFilter( node := f.pgMgr.GetPodNode(pod) f.mutex.Unlock() if node != "" { + f.log.Info("[Fluence PreFilter] assigned pod %s to node %s\n", pod.Name, node) result := framework.PreFilterResult{NodeNames: sets.New(node)} return &result, framework.NewStatus(framework.Success, "") } + f.log.Info("[Fluence PreFilter] pod %s does not have a node assigned\n", pod.Name) + // This will populate the node name into the pod group manager err := f.pgMgr.PreFilter(ctx, pod, state) if err != nil { - f.log.Error("[Fluence PreFilter] failed pod %s: %s", klog.KObj(pod), err.Error()) + f.log.Error("[Fluence PreFilter] failed pod %s: %s", pod.Name, err.Error()) return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, err.Error()) } node = f.pgMgr.GetPodNode(pod) result := framework.PreFilterResult{NodeNames: sets.New(node)} return &result, framework.NewStatus(framework.Success, "") } + +// PostFilter is used to reject a group of pods if a pod does not pass PreFilter or Filter. +func (f *Fluence) PostFilter( + ctx context.Context, + state *framework.CycleState, + pod *corev1.Pod, + filteredNodeStatusMap framework.NodeToStatusMap, +) (*framework.PostFilterResult, *framework.Status) { + + pgName, pg := f.pgMgr.GetPodGroup(ctx, pod) + if pg == nil { + f.log.Info("Pod does not belong to any group, pod %s", pod.Name) + return &framework.PostFilterResult{}, framework.NewStatus(framework.Unschedulable, "can not find pod group") + } + + // This explicitly checks nodes, and we can skip scheduling another pod if we already + // have the minimum. For fluence since we expect an exact size this likely is not needed + assigned := f.pgMgr.CalculateAssignedPods(pg.Name, pod.Namespace) + if assigned >= int(pg.Spec.MinMember) { + f.log.Info("Assigned pods podGroup %s is assigned %s", pgName, assigned) + return &framework.PostFilterResult{}, framework.NewStatus(framework.Unschedulable) + } + + // Took out percentage chcek here, doesn't make sense to me. + + // It's based on an implicit assumption: if the nth Pod failed, + // it's inferrable other Pods belonging to the same PodGroup would be very likely to fail. + f.frameworkHandler.IterateOverWaitingPods(func(waitingPod framework.WaitingPod) { + if waitingPod.GetPod().Namespace == pod.Namespace && flabel.GetPodGroupLabel(waitingPod.GetPod()) == pg.Name { + f.log.Info("PostFilter rejects the pod for podGroup %s and pod %s", pgName, waitingPod.GetPod().Name) + waitingPod.Reject(f.Name(), "optimistic rejection in PostFilter") + } + }) + + if f.pgBackoff != nil { + pods, err := f.frameworkHandler.SharedInformerFactory().Core().V1().Pods().Lister().Pods(pod.Namespace).List( + labels.SelectorFromSet(labels.Set{v1alpha1.PodGroupLabel: flabel.GetPodGroupLabel(pod)}), + ) + if err == nil && len(pods) >= int(pg.Spec.MinMember) { + f.pgMgr.BackoffPodGroup(pgName, *f.pgBackoff) + } + } + + f.pgMgr.DeletePermittedPodGroup(pgName) + return &framework.PostFilterResult{}, framework.NewStatus(framework.Unschedulable, + fmt.Sprintf("PodGroup %v gets rejected due to Pod %v is unschedulable even after PostFilter", pgName, pod.Name)) +} + +// Permit is the functions invoked by the framework at "Permit" extension point. +func (f *Fluence) Permit( + ctx context.Context, + state *framework.CycleState, + pod *corev1.Pod, + nodeName string, +) (*framework.Status, time.Duration) { + + f.log.Info("Checking permit for pod %s to node %s", pod.Name, nodeName) + waitTime := *f.scheduleTimeout + s := f.pgMgr.Permit(ctx, state, pod) + var retStatus *framework.Status + switch s { + case fcore.PodGroupNotSpecified: + f.log.Info("Checking permit for pod %s to node %s: PodGroupNotSpecified", pod.Name, nodeName) + return framework.NewStatus(framework.Success, ""), 0 + case fcore.PodGroupNotFound: + f.log.Info("Checking permit for pod %s to node %s: PodGroupNotFound", pod.Name, nodeName) + return framework.NewStatus(framework.Unschedulable, "PodGroup not found"), 0 + case fcore.Wait: + f.log.Info("Pod %s is waiting to be scheduled to node %s", pod.Name, nodeName) + _, pg := f.pgMgr.GetPodGroup(ctx, pod) + if wait := fgroup.GetWaitTimeDuration(pg, f.scheduleTimeout); wait != 0 { + waitTime = wait + } + retStatus = framework.NewStatus(framework.Wait) + + // We will also request to move the sibling pods back to activeQ. + f.pgMgr.ActivateSiblings(pod, state) + case fcore.Success: + pgFullName := flabel.GetPodGroupFullName(pod) + f.frameworkHandler.IterateOverWaitingPods(func(waitingPod framework.WaitingPod) { + if flabel.GetPodGroupFullName(waitingPod.GetPod()) == pgFullName { + f.log.Info("Permit allows pod %s", waitingPod.GetPod().Name) + waitingPod.Allow(f.Name()) + } + }) + f.log.Info("Permit allows pod %s", pod.Name) + retStatus = framework.NewStatus(framework.Success) + waitTime = 0 + } + + return retStatus, waitTime +} + +// Reserve is the functions invoked by the framework at "reserve" extension point. +func (f *Fluence) Reserve(ctx context.Context, state *framework.CycleState, pod *corev1.Pod, nodeName string) *framework.Status { + return nil +} + +// Unreserve rejects all other Pods in the PodGroup when one of the pods in the group times out. +func (f *Fluence) Unreserve(ctx context.Context, state *framework.CycleState, pod *corev1.Pod, nodeName string) { + pgName, pg := f.pgMgr.GetPodGroup(ctx, pod) + if pg == nil { + return + } + f.frameworkHandler.IterateOverWaitingPods(func(waitingPod framework.WaitingPod) { + if waitingPod.GetPod().Namespace == pod.Namespace && flabel.GetPodGroupLabel(waitingPod.GetPod()) == pg.Name { + f.log.Info("Unreserve rejects pod %s in group %s", waitingPod.GetPod().Name, pgName) + waitingPod.Reject(f.Name(), "rejection in Unreserve") + } + }) + f.pgMgr.DeletePermittedPodGroup(pgName) +} diff --git a/sig-scheduler-plugins/pkg/fluence/group/group.go b/sig-scheduler-plugins/pkg/fluence/group/group.go index 0ee0831..dd039e3 100644 --- a/sig-scheduler-plugins/pkg/fluence/group/group.go +++ b/sig-scheduler-plugins/pkg/fluence/group/group.go @@ -2,6 +2,7 @@ package group import ( "fmt" + "time" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -11,6 +12,9 @@ import ( sched "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1" ) +// DefaultWaitTime is 60s if ScheduleTimeoutSeconds is not specified. +const DefaultWaitTime = 60 * time.Second + // CreateFakeGroup wraps an arbitrary pod in a fake group for fluence to schedule // This happens only in PreFilter so we already sorted func CreateFakeGroup(pod *corev1.Pod) *sched.PodGroup { @@ -44,3 +48,17 @@ func GetCreationTimestamp(groupName string, pg *sched.PodGroup, podInfo *framewo klog.Errorf(" [Fluence] Pod group %s time IsZero, we should not have reached here", groupName) return metav1.NewMicroTime(*podInfo.InitialAttemptTimestamp) } + +// GetWaitTimeDuration returns a wait timeout based on the following precedences: +// 1. spec.scheduleTimeoutSeconds of the given pg, if specified +// 2. given scheduleTimeout, if not nil +// 3. fall back to DefaultWaitTime +func GetWaitTimeDuration(pg *sched.PodGroup, scheduleTimeout *time.Duration) time.Duration { + if pg != nil && pg.Spec.ScheduleTimeoutSeconds != nil { + return time.Duration(*pg.Spec.ScheduleTimeoutSeconds) * time.Second + } + if scheduleTimeout != nil && *scheduleTimeout != 0 { + return *scheduleTimeout + } + return DefaultWaitTime +} diff --git a/sig-scheduler-plugins/pkg/logger/logger.go b/sig-scheduler-plugins/pkg/logger/logger.go index 053021a..d1e238e 100644 --- a/sig-scheduler-plugins/pkg/logger/logger.go +++ b/sig-scheduler-plugins/pkg/logger/logger.go @@ -79,8 +79,8 @@ func (l *DebugLogger) log(level int, prefix string, message ...any) error { rest := message[1:] // msg := fmt.Sprintf(message...) - fmt.Printf("Compariing level %d >= %d\n", level, l.level) - if level >= l.level { + fmt.Printf("Compariing level %d <= %d\n", level, l.level) + if level <= l.level { logger.Printf(prolog, rest...) } return l.Stop()