Skip to content

Commit

Permalink
Merge pull request #3751 from JesseStutler/czc_dev
Browse files Browse the repository at this point in the history
feature: Add podgroups statistics
  • Loading branch information
volcano-sh-bot authored Nov 21, 2024
2 parents 1476ee2 + f14999c commit da761e2
Show file tree
Hide file tree
Showing 14 changed files with 321 additions and 331 deletions.
26 changes: 26 additions & 0 deletions pkg/cli/podgroup/podgroup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package podgroup

import "volcano.sh/apis/pkg/apis/scheduling/v1beta1"

// PodGroupStatistics tallies PodGroups by phase. Each field is the counter
// that StatPodGroupCountsForQueue increments for the phase of the same name.
type PodGroupStatistics struct {
	// Inqueue and Pending count PodGroups in the Inqueue and Pending phases.
	Inqueue, Pending int
	// Running, Unknown and Completed count PodGroups in those phases.
	Running, Unknown, Completed int
}

// StatPodGroupCountsForQueue increments the counter that matches the phase of
// pg. A PodGroup whose phase is not one of Inqueue, Pending, Running, Unknown
// or Completed is left uncounted.
//
// The call is a no-op when the receiver or pg is nil. Callers that look the
// statistics up in a map get a nil pointer for a missing key (for example a
// PodGroup that references a queue which was not listed); without this guard
// the increment would dereference that nil pointer and panic.
func (pgStats *PodGroupStatistics) StatPodGroupCountsForQueue(pg *v1beta1.PodGroup) {
	if pgStats == nil || pg == nil {
		return
	}

	switch pg.Status.Phase {
	case v1beta1.PodGroupInqueue:
		pgStats.Inqueue++
	case v1beta1.PodGroupPending:
		pgStats.Pending++
	case v1beta1.PodGroupRunning:
		pgStats.Running++
	case v1beta1.PodGroupUnknown:
		pgStats.Unknown++
	case v1beta1.PodGroupCompleted:
		pgStats.Completed++
	}
}
31 changes: 24 additions & 7 deletions pkg/cli/queue/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"volcano.sh/apis/pkg/apis/scheduling/v1beta1"
"volcano.sh/apis/pkg/client/clientset/versioned"
"volcano.sh/volcano/pkg/cli/podgroup"
)

type getFlags struct {
Expand Down Expand Up @@ -63,21 +64,37 @@ func GetQueue(ctx context.Context) error {
return err
}

PrintQueue(queue, os.Stdout)
// The CustomResourceFieldSelectors feature gate is enabled by default starting with Kubernetes v1.31, but some
// users still run older versions. Therefore we cannot filter podgroups by queue on the server side; instead we
// list all podgroups from kube-apiserver and then filter them on the client side.
pgList, err := queueClient.SchedulingV1beta1().PodGroups("").List(ctx, metav1.ListOptions{})
if err != nil {
return fmt.Errorf("failed to list podgroup for queue %s with err: %v", getQueueFlags.Name, err)
}

pgStats := &podgroup.PodGroupStatistics{}
for _, pg := range pgList.Items {
if pg.Spec.Queue == getQueueFlags.Name {
pgStats.StatPodGroupCountsForQueue(&pg)
}
}

PrintQueue(queue, pgStats, os.Stdout)

return nil
}

// PrintQueue prints queue information.
func PrintQueue(queue *v1beta1.Queue, writer io.Writer) {
_, err := fmt.Fprintf(writer, "%-25s%-8s%-8s%-8s%-8s%-8s%-8s\n",
Name, Weight, State, Inqueue, Pending, Running, Unknown)
func PrintQueue(queue *v1beta1.Queue, pgStats *podgroup.PodGroupStatistics, writer io.Writer) {
_, err := fmt.Fprintf(writer, "%-25s%-8s%-8s%-8s%-8s%-8s%-8s%-8s\n",
Name, Weight, State, Inqueue, Pending, Running, Unknown, Completed)
if err != nil {
fmt.Printf("Failed to print queue command result: %s.\n", err)
}
_, err = fmt.Fprintf(writer, "%-25s%-8d%-8s%-8d%-8d%-8d%-8d\n",
queue.Name, queue.Spec.Weight, queue.Status.State, queue.Status.Inqueue,
queue.Status.Pending, queue.Status.Running, queue.Status.Unknown)

_, err = fmt.Fprintf(writer, "%-25s%-8d%-8s%-8d%-8d%-8d%-8d%-8d\n",
queue.Name, queue.Spec.Weight, queue.Status.State, pgStats.Inqueue,
pgStats.Pending, pgStats.Running, pgStats.Unknown, pgStats.Completed)
if err != nil {
fmt.Printf("Failed to print queue command result: %s.\n", err)
}
Expand Down
37 changes: 30 additions & 7 deletions pkg/cli/queue/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"volcano.sh/apis/pkg/apis/scheduling/v1beta1"
"volcano.sh/apis/pkg/client/clientset/versioned"
"volcano.sh/volcano/pkg/cli/podgroup"
)

type listFlags struct {
Expand All @@ -53,6 +54,9 @@ const (
// Inqueue status of queue
Inqueue string = "Inqueue"

// Completed status of the queue
Completed string = "Completed"

// State is state of queue
State string = "State"
)
Expand Down Expand Up @@ -81,22 +85,41 @@ func ListQueue(ctx context.Context) error {
fmt.Printf("No resources found\n")
return nil
}
PrintQueues(queues, os.Stdout)

// The CustomResourceFieldSelectors feature gate is enabled by default starting with Kubernetes v1.31, but some
// users still run older versions. Therefore we cannot filter podgroups by queue on the server side; instead we
// list all podgroups from kube-apiserver and then filter them on the client side.
pgList, err := jobClient.SchedulingV1beta1().PodGroups("").List(ctx, metav1.ListOptions{})
if err != nil {
return fmt.Errorf("failed to list podgroups with err: %v", err)
}

queueStats := make(map[string]*podgroup.PodGroupStatistics, len(queues.Items))
for _, queue := range queues.Items {
queueStats[queue.Name] = &podgroup.PodGroupStatistics{}
}

for _, pg := range pgList.Items {
queueStats[pg.Spec.Queue].StatPodGroupCountsForQueue(&pg)
}

PrintQueues(queues, queueStats, os.Stdout)

return nil
}

// PrintQueues prints queue information.
func PrintQueues(queues *v1beta1.QueueList, writer io.Writer) {
_, err := fmt.Fprintf(writer, "%-25s%-8s%-8s%-8s%-8s%-8s%-8s\n",
Name, Weight, State, Inqueue, Pending, Running, Unknown)
func PrintQueues(queues *v1beta1.QueueList, queueStats map[string]*podgroup.PodGroupStatistics, writer io.Writer) {
_, err := fmt.Fprintf(writer, "%-25s%-8s%-8s%-8s%-8s%-8s%-8s%-8s\n",
Name, Weight, State, Inqueue, Pending, Running, Unknown, Completed)
if err != nil {
fmt.Printf("Failed to print queue command result: %s.\n", err)
}

for _, queue := range queues.Items {
_, err = fmt.Fprintf(writer, "%-25s%-8d%-8s%-8d%-8d%-8d%-8d\n",
queue.Name, queue.Spec.Weight, queue.Status.State, queue.Status.Inqueue,
queue.Status.Pending, queue.Status.Running, queue.Status.Unknown)
_, err = fmt.Fprintf(writer, "%-25s%-8d%-8s%-8d%-8d%-8d%-8d%-8d\n",
queue.Name, queue.Spec.Weight, queue.Status.State, queueStats[queue.Name].Inqueue, queueStats[queue.Name].Pending,
queueStats[queue.Name].Running, queueStats[queue.Name].Unknown, queueStats[queue.Name].Completed)
if err != nil {
fmt.Printf("Failed to print queue command result: %s.\n", err)
}
Expand Down
93 changes: 93 additions & 0 deletions pkg/controllers/metrics/queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package metrics

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"volcano.sh/apis/pkg/apis/scheduling/v1beta1"
"volcano.sh/volcano/pkg/scheduler/metrics"
)

var (
queuePodGroupInqueue = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Subsystem: metrics.VolcanoNamespace,
Name: "queue_pod_group_inqueue_count",
Help: "The number of Inqueue PodGroup in this queue",
}, []string{"queue_name"},
)

queuePodGroupPending = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Subsystem: metrics.VolcanoNamespace,
Name: "queue_pod_group_pending_count",
Help: "The number of Pending PodGroup in this queue",
}, []string{"queue_name"},
)

queuePodGroupRunning = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Subsystem: metrics.VolcanoNamespace,
Name: "queue_pod_group_running_count",
Help: "The number of Running PodGroup in this queue",
}, []string{"queue_name"},
)

queuePodGroupUnknown = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Subsystem: metrics.VolcanoNamespace,
Name: "queue_pod_group_unknown_count",
Help: "The number of Unknown PodGroup in this queue",
}, []string{"queue_name"},
)

queuePodGroupCompleted = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Subsystem: metrics.VolcanoNamespace,
Name: "queue_pod_group_completed_count",
Help: "The number of Completed PodGroup in this queue",
}, []string{"queue_name"},
)
)

// UpdateQueuePodGroupInqueueCount sets the Inqueue PodGroup gauge for the
// queue named queueName to count.
func UpdateQueuePodGroupInqueueCount(queueName string, count int32) {
	gauge := queuePodGroupInqueue.WithLabelValues(queueName)
	gauge.Set(float64(count))
}

// UpdateQueuePodGroupPendingCount sets the Pending PodGroup gauge for the
// queue named queueName to count.
func UpdateQueuePodGroupPendingCount(queueName string, count int32) {
	gauge := queuePodGroupPending.WithLabelValues(queueName)
	gauge.Set(float64(count))
}

// UpdateQueuePodGroupRunningCount sets the Running PodGroup gauge for the
// queue named queueName to count.
func UpdateQueuePodGroupRunningCount(queueName string, count int32) {
	gauge := queuePodGroupRunning.WithLabelValues(queueName)
	gauge.Set(float64(count))
}

// UpdateQueuePodGroupUnknownCount sets the Unknown PodGroup gauge for the
// queue named queueName to count.
func UpdateQueuePodGroupUnknownCount(queueName string, count int32) {
	gauge := queuePodGroupUnknown.WithLabelValues(queueName)
	gauge.Set(float64(count))
}

// UpdateQueuePodGroupCompletedCount sets the Completed PodGroup gauge for the
// queue named queueName to count.
func UpdateQueuePodGroupCompletedCount(queueName string, count int32) {
	gauge := queuePodGroupCompleted.WithLabelValues(queueName)
	gauge.Set(float64(count))
}

// DeleteQueueMetrics delete all metrics related to the queue
func DeleteQueueMetrics(queueName string) {
queuePodGroupInqueue.DeleteLabelValues(queueName)
queuePodGroupPending.DeleteLabelValues(queueName)
queuePodGroupRunning.DeleteLabelValues(queueName)
queuePodGroupUnknown.DeleteLabelValues(queueName)
queuePodGroupCompleted.DeleteLabelValues(queueName)
}

// UpdateQueueMetrics publishes the Pending, Running, Unknown, Inqueue and
// Completed counts from queueStatus to the corresponding per-queue PodGroup
// gauges for queueName. queueStatus must not be nil.
func UpdateQueueMetrics(queueName string, queueStatus *v1beta1.QueueStatus) {
	updates := []struct {
		record func(queueName string, count int32)
		count  int32
	}{
		{UpdateQueuePodGroupPendingCount, queueStatus.Pending},
		{UpdateQueuePodGroupRunningCount, queueStatus.Running},
		{UpdateQueuePodGroupUnknownCount, queueStatus.Unknown},
		{UpdateQueuePodGroupInqueueCount, queueStatus.Inqueue},
		{UpdateQueuePodGroupCompletedCount, queueStatus.Completed},
	}
	for _, update := range updates {
		update.record(queueName, update.count)
	}
}
87 changes: 27 additions & 60 deletions pkg/controllers/queue/queue_controller_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"strings"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
Expand All @@ -34,7 +33,9 @@ import (
"volcano.sh/apis/pkg/apis/bus/v1alpha1"
busv1alpha1 "volcano.sh/apis/pkg/apis/bus/v1alpha1"
schedulingv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
v1beta1apply "volcano.sh/apis/pkg/client/applyconfiguration/scheduling/v1beta1"
"volcano.sh/volcano/pkg/controllers/apis"
"volcano.sh/volcano/pkg/controllers/metrics"
"volcano.sh/volcano/pkg/controllers/queue/state"
)

Expand Down Expand Up @@ -83,9 +84,14 @@ func (c *queuecontroller) syncQueue(queue *schedulingv1beta1.Queue, updateStateF
queueStatus.Unknown++
case schedulingv1beta1.PodGroupInqueue:
queueStatus.Inqueue++
case schedulingv1beta1.PodGroupCompleted:
queueStatus.Completed++
}
}

// Update the metrics
metrics.UpdateQueueMetrics(queue.Name, &queueStatus)

if updateStateFn != nil {
updateStateFn(&queueStatus, podGroups)
} else {
Expand All @@ -101,13 +107,12 @@ func (c *queuecontroller) syncQueue(queue *schedulingv1beta1.Queue, updateStateF
}

newQueue := queue.DeepCopy()
// ignore update when status does not change
if !equality.Semantic.DeepEqual(queueStatus, queue.Status) {
newQueue.Status = queueStatus
var err error
newQueue, err = c.vcClient.SchedulingV1beta1().Queues().UpdateStatus(context.TODO(), newQueue, metav1.UpdateOptions{})
if err != nil {
klog.Errorf("Failed to update status of Queue %s: %v.", newQueue.Name, err)
// ignore update when state does not change
if queueStatus.State != queue.Status.State {
queueStatusApply := v1beta1apply.QueueStatus().WithState(queueStatus.State).WithAllocated(queueStatus.Allocated)
queueApply := v1beta1apply.Queue(queue.Name).WithStatus(queueStatusApply)
if newQueue, err = c.vcClient.SchedulingV1beta1().Queues().ApplyStatus(context.TODO(), queueApply, metav1.ApplyOptions{FieldManager: controllerName}); err != nil {
klog.Errorf("Update queue state from %s to %s failed for %v", queue.Status.State, queueStatus.State, err)
return err
}
}
Expand All @@ -126,37 +131,19 @@ func (c *queuecontroller) openQueue(queue *schedulingv1beta1.Queue, updateStateF
}

newQueue := queue.DeepCopy()
newQueue.Status.State = schedulingv1beta1.QueueStateOpen
if updateStateFn != nil {
updateStateFn(&newQueue.Status, nil)
}

if queue.Status.State != newQueue.Status.State {
if _, err := c.vcClient.SchedulingV1beta1().Queues().Update(context.TODO(), newQueue, metav1.UpdateOptions{}); err != nil {
queueStatusApply := v1beta1apply.QueueStatus().WithState(newQueue.Status.State).WithAllocated(newQueue.Status.Allocated)
queueApply := v1beta1apply.Queue(queue.Name).WithStatus(queueStatusApply)
if _, err := c.vcClient.SchedulingV1beta1().Queues().ApplyStatus(context.TODO(), queueApply, metav1.ApplyOptions{FieldManager: controllerName}); err != nil {
c.recorder.Event(newQueue, v1.EventTypeWarning, string(v1alpha1.OpenQueueAction),
fmt.Sprintf("Open queue failed for %v", err))
return err
}

c.recorder.Event(newQueue, v1.EventTypeNormal, string(v1alpha1.OpenQueueAction), "Open queue succeed")

q, err := c.vcClient.SchedulingV1beta1().Queues().Get(context.TODO(), newQueue.Name, metav1.GetOptions{})
if err != nil {
fmt.Sprintf("Update queue status from %s to %s failed for %v",
queue.Status.State, newQueue.Status.State, err))
return err
}

newQueue = q.DeepCopy()
if updateStateFn != nil {
updateStateFn(&newQueue.Status, nil)
} else {
return fmt.Errorf("internal error, update state function should be provided")
}

if queue.Status.State != newQueue.Status.State {
if _, err := c.vcClient.SchedulingV1beta1().Queues().UpdateStatus(context.TODO(), newQueue, metav1.UpdateOptions{}); err != nil {
c.recorder.Event(newQueue, v1.EventTypeWarning, string(v1alpha1.OpenQueueAction),
fmt.Sprintf("Update queue status from %s to %s failed for %v",
queue.Status.State, newQueue.Status.State, err))
return err
}
}
}

_, err := c.updateQueueAnnotation(queue, ClosedByParentAnnotationKey, ClosedByParentAnnotationFalseValue)
Expand All @@ -173,41 +160,21 @@ func (c *queuecontroller) closeQueue(queue *schedulingv1beta1.Queue, updateState
}
}

podGroups := c.getPodGroups(queue.Name)
newQueue := queue.DeepCopy()
newQueue.Status.State = schedulingv1beta1.QueueStateClosed

if queue.Status.State != newQueue.Status.State {
if _, err := c.vcClient.SchedulingV1beta1().Queues().Update(context.TODO(), newQueue, metav1.UpdateOptions{}); err != nil {
c.recorder.Event(newQueue, v1.EventTypeWarning, string(v1alpha1.CloseQueueAction),
fmt.Sprintf("Close queue failed for %v", err))
return err
}

c.recorder.Event(newQueue, v1.EventTypeNormal, string(v1alpha1.CloseQueueAction), "Close queue succeed")
} else {
return nil
}

q, err := c.vcClient.SchedulingV1beta1().Queues().Get(context.TODO(), newQueue.Name, metav1.GetOptions{})
if err != nil {
return err
}

newQueue = q.DeepCopy()
podGroups := c.getPodGroups(newQueue.Name)
if updateStateFn != nil {
updateStateFn(&newQueue.Status, podGroups)
} else {
return fmt.Errorf("internal error, update state function should be provided")
}

if queue.Status.State != newQueue.Status.State {
if _, err := c.vcClient.SchedulingV1beta1().Queues().UpdateStatus(context.TODO(), newQueue, metav1.UpdateOptions{}); err != nil {
queueStatusApply := v1beta1apply.QueueStatus().WithState(newQueue.Status.State).WithAllocated(newQueue.Status.Allocated)
queueApply := v1beta1apply.Queue(queue.Name).WithStatus(queueStatusApply)
if _, err := c.vcClient.SchedulingV1beta1().Queues().ApplyStatus(context.TODO(), queueApply, metav1.ApplyOptions{FieldManager: controllerName}); err != nil {
c.recorder.Event(newQueue, v1.EventTypeWarning, string(v1alpha1.CloseQueueAction),
fmt.Sprintf("Update queue status from %s to %s failed for %v",
queue.Status.State, newQueue.Status.State, err))
fmt.Sprintf("Close queue failed for %v", err))
return err
}
c.recorder.Event(newQueue, v1.EventTypeNormal, string(v1alpha1.CloseQueueAction), "Close queue succeed")
}

return nil
Expand Down
Loading

0 comments on commit da761e2

Please sign in to comment.