Skip to content

Commit

Permalink
Merge pull request #74 from flux-framework/test-with-permit
Browse files Browse the repository at this point in the history
test: adding permit to allow for sibling pod scheduling
  • Loading branch information
vsoch authored Apr 20, 2024
2 parents 8c99f10 + ef0ed50 commit 50ad162
Show file tree
Hide file tree
Showing 10 changed files with 420 additions and 42 deletions.
59 changes: 59 additions & 0 deletions examples/pod-group-jobs/job1.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
apiVersion: v1
kind: Service
metadata:
name: s0
spec:
clusterIP: None
selector:
job-name: job-0
---
apiVersion: batch/v1
kind: Job
metadata:
# name will be derived based on iteration
name: job-0
spec:
completions: 4
parallelism: 4
completionMode: Indexed
template:
metadata:
labels:
app: job-0
spec:
subdomain: s0
schedulerName: fluence
restartPolicy: Never
containers:
- name: example-workload
image: bash:latest
resources:
limits:
cpu: "3"
requests:
cpu: "3"
command:
- bash
- -c
- |
if [ $JOB_COMPLETION_INDEX -ne "0" ]
then
sleep infinity
fi
echo "START: $(date +%s)"
for i in 0 1 2 3
do
gotStatus="-1"
wantStatus="0"
while [ $gotStatus -ne $wantStatus ]
do
ping -c 1 job-0-${i}.s0 > /dev/null 2>&1
gotStatus=$?
if [ $gotStatus -ne $wantStatus ]; then
echo "Failed to ping pod job-0-${i}.s0, retrying in 1 second..."
sleep 1
fi
done
echo "Successfully pinged pod: job-0-${i}.s0"
done
echo "DONE: $(date +%s)"
59 changes: 59 additions & 0 deletions examples/pod-group-jobs/job2.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
apiVersion: v1
kind: Service
metadata:
name: s1
spec:
clusterIP: None
selector:
job-name: job-1
---
apiVersion: batch/v1
kind: Job
metadata:
# name will be derived based on iteration
name: job-1
spec:
completions: 4
parallelism: 4
completionMode: Indexed
template:
metadata:
labels:
app: job-1
spec:
subdomain: s1
schedulerName: fluence
restartPolicy: Never
containers:
- name: example-workload
image: bash:latest
resources:
limits:
cpu: "3"
requests:
cpu: "3"
command:
- bash
- -c
- |
if [ $JOB_COMPLETION_INDEX -ne "0" ]
then
sleep infinity
fi
echo "START: $(date +%s)"
for i in 0 1 2 3
do
gotStatus="-1"
wantStatus="0"
while [ $gotStatus -ne $wantStatus ]
do
ping -c 1 job-0-${i}.s1 > /dev/null 2>&1
gotStatus=$?
if [ $gotStatus -ne $wantStatus ]; then
echo "Failed to ping pod job-0-${i}.s1, retrying in 1 second..."
sleep 1
fi
done
echo "Successfully pinged pod: job-0-${i}.s1"
done
echo "DONE: $(date +%s)"
8 changes: 5 additions & 3 deletions sig-scheduler-plugins/cmd/controller/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"sigs.k8s.io/controller-runtime/pkg/webhook"

metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
api "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1"
"sigs.k8s.io/scheduler-plugins/pkg/controllers"
)
Expand All @@ -50,9 +51,10 @@ func Run(s *ServerRunOptions) error {
// Controller Runtime Controllers
ctrl.SetLogger(klogr.New())
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
Scheme: scheme,
MetricsBindAddress: s.MetricsAddr,
Port: 9443,
Scheme: scheme,
Metrics: metricsserver.Options{
BindAddress: s.MetricsAddr,
},
HealthProbeBindAddress: s.ProbeAddr,
LeaderElection: s.EnableLeaderElection,
LeaderElectionID: "sched-plugins-controllers",
Expand Down
162 changes: 137 additions & 25 deletions sig-scheduler-plugins/pkg/fluence/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,33 @@ import (
"sigs.k8s.io/scheduler-plugins/pkg/util"
)

type Status string

const (
// PodGroupNotSpecified denotes no PodGroup is specified in the Pod spec.
PodGroupNotSpecified Status = "PodGroup not specified"
// PodGroupNotFound denotes the specified PodGroup in the Pod spec is
// not found in API server.
PodGroupNotFound Status = "PodGroup not found"
Success Status = "Success"
Wait Status = "Wait"

permitStateKey = "PermitFluence"
)

// TODO should eventually store group name here to reassociate on reload
type FluxStateData struct {
NodeName string
}

type PermitState struct {
Activate bool
}

func (s *PermitState) Clone() framework.StateData {
return &PermitState{Activate: s.Activate}
}

func (s *FluxStateData) Clone() framework.StateData {
clone := &FluxStateData{
NodeName: s.NodeName,
Expand All @@ -58,6 +80,10 @@ type Manager interface {
GetPodGroup(context.Context, *corev1.Pod) (string, *v1alpha1.PodGroup)
GetCreationTimestamp(*corev1.Pod, time.Time) time.Time
DeletePermittedPodGroup(string)
Permit(context.Context, *framework.CycleState, *corev1.Pod) Status
CalculateAssignedPods(string, string) int
ActivateSiblings(pod *corev1.Pod, state *framework.CycleState)
BackoffPodGroup(string, time.Duration)
}

// PodGroupManager defines the scheduling operation called
Expand Down Expand Up @@ -110,26 +136,69 @@ func NewPodGroupManager(
return pgMgr
}

func (pgMgr *PodGroupManager) BackoffPodGroup(pgName string, backoff time.Duration) {
if backoff == time.Duration(0) {
return
}
pgMgr.backedOffPG.Add(pgName, nil, backoff)
}

// ActivateSiblings stashes the pods belonging to the same PodGroup of the given pod
// in the given state, with a reserved key "kubernetes.io/pods-to-activate".
func (pgMgr *PodGroupManager) ActivateSiblings(pod *corev1.Pod, state *framework.CycleState) {
pgName := util.GetPodGroupLabel(pod)
if pgName == "" {
return
}

// Only proceed if it's explicitly requested to activate sibling pods.
if c, err := state.Read(permitStateKey); err != nil {
return
} else if s, ok := c.(*PermitState); !ok || !s.Activate {
return
}

pods, err := pgMgr.podLister.Pods(pod.Namespace).List(
labels.SelectorFromSet(labels.Set{v1alpha1.PodGroupLabel: pgName}),
)
if err != nil {
klog.ErrorS(err, "Failed to obtain pods belong to a PodGroup", "podGroup", pgName)
return
}

for i := range pods {
if pods[i].UID == pod.UID {
pods = append(pods[:i], pods[i+1:]...)
break
}
}

if len(pods) != 0 {
if c, err := state.Read(framework.PodsToActivateKey); err == nil {
if s, ok := c.(*framework.PodsToActivate); ok {
s.Lock()
for _, pod := range pods {
namespacedName := GetNamespacedName(pod)
s.Map[namespacedName] = pod
}
s.Unlock()
}
}
}
}

// GetStatuses string (of all pods) to show for debugging purposes
// Since we loop here, we also determine if the first pod is the one
// we are considering
func (pgMgr *PodGroupManager) GetStatusesAndIndex(
func (pgMgr *PodGroupManager) GetStatuses(
pods []*corev1.Pod,
pod *corev1.Pod,
) (string, bool, int) {
) string {
statuses := ""

// We need to distinguish 0 from the default and not finding anything
foundIndex := false
index := 0
for i, p := range pods {
if p.Name == pod.Name {
foundIndex = true
index = i
}
for _, p := range pods {
statuses += " " + fmt.Sprintf("%s", p.Status.Phase)
}
return statuses, foundIndex, index
return statuses
}

// GetPodNode is a quick lookup to see if we have a node
Expand All @@ -138,6 +207,39 @@ func (pgMgr *PodGroupManager) GetPodNode(pod *corev1.Pod) string {
return node
}

// Permit permits a pod to run, if the minMember match, it would send a signal to chan.
func (pgMgr *PodGroupManager) Permit(ctx context.Context, state *framework.CycleState, pod *corev1.Pod) Status {
pgFullName, pg := pgMgr.GetPodGroup(ctx, pod)
if pgFullName == "" {
return PodGroupNotSpecified
}
if pg == nil {
// A Pod with a podGroup name but without a PodGroup found is denied.
return PodGroupNotFound
}

assigned := pgMgr.CalculateAssignedPods(pg.Name, pg.Namespace)
// The number of pods that have been assigned nodes is calculated from the snapshot.
// The current pod in not included in the snapshot during the current scheduling cycle.
if int32(assigned)+1 >= pg.Spec.MinMember {
return Success
}

if assigned == 0 {
// Given we've reached Permit(), it's mean all PreFilter checks (minMember & minResource)
// already pass through, so if assigned == 0, it could be due to:
// - minResource get satisfied
// - new pods added
// In either case, we should and only should use this 0-th pod to trigger activating
// its siblings.
// It'd be in-efficient if we trigger activating siblings unconditionally.
// See https://github.com/kubernetes-sigs/scheduler-plugins/issues/682
state.Write(permitStateKey, &PermitState{Activate: true})
}

return Wait
}

// PreFilter filters out a pod if
// 1. it belongs to a podgroup that was recently denied or
// 2. the total number of pods in the podgroup is less than the minimum number of pods
Expand Down Expand Up @@ -169,7 +271,7 @@ func (pgMgr *PodGroupManager) PreFilter(
// Only allow scheduling the first in the group so the others come after

// Get statuses to show for debugging
statuses, found, idx := pgMgr.GetStatusesAndIndex(pods, pod)
statuses := pgMgr.GetStatuses(pods, pod)

// This shows us the number of pods we have in the set and their states
pgMgr.log.Info("[PodGroup PreFilter] group: %s pods: %s MinMember: %d Size: %d", pgFullName, statuses, pg.Spec.MinMember, len(pods))
Expand All @@ -178,18 +280,6 @@ func (pgMgr *PodGroupManager) PreFilter(
"current pods number: %v, minMember of group: %v", pod.Name, len(pods), pg.Spec.MinMember)
}

if !found {
return fmt.Errorf("pod %s was not found in group - this should not happen", pod.Name)
}

// We only will AskFlux for the first pod
// This makes an assumption that the order listed is the order in the queue, I'm not
// sure that is true in practice. This is the one case with retry. This design
// probably needs thinking and work.
if idx != 0 {
return fmt.Errorf("pod %s is not first in the list, will wait to schedule", pod.Name)
}

// TODO we likely can take advantage of these resources or other custom
// attributes we add. For now ignore and calculate based on pod needs (above)
// if pg.Spec.MinResources == nil {
Expand All @@ -203,6 +293,7 @@ func (pgMgr *PodGroupManager) PreFilter(
// it may not necessarily pass Filter due to other constraints such as affinity/taints.
_, ok := pgMgr.permittedPG.Get(pgFullName)
if ok {
pgMgr.log.Info("[PodGroup PreFilter] Pod Group %s is already admitted", pgFullName)
return nil
}

Expand Down Expand Up @@ -233,7 +324,9 @@ func (pgMgr *PodGroupManager) PreFilter(
stateData := FluxStateData{NodeName: node}
state.Write(framework.StateKey(pod.Name), &stateData)
// Also save to the podToNode lookup
pgMgr.mutex.Lock()
pgMgr.podToNode[pod.Name] = node
pgMgr.mutex.Unlock()
}
pgMgr.permittedPG.Add(pgFullName, pgFullName, *pgMgr.scheduleTimeout)
return nil
Expand All @@ -252,6 +345,25 @@ func (pgMgr *PodGroupManager) GetCreationTimestamp(pod *corev1.Pod, ts time.Time
return pg.CreationTimestamp.Time
}

// CalculateAssignedPods returns the number of pods that has been assigned nodes: assumed or bound.
func (pgMgr *PodGroupManager) CalculateAssignedPods(podGroupName, namespace string) int {
nodeInfos, err := pgMgr.snapshotSharedLister.NodeInfos().List()
if err != nil {
pgMgr.log.Error("Cannot get nodeInfos from frameworkHandle: %s", err)
return 0
}
var count int
for _, nodeInfo := range nodeInfos {
for _, podInfo := range nodeInfo.Pods {
pod := podInfo.Pod
if util.GetPodGroupLabel(pod) == podGroupName && pod.Namespace == namespace && pod.Spec.NodeName != "" {
count++
}
}
}
return count
}

// DeletePermittedPodGroup deletes a podGroup that passes Pre-Filter but reaches PostFilter.
func (pgMgr *PodGroupManager) DeletePermittedPodGroup(pgFullName string) {
pgMgr.permittedPG.Delete(pgFullName)
Expand Down
Loading

0 comments on commit 50ad162

Please sign in to comment.