Making executor resource metrics consistent with server (#550)
* Making executor resource metrics consistent with server

Instead of having a separate metric for each resource type (cpu, memory, ...), we now have armada_executor_job_pod_resource_request and armada_executor_job_pod_resource_usage.

Each reports resources per queue, with resourceType as a label on the metric.

This makes the metrics:
 - Consistent with our server metrics
 - More flexible: resources that are requested or used are included automatically, so we don't need to add a new metric each time a new resource type is added (see the sample series below)
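As an illustration, the old per-resource families collapse into a single labelled family. Hypothetical queue name and values, shown in Prometheus exposition format:

    # Before: one metric family per resource type
    armada_executor_job_pod_cpu_request{queue="dev",phase="Running"} 4
    armada_executor_job_pod_memory_request_bytes{queue="dev",phase="Running"} 8589934592

    # After: one family, with the resource type as a label
    armada_executor_job_pod_resource_request{queue="dev",phase="Running",resourceType="cpu"} 4
    armada_executor_job_pod_resource_request{queue="dev",phase="Running",resourceType="memory"} 8589934592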

* Swap node resource metrics to be more reusable (resource type becomes a label rather than part of the metric name)

* Convert ClusterContextMetrics to a prometheus.Collector (see the interface sketch below)

* Move loading of cluster data to the top of Collect
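For context: prometheus.Collector is the client_golang interface the type now implements. A registered collector is asked for fresh metrics on every scrape, which is why the podMetricsInterval timer task is removed below. A minimal sketch of the interface (qualified names as seen from outside the prometheus package):

    // Collector is what prometheus.MustRegister accepts. Describe advertises
    // every metric descriptor the collector can emit; Collect is invoked on
    // each scrape and sends the current metric values.
    type Collector interface {
        Describe(ch chan<- *prometheus.Desc)
        Collect(ch chan<- prometheus.Metric)
    }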
JamesMurkin authored Apr 22, 2021
1 parent 1d5f6eb commit 4d0379a
Showing 4 changed files with 117 additions and 118 deletions.
1 change: 0 additions & 1 deletion config/executor/config.yaml
@@ -7,7 +7,6 @@ task:
jobLeaseRenewalInterval: 15s
podDeletionInterval: 5s
allocateSpareClusterCapacityInterval: 5s
podMetricsInterval: 1s
queueUsageDataRefreshInterval: 5s
utilisationEventProcessingInterval: 1s
utilisationEventReportingInterval: 5m
3 changes: 1 addition & 2 deletions internal/executor/application.go
@@ -94,14 +94,13 @@ func StartUpWithContext(config configuration.ExecutorConfiguration, clusterConte
jobLeaseService,
clusterUtilisationService)

contextMetrics := pod_metrics.NewClusterContextMetrics(clusterContext, clusterUtilisationService, queueUtilisationService)
pod_metrics.ExposeClusterContextMetrics(clusterContext, clusterUtilisationService, queueUtilisationService)

taskManager.Register(clusterUtilisationService.ReportClusterUtilisation, config.Task.UtilisationReportingInterval, "utilisation_reporting")
taskManager.Register(clusterAllocationService.AllocateSpareClusterCapacity, config.Task.AllocateSpareClusterCapacityInterval, "job_lease_request")
taskManager.Register(jobLeaseService.ManageJobLeases, config.Task.JobLeaseRenewalInterval, "job_lease_renewal")
taskManager.Register(eventReporter.ReportMissingJobEvents, config.Task.MissingJobEventReconciliationInterval, "event_reconciliation")
taskManager.Register(stuckPodDetector.HandleStuckPods, config.Task.StuckPodScanInterval, "stuck_pod")
taskManager.Register(contextMetrics.UpdateMetrics, config.Task.PodMetricsInterval, "pod_metrics")

if config.Metric.ExposeQueueUsageMetrics {
taskManager.Register(queueUtilisationService.RefreshUtilisationData, config.Task.QueueUsageDataRefreshInterval, "pod_usage_data_refresh")
1 change: 0 additions & 1 deletion internal/executor/configuration/types.go
@@ -29,7 +29,6 @@ type TaskConfiguration struct {
AllocateSpareClusterCapacityInterval time.Duration
StuckPodScanInterval time.Duration
PodDeletionInterval time.Duration
PodMetricsInterval time.Duration
QueueUsageDataRefreshInterval time.Duration
UtilisationEventProcessingInterval time.Duration
UtilisationEventReportingInterval time.Duration
230 changes: 116 additions & 114 deletions internal/executor/metrics/pod_metrics/cluster_context.go
@@ -5,6 +5,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/client-go/tools/cache"

"github.com/G-Research/armada/internal/common"
@@ -15,33 +16,58 @@ import (
)

const (
leasedPhase = "Leased"
queueLabel = "queue"
phaseLabel = "phase"
leasedPhase = "Leased"
queueLabel = "queue"
phaseLabel = "phase"
resourceTypeLabel = "resourceType"
)

var podCountDesc = prometheus.NewDesc(
metrics.ArmadaExecutorMetricsPrefix+"job_pod",
"Pods in different phases by queue",
[]string{queueLabel, phaseLabel}, nil,
)

var podResourceRequestDesc = prometheus.NewDesc(
metrics.ArmadaExecutorMetricsPrefix+"job_pod_resource_request",
"Pod resource requests in different phases by queue",
[]string{queueLabel, phaseLabel, resourceTypeLabel}, nil,
)

var podResourceUsageDesc = prometheus.NewDesc(
metrics.ArmadaExecutorMetricsPrefix+"job_pod_resource_usage",
"Pod resource usage in different phases by queue",
[]string{queueLabel, phaseLabel, resourceTypeLabel}, nil,
)

var nodeCountDesc = prometheus.NewDesc(
metrics.ArmadaExecutorMetricsPrefix+"available_node_count",
"Number of nodes available for Armada jobs",
nil, nil,
)

var nodeAvailableResourceDesc = prometheus.NewDesc(
metrics.ArmadaExecutorMetricsPrefix+"available_node_resource_allocatable",
"Resource allocatable on nodes available for Armada jobs",
[]string{resourceTypeLabel}, nil,
)

var nodeTotalResourceDesc = prometheus.NewDesc(
metrics.ArmadaExecutorMetricsPrefix+"available_node_resource_total",
"Total resource on nodes available for Armada jobs",
[]string{resourceTypeLabel}, nil,
)

type ClusterContextMetrics struct {
context context.ClusterContext
utilisationService service.UtilisationService
queueUtilisationService service.PodUtilisationService

knownQueues map[string]bool

podCountTotal *prometheus.CounterVec
podCount *prometheus.GaugeVec
podCpuRequest *prometheus.GaugeVec
podCpuUsage *prometheus.GaugeVec
podMemoryRequest *prometheus.GaugeVec
podMemoryUsage *prometheus.GaugeVec

nodeCount prometheus.Gauge
nodeCpuAvailable prometheus.Gauge
nodeCpuTotal prometheus.Gauge
nodeMemoryAvailable prometheus.Gauge
nodeMemoryTotal prometheus.Gauge
knownQueues map[string]bool
podCountTotal *prometheus.CounterVec
}

func NewClusterContextMetrics(context context.ClusterContext, utilisationService service.UtilisationService, queueUtilisationService service.PodUtilisationService) *ClusterContextMetrics {
func ExposeClusterContextMetrics(context context.ClusterContext, utilisationService service.UtilisationService, queueUtilisationService service.PodUtilisationService) *ClusterContextMetrics {
m := &ClusterContextMetrics{
context: context,
utilisationService: utilisationService,
@@ -53,62 +79,6 @@ func NewClusterContextMetrics(context context.ClusterContext, utilisationService
Help: "Counter for pods in different phases by queue",
},
[]string{queueLabel, phaseLabel}),

podCount: promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: metrics.ArmadaExecutorMetricsPrefix + "job_pod",
Help: "Pods in different phases by queue",
},
[]string{queueLabel, phaseLabel}),
podCpuRequest: promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: metrics.ArmadaExecutorMetricsPrefix + "job_pod_cpu_request",
Help: "Pod cpu requests in different phases by queue",
},
[]string{queueLabel, phaseLabel}),
podCpuUsage: promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: metrics.ArmadaExecutorMetricsPrefix + "job_pod_cpu_usage",
Help: "Pod cpu usage in different phases by queue",
},
[]string{queueLabel, phaseLabel}),
podMemoryRequest: promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: metrics.ArmadaExecutorMetricsPrefix + "job_pod_memory_request_bytes",
Help: "Pod memory requests in different phases by queue",
},
[]string{queueLabel, phaseLabel}),
podMemoryUsage: promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: metrics.ArmadaExecutorMetricsPrefix + "job_pod_memory_usage_bytes",
Help: "Pod memory usage in different phases by queue",
},
[]string{queueLabel, phaseLabel}),
nodeCount: promauto.NewGauge(
prometheus.GaugeOpts{
Name: metrics.ArmadaExecutorMetricsPrefix + "available_node_count",
Help: "Number of nodes available for Armada jobs",
}),
nodeCpuAvailable: promauto.NewGauge(
prometheus.GaugeOpts{
Name: metrics.ArmadaExecutorMetricsPrefix + "available_node_allocatable_cpu",
Help: "Number of cpus available for Armada jobs",
}),
nodeCpuTotal: promauto.NewGauge(
prometheus.GaugeOpts{
Name: metrics.ArmadaExecutorMetricsPrefix + "available_node_total_cpu",
Help: "Number of cpus on nodes available for Armada jobs",
}),
nodeMemoryAvailable: promauto.NewGauge(
prometheus.GaugeOpts{
Name: metrics.ArmadaExecutorMetricsPrefix + "available_node_allocatable_memory_bytes",
Help: "Memory available for Armada jobs",
}),
nodeMemoryTotal: promauto.NewGauge(
prometheus.GaugeOpts{
Name: metrics.ArmadaExecutorMetricsPrefix + "available_node_total_memory_bytes",
Help: "Memory on nodes available for Armada jobs",
}),
}

context.AddPodEventHandler(cache.ResourceEventHandlerFuncs{
@@ -128,6 +98,7 @@ func NewClusterContextMetrics(context context.ClusterContext, utilisationService
m.reportPhase(newPod)
},
})
prometheus.MustRegister(m)
return m
}

@@ -140,17 +111,40 @@ func (m *ClusterContextMetrics) reportPhase(pod *v1.Pod) {
}

type podMetric struct {
cpuRequest float64
cpuUsage float64
memoryRequest float64
memoryUsage float64
count float64
resourceRequest common.ComputeResources
resourceUsage common.ComputeResources
count float64
}

func (m *ClusterContextMetrics) UpdateMetrics() {
func (m *ClusterContextMetrics) Describe(desc chan<- *prometheus.Desc) {
desc <- podCountDesc
desc <- podResourceRequestDesc
desc <- podResourceUsageDesc
desc <- nodeCountDesc
desc <- nodeAvailableResourceDesc
desc <- nodeTotalResourceDesc
}

func (m *ClusterContextMetrics) Collect(metrics chan<- prometheus.Metric) {
pods, e := m.context.GetBatchPods()
if e != nil {
log.Errorf("Unable to update metrics: %v", e)
log.Errorf("Unable to get batch pods to calculate pod metrics because: %v", e)
recordInvalidMetrics(metrics, e)
return
}

allAvailableProcessingNodes, err := m.utilisationService.GetAllAvailableProcessingNodes()
if err != nil {
log.Errorf("Failed to get required information to calculate node metrics because %s", err)
recordInvalidMetrics(metrics, err)
return
}

allocatableNodeResource, err := m.utilisationService.GetTotalAllocatableClusterCapacity()
if err != nil {
log.Errorf("Failed to get required information to calculate node metrics because %s", err)
recordInvalidMetrics(metrics, err)
return
}

podMetrics := map[string]map[string]*podMetric{}
@@ -172,14 +166,12 @@ func (m *ClusterContextMetrics) UpdateMetrics() {
podMetrics[queue] = queueMetric
}

request := common.TotalPodResourceRequest(&pod.Spec).AsFloat()
usage := m.queueUtilisationService.GetPodUtilisation(pod).AsFloat()
request := common.TotalPodResourceRequest(&pod.Spec)
usage := m.queueUtilisationService.GetPodUtilisation(pod)

queueMetric[phase].count++
queueMetric[phase].memoryRequest += request[string(v1.ResourceMemory)]
queueMetric[phase].memoryUsage += usage[string(v1.ResourceMemory)]
queueMetric[phase].cpuRequest += request[string(v1.ResourceCPU)]
queueMetric[phase].cpuUsage += usage[string(v1.ResourceCPU)]
queueMetric[phase].resourceRequest.Add(request)
queueMetric[phase].resourceUsage.Add(usage)
}

// reset metric for queues without pods
@@ -194,42 +186,52 @@ func (m *ClusterContextMetrics) UpdateMetrics() {
m.knownQueues[queue] = true

for phase, phaseMetric := range queueMetric {
m.podCount.WithLabelValues(queue, phase).Set(phaseMetric.count)
m.podCpuRequest.WithLabelValues(queue, phase).Set(phaseMetric.cpuRequest)
m.podCpuUsage.WithLabelValues(queue, phase).Set(phaseMetric.cpuUsage)
m.podMemoryRequest.WithLabelValues(queue, phase).Set(phaseMetric.memoryRequest)
m.podMemoryUsage.WithLabelValues(queue, phase).Set(phaseMetric.memoryUsage)
for resourceType, request := range phaseMetric.resourceRequest {
metrics <- prometheus.MustNewConstMetric(podResourceRequestDesc, prometheus.GaugeValue,
common.QuantityAsFloat64(request), queue, phase, resourceType)
}
for resourceType, usage := range phaseMetric.resourceUsage {
metrics <- prometheus.MustNewConstMetric(podResourceUsageDesc, prometheus.GaugeValue,
common.QuantityAsFloat64(usage), queue, phase, resourceType)
}
metrics <- prometheus.MustNewConstMetric(podCountDesc, prometheus.GaugeValue, phaseMetric.count, queue, phase)
}
}

allAvailableProcessingNodes, err := m.utilisationService.GetAllAvailableProcessingNodes()
if err != nil {
log.Errorf("Failed to get required information to report cluster usage because %s", err)
return
availableNodeResource := *allocatableNodeResource
totalNodeResource := common.CalculateTotalResource(allAvailableProcessingNodes)

metrics <- prometheus.MustNewConstMetric(nodeCountDesc, prometheus.GaugeValue, float64(len(allAvailableProcessingNodes)))
for resourceType, allocatable := range availableNodeResource {
metrics <- prometheus.MustNewConstMetric(nodeAvailableResourceDesc, prometheus.GaugeValue, common.QuantityAsFloat64(allocatable), resourceType)
}

allocatableNodeResource, err := m.utilisationService.GetTotalAllocatableClusterCapacity()
if err != nil {
log.Errorf("Failed to get required information to report cluster usage because %s", err)
return
for resourceType, total := range totalNodeResource {
metrics <- prometheus.MustNewConstMetric(nodeTotalResourceDesc, prometheus.GaugeValue, common.QuantityAsFloat64(total), resourceType)
}
totalNodeResource := common.CalculateTotalResource(allAvailableProcessingNodes).AsFloat()
availableNodeResource := allocatableNodeResource.AsFloat()

m.nodeCount.Set(float64(len(allAvailableProcessingNodes)))
m.nodeCpuAvailable.Set(availableNodeResource[string(v1.ResourceCPU)])
m.nodeCpuTotal.Set(totalNodeResource[string(v1.ResourceCPU)])
m.nodeMemoryAvailable.Set(availableNodeResource[string(v1.ResourceMemory)])
m.nodeMemoryTotal.Set(totalNodeResource[string(v1.ResourceMemory)])
}

func createPodPhaseMetric() map[string]*podMetric {
zeroComputeResource := common.ComputeResources{
"cpu": resource.MustParse("0"),
"memory": resource.MustParse("0"),
"ephemeral-storage": resource.MustParse("0"),
}
return map[string]*podMetric{
leasedPhase: {},
string(v1.PodPending): {},
string(v1.PodRunning): {},
string(v1.PodSucceeded): {},
string(v1.PodFailed): {},
string(v1.PodUnknown): {},
leasedPhase: {resourceRequest: zeroComputeResource.DeepCopy(), resourceUsage: zeroComputeResource.DeepCopy()},
string(v1.PodPending): {resourceRequest: zeroComputeResource.DeepCopy(), resourceUsage: zeroComputeResource.DeepCopy()},
string(v1.PodRunning): {resourceRequest: zeroComputeResource.DeepCopy(), resourceUsage: zeroComputeResource.DeepCopy()},
string(v1.PodSucceeded): {resourceRequest: zeroComputeResource.DeepCopy(), resourceUsage: zeroComputeResource.DeepCopy()},
string(v1.PodFailed): {resourceRequest: zeroComputeResource.DeepCopy(), resourceUsage: zeroComputeResource.DeepCopy()},
string(v1.PodUnknown): {resourceRequest: zeroComputeResource.DeepCopy(), resourceUsage: zeroComputeResource.DeepCopy()},
}
}

func recordInvalidMetrics(metrics chan<- prometheus.Metric, e error) {
metrics <- prometheus.NewInvalidMetric(podCountDesc, e)
metrics <- prometheus.NewInvalidMetric(podResourceRequestDesc, e)
metrics <- prometheus.NewInvalidMetric(podResourceUsageDesc, e)
metrics <- prometheus.NewInvalidMetric(nodeCountDesc, e)
metrics <- prometheus.NewInvalidMetric(nodeAvailableResourceDesc, e)
metrics <- prometheus.NewInvalidMetric(nodeTotalResourceDesc, e)
}
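
Because collection is now pull-based, no periodic task drives these metrics: prometheus.MustRegister(m) in ExposeClusterContextMetrics hooks the collector into the default registry, and every scrape of the metrics endpoint triggers Collect. A minimal, hypothetical usage sketch with the stock promhttp handler (the standalone main and port are illustrative, not part of this commit):

    package main

    import (
        "log"
        "net/http"

        "github.com/prometheus/client_golang/prometheus/promhttp"
    )

    func main() {
        // Collectors registered with prometheus.MustRegister (as in
        // ExposeClusterContextMetrics above) are collected on every scrape
        // of this endpoint by the default registry's handler.
        http.Handle("/metrics", promhttp.Handler())
        log.Fatal(http.ListenAndServe(":9001", nil))
    }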
