Commit

Add job queue cache tracking where individual jobs can be scheduled. (#…

jankaspar authored Mar 9, 2021
1 parent 88d607a commit c06c6d7
Showing 8 changed files with 208 additions and 88 deletions.
155 changes: 155 additions & 0 deletions internal/armada/cache/queue.go
@@ -0,0 +1,155 @@
package cache

import (
    "sync"

    log "github.com/sirupsen/logrus"

    "github.com/G-Research/armada/internal/armada/repository"
    "github.com/G-Research/armada/internal/armada/scheduling"
    "github.com/G-Research/armada/internal/common"
    "github.com/G-Research/armada/pkg/api"
)

type empty struct{}
type stringSet map[string]empty

type QueueCache struct {
    queueRepository repository.QueueRepository
    jobRepository repository.JobRepository
    schedulingInfoRepository repository.SchedulingInfoRepository

    refreshMutex sync.Mutex
    queuedResources map[string]map[string]common.ComputeResourcesFloat
    queueNonMatchingJobIds map[string]map[string]stringSet
}

func NewQueueCache(
    queueRepository repository.QueueRepository,
    jobRepository repository.JobRepository,
    schedulingInfoRepository repository.SchedulingInfoRepository,
) *QueueCache {
    collector := &QueueCache{
        queueRepository: queueRepository,
        jobRepository: jobRepository,
        schedulingInfoRepository: schedulingInfoRepository,
        queuedResources: map[string]map[string]common.ComputeResourcesFloat{},
        queueNonMatchingJobIds: map[string]map[string]stringSet{}}

    return collector
}

func (c *QueueCache) Refresh() {
    queues, e := c.queueRepository.GetAllQueues()
    if e != nil {
        log.Errorf("Error while getting queues %s", e)
        return
    }

    clusterInfo, e := c.schedulingInfoRepository.GetClusterSchedulingInfo()
    if e != nil {
        log.Errorf("Error while getting cluster reports %s", e)
        return
    }

    activeClusterInfo := scheduling.FilterActiveClusterSchedulingInfoReports(clusterInfo)
    clusterInfoByPool := scheduling.GroupSchedulingInfoByPool(activeClusterInfo)

    for _, queue := range queues {
        resourceUsageByPool := map[string]common.ComputeResources{}
        nonMatchingJobs := map[string]stringSet{}

        err := c.jobRepository.IterateQueueJobs(queue.Name, func(job *api.Job) {
            jobResources := common.TotalJobResourceRequest(job)
            nonMatchingClusters := stringSet{}

            for pool, infos := range clusterInfoByPool {
                matches := false
                for _, schedulingInfo := range infos {
                    if scheduling.MatchSchedulingRequirements(job, schedulingInfo) {
                        matches = true
                    } else {
                        nonMatchingClusters[schedulingInfo.ClusterId] = empty{}
                    }
                }

                if matches {
                    r, exists := resourceUsageByPool[pool]
                    if !exists {
                        r = common.ComputeResources{}
                        resourceUsageByPool[pool] = r
                    }
                    r.Add(jobResources)
                }
            }
            nonMatchingJobs[job.Id] = nonMatchingClusters
        })

        if err != nil {
            log.Errorf("Error while getting queue %s resources %s", queue.Name, err)
        }

        c.updateQueuedNonMatchingJobs(queue.Name, nonMatchingJobs)
        c.updateQueuedResource(queue.Name, resourceUsageByPool)
    }
}

func (c *QueueCache) updateQueuedResource(queueName string, resourcesByPool map[string]common.ComputeResources) {
    c.refreshMutex.Lock()
    defer c.refreshMutex.Unlock()
    floatResourcesByPool := map[string]common.ComputeResourcesFloat{}
    for pool, res := range resourcesByPool {
        floatResourcesByPool[pool] = res.AsFloat()
    }
    c.queuedResources[queueName] = floatResourcesByPool
}

func (c *QueueCache) updateQueuedNonMatchingJobs(queueName string, nonMatchingClustersById map[string]stringSet) {
    c.refreshMutex.Lock()
    defer c.refreshMutex.Unlock()
    c.queueNonMatchingJobIds[queueName] = nonMatchingClustersById
}

func (c *QueueCache) GetQueuedResources(queueName string) map[string]common.ComputeResourcesFloat {
    c.refreshMutex.Lock()
    defer c.refreshMutex.Unlock()
    return c.queuedResources[queueName]
}

func (c *QueueCache) getNonSchedulableJobIds(queueName string) map[string]stringSet {
    c.refreshMutex.Lock()
    defer c.refreshMutex.Unlock()
    return c.queueNonMatchingJobIds[queueName]
}

func (c *QueueCache) PeekClusterQueue(clusterId, queue string, limit int64) ([]*api.Job, error) {
    ids, e := c.jobRepository.GetQueueJobIds(queue)
    if e != nil {
        return nil, e
    }
    nonMatchingJobs := c.getNonSchedulableJobIds(queue)

    filtered := []string{}
    for _, id := range ids {
        if matches(nonMatchingJobs, clusterId, id) {
            filtered = append(filtered, id)
        }
        if len(filtered) == int(limit) {
            break
        }
    }
    return c.jobRepository.GetExistingJobsByIds(filtered)
}

func matches(nonMatchingJobs map[string]stringSet, clusterId, jobId string) bool {
    nonMatchingClusters, ok := nonMatchingJobs[jobId]
    if !ok {
        return true
    }
    _, exists := nonMatchingClusters[clusterId]
    return !exists
}

func (c *QueueCache) TryLeaseJobs(clusterId string, queue string, jobs []*api.Job) ([]*api.Job, error) {
    return c.jobRepository.TryLeaseJobs(clusterId, queue, jobs)
}
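
For orientation, here is a minimal wiring sketch. It is not part of this commit — the real server wiring lives in the changed files that are not expanded in this view — and the helper name and refresh interval below are assumptions. It shows how the new QueueCache could be constructed, refreshed in the background, and handed to the metrics collector through the QueueMetricProvider interface added in metrics.go:

package main // hypothetical harness, not a file from this commit

import (
    "time"

    "github.com/G-Research/armada/internal/armada/cache"
    "github.com/G-Research/armada/internal/armada/metrics"
    "github.com/G-Research/armada/internal/armada/repository"
)

// wireQueueCache is a hypothetical helper: it builds the cache, keeps it fresh
// in the background, and registers it as the metrics collector's data source.
// The one-minute interval is an assumption for illustration only.
func wireQueueCache(
    queueRepo repository.QueueRepository,
    jobRepo repository.JobRepository,
    usageRepo repository.UsageRepository,
    schedulingInfoRepo repository.SchedulingInfoRepository,
) *cache.QueueCache {
    queueCache := cache.NewQueueCache(queueRepo, jobRepo, schedulingInfoRepo)

    // Periodically recompute per-pool queued resources and, for every queued job,
    // the set of clusters it cannot be scheduled on.
    go func() {
        for range time.Tick(time.Minute) {
            queueCache.Refresh()
        }
    }()

    // The cache satisfies metrics.QueueMetricProvider, so the collector reads
    // queued resource totals from it instead of recomputing them on scrape.
    metrics.ExposeDataMetrics(queueRepo, jobRepo, usageRepo, schedulingInfoRepo, queueCache)

    return queueCache
}

The same cache also satisfies the JobQueue interface introduced in lease.go below (PeekClusterQueue and TryLeaseJobs), so lease requests from a cluster only see jobs that are not recorded as non-matching for that cluster.
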
71 changes: 8 additions & 63 deletions internal/armada/metrics/metrics.go
@@ -1,31 +1,33 @@
package metrics

import (
    "sync"

    "github.com/prometheus/client_golang/prometheus"
    log "github.com/sirupsen/logrus"

    "github.com/G-Research/armada/internal/armada/repository"
    "github.com/G-Research/armada/internal/armada/scheduling"
    "github.com/G-Research/armada/internal/common"
    "github.com/G-Research/armada/pkg/api"
)

const MetricPrefix = "armada_"

type QueueMetricProvider interface {
    GetQueuedResources(queueName string) map[string]common.ComputeResourcesFloat
}

func ExposeDataMetrics(
    queueRepository repository.QueueRepository,
    jobRepository repository.JobRepository,
    usageRepository repository.UsageRepository,
    schedulingInfoRepository repository.SchedulingInfoRepository,
    queueMetrics QueueMetricProvider,
) *QueueInfoCollector {
    collector := &QueueInfoCollector{
        queueRepository: queueRepository,
        jobRepository: jobRepository,
        usageRepository: usageRepository,
        schedulingInfoRepository: schedulingInfoRepository,
        queuedResources: map[string]map[string]common.ComputeResourcesFloat{}}
        queueMetrics: queueMetrics}
    prometheus.MustRegister(collector)
    return collector
}
@@ -35,9 +37,7 @@ type QueueInfoCollector struct {
    jobRepository repository.JobRepository
    usageRepository repository.UsageRepository
    schedulingInfoRepository repository.SchedulingInfoRepository

    refreshMutex sync.Mutex
    queuedResources map[string]map[string]common.ComputeResourcesFloat
    queueMetrics QueueMetricProvider
}

var queueSizeDesc = prometheus.NewDesc(
@@ -89,61 +89,6 @@ var clusterAvailableCapacity = prometheus.NewDesc(
    nil,
)

func (c *QueueInfoCollector) RefreshMetrics() {
    queues, e := c.queueRepository.GetAllQueues()
    if e != nil {
        log.Errorf("Error while getting queue metrics %s", e)
        return
    }

    clusterInfo, e := c.schedulingInfoRepository.GetClusterSchedulingInfo()
    if e != nil {
        log.Errorf("Error while getting cluster reports %s", e)
        return
    }

    activeClusterInfo := scheduling.FilterActiveClusterSchedulingInfoReports(clusterInfo)
    clusterInfoByPool := scheduling.GroupSchedulingInfoByPool(activeClusterInfo)

    for _, queue := range queues {
        resourceUsageByPool := map[string]common.ComputeResources{}

        err := c.jobRepository.IterateQueueJobs(queue.Name, func(job *api.Job) {
            jobResources := common.TotalJobResourceRequest(job)
            for pool, info := range clusterInfoByPool {
                if scheduling.MatchSchedulingRequirementsOnAnyCluster(job, info) {
                    r, exists := resourceUsageByPool[pool]
                    if !exists {
                        r = common.ComputeResources{}
                        resourceUsageByPool[pool] = r
                    }
                    r.Add(jobResources)
                }
            }
        })
        if err != nil {
            log.Errorf("Error while getting queue %s resources %s", queue.Name, err)
        }
        c.updateQueuedResource(queue.Name, resourceUsageByPool)
    }
}

func (c *QueueInfoCollector) updateQueuedResource(queueName string, resourcesByPool map[string]common.ComputeResources) {
    c.refreshMutex.Lock()
    defer c.refreshMutex.Unlock()
    floatResourcesByPool := map[string]common.ComputeResourcesFloat{}
    for pool, res := range resourcesByPool {
        floatResourcesByPool[pool] = res.AsFloat()
    }
    c.queuedResources[queueName] = floatResourcesByPool
}

func (c *QueueInfoCollector) GetQueueResources(queueName string) map[string]common.ComputeResourcesFloat {
    c.refreshMutex.Lock()
    defer c.refreshMutex.Unlock()
    return c.queuedResources[queueName]
}

func (c *QueueInfoCollector) Describe(desc chan<- *prometheus.Desc) {
    desc <- queueSizeDesc
    desc <- queuePriorityDesc
@@ -197,7 +142,7 @@ func (c *QueueInfoCollector) Collect(metrics chan<- prometheus.Metric) {
    }

    for _, q := range queues {
        for pool, poolResources := range c.GetQueueResources(q.Name) {
        for pool, poolResources := range c.queueMetrics.GetQueuedResources(q.Name) {
            for resourceType, amount := range poolResources {
                metrics <- prometheus.MustNewConstMetric(queueResourcesDesc, prometheus.GaugeValue, amount, pool, q.Name, resourceType)
            }
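
To make the new seam concrete: Collect now reads queued resources through the small QueueMetricProvider interface instead of the collector's own (now removed) RefreshMetrics bookkeeping. A hypothetical stub provider of the kind a test could pass to ExposeDataMetrics — not part of the commit, with common referring to github.com/G-Research/armada/internal/common as above — might look like this:

// stubMetricProvider is a hypothetical test double; any source of per-pool
// queued-resource totals can back the collector now that the real computation
// lives in the QueueCache.
type stubMetricProvider struct {
    resources map[string]map[string]common.ComputeResourcesFloat // queue name -> pool -> resources
}

func (s *stubMetricProvider) GetQueuedResources(queueName string) map[string]common.ComputeResourcesFloat {
    return s.resources[queueName]
}
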
14 changes: 8 additions & 6 deletions internal/armada/repository/job.go
@@ -27,19 +27,16 @@ const jobRetriesPrefix = "Job:Retries:"

const queueResourcesBatchSize = 20000

type JobQueueRepository interface {
type JobRepository interface {
    PeekQueue(queue string, limit int64) ([]*api.Job, error)
    TryLeaseJobs(clusterId string, queue string, jobs []*api.Job) ([]*api.Job, error)
}

type JobRepository interface {
    JobQueueRepository
    CreateJobs(request *api.JobSubmitRequest, principal authorization.Principal) ([]*api.Job, error)
    AddJobs(job []*api.Job) ([]*SubmitJobResult, error)
    GetExistingJobsByIds(ids []string) ([]*api.Job, error)
    FilterActiveQueues(queues []*api.Queue) ([]*api.Queue, error)
    GetQueueSizes(queues []*api.Queue) (sizes []int64, e error)
    IterateQueueJobs(queueName string, action func(*api.Job)) error
    GetQueueJobIds(queueName string) ([]string, error)
    RenewLease(clusterId string, jobIds []string) (renewed []string, e error)
    ExpireLeases(queue string, deadline time.Time) (expired []*api.Job, e error)
    ReturnLease(clusterId string, jobId string) (returnedJob *api.Job, err error)
@@ -439,7 +436,7 @@ func (repo *RedisJobRepository) GetQueueSizes(queues []*api.Queue) (sizes []int6
}

func (repo *RedisJobRepository) IterateQueueJobs(queueName string, action func(*api.Job)) error {
    queuedIds, e := repo.db.ZRange(jobQueuePrefix+queueName, 0, -1).Result()
    queuedIds, e := repo.GetQueueJobIds(queueName)
    if e != nil {
        return e
    }
@@ -462,6 +459,11 @@ func (repo *RedisJobRepository) IterateQueueJobs(queueName string, action func(*
    return nil
}

func (repo *RedisJobRepository) GetQueueJobIds(queueName string) ([]string, error) {
    queuedIds, e := repo.db.ZRange(jobQueuePrefix+queueName, 0, -1).Result()
    return queuedIds, e
}

func (repo *RedisJobRepository) GetActiveJobIds(queue string, jobSetId string) ([]string, error) {

    queuedIds, e := repo.db.ZRange(jobQueuePrefix+queue, 0, -1).Result()
16 changes: 10 additions & 6 deletions internal/armada/scheduling/lease.go
@@ -10,16 +10,20 @@ import (
"k8s.io/apimachinery/pkg/api/resource"

"github.com/G-Research/armada/internal/armada/configuration"
"github.com/G-Research/armada/internal/armada/repository"
"github.com/G-Research/armada/internal/common"
"github.com/G-Research/armada/pkg/api"
)

const maxJobsPerLease = 10000

type JobQueue interface {
PeekClusterQueue(clusterId, queue string, limit int64) ([]*api.Job, error)
TryLeaseJobs(clusterId string, queue string, jobs []*api.Job) ([]*api.Job, error)
}

type leaseContext struct {
schedulingConfig *configuration.SchedulingConfig
repository repository.JobQueueRepository
queue JobQueue
onJobsLeased func([]*api.Job)

ctx context.Context
@@ -37,7 +41,7 @@ type leaseContext struct {

func LeaseJobs(ctx context.Context,
    config *configuration.SchedulingConfig,
    jobQueueRepository repository.JobQueueRepository,
    jobQueue JobQueue,
    onJobLease func([]*api.Job),
    request *api.LeaseRequest,
    nodeResources []*nodeTypeAllocation,
@@ -73,7 +77,7 @@

    lc := &leaseContext{
        schedulingConfig: config,
        repository: jobQueueRepository,
        queue: jobQueue,

        ctx: ctx,
        clusterId: request.ClusterId,
@@ -234,7 +238,7 @@ func (c *leaseContext) leaseJobs(queue *api.Queue, slice common.ComputeResources

    topJobs, ok := c.queueCache[queue.Name]
    if !ok || len(topJobs) < int(c.schedulingConfig.QueueLeaseBatchSize/2) {
        newTop, e := c.repository.PeekQueue(queue.Name, int64(c.schedulingConfig.QueueLeaseBatchSize))
        newTop, e := c.queue.PeekClusterQueue(c.clusterId, queue.Name, int64(c.schedulingConfig.QueueLeaseBatchSize))
        if e != nil {
            return nil, slice, e
        }
@@ -265,7 +269,7 @@
    }
    c.queueCache[queue.Name] = removeJobs(c.queueCache[queue.Name], candidates)

    leased, e := c.repository.TryLeaseJobs(c.clusterId, queue.Name, candidates)
    leased, e := c.queue.TryLeaseJobs(c.clusterId, queue.Name, candidates)
    if e != nil {
        return nil, slice, e
    }
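
The switch from repository.JobQueueRepository to the new JobQueue interface means the lease loop now asks for jobs already filtered for the requesting cluster (PeekClusterQueue) rather than the raw head of the queue (PeekQueue). A hypothetical in-memory fake — not part of the commit — shows how small the dependency has become, for example when exercising LeaseJobs without Redis:

// fakeJobQueue is a hypothetical test double for the JobQueue interface.
type fakeJobQueue struct {
    jobs map[string][]*api.Job // queue name -> jobs treated as schedulable on every cluster
}

func (f *fakeJobQueue) PeekClusterQueue(clusterId, queue string, limit int64) ([]*api.Job, error) {
    jobs := f.jobs[queue]
    if int64(len(jobs)) > limit {
        jobs = jobs[:limit]
    }
    return jobs, nil
}

func (f *fakeJobQueue) TryLeaseJobs(clusterId string, queue string, jobs []*api.Job) ([]*api.Job, error) {
    return jobs, nil // pretend every candidate lease succeeds
}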