Skip to content

Commit

Permalink
fix: serialize workflow test/deploy jobs (#66)
Browse files Browse the repository at this point in the history
  • Loading branch information
smrz2001 authored Sep 6, 2024
1 parent 8947f15 commit 0a254f4
Showing 1 changed file with 24 additions and 4 deletions.
28 changes: 24 additions & 4 deletions cd/manager/jobmanager/jobManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,9 +262,7 @@ func (m *JobManager) processForceDeployJobs(dequeuedJobs []job.JobState) bool {

func (m *JobManager) processDeployJobs(dequeuedJobs []job.JobState) bool {
// Check if there are any (non-anchor) jobs in progress
activeNonAnchorJobs := m.cache.JobsByMatcher(func(js job.JobState) bool {
return job.IsActiveJob(js) && (js.Type != job.JobType_Anchor)
})
activeNonAnchorJobs := m.getActiveNonAnchorJobs()
if len(activeNonAnchorJobs) == 0 {
// We know the first job is a deploy, so pick out the component for that job, collapse as many back-to-back jobs
// as possible for that component, then run the final job.
Expand Down Expand Up @@ -394,7 +392,23 @@ func (m *JobManager) processWorkflowJobs(dequeuedJobs []job.JobState) bool {
if dequeuedJob.Type == job.JobType_Deploy {
break
} else if dequeuedJob.Type == job.JobType_Workflow {
dequeuedWorkflows = append(dequeuedWorkflows, dequeuedJob)
if workflow, err := job.CreateWorkflowJob(dequeuedJob); err != nil {
if err = m.updateJobStage(dequeuedJob, job.JobStage_Failed, err); err != nil {
log.Printf("processWorkflowJobs: job update failed: %v, %s", err, manager.PrintJob(dequeuedJob))
}
} else if workflow.IsType(job.WorkflowJobLabel_Deploy) {
// If we haven't accumulated any workflow jobs yet, this is the first job in the list. If there were
// no other active jobs, queue it and break from the loop because we can't run any other jobs in
// parallel with deployments.
if (len(dequeuedWorkflows) == 0) && (len(m.getActiveNonAnchorJobs()) == 0) {
dequeuedWorkflows = append(dequeuedWorkflows, dequeuedJob)
}
// If there were some accumulated jobs by this point, we know they must be test jobs. We'll still
// break out of the loop here so that we don't run test jobs across deploy jobs.
break
} else {
dequeuedWorkflows = append(dequeuedWorkflows, dequeuedJob)
}
}
}
m.advanceJobs(dequeuedWorkflows)
Expand Down Expand Up @@ -534,3 +548,9 @@ func (m *JobManager) getActiveDeploys() []job.JobState {
return false
})
}

func (m *JobManager) getActiveNonAnchorJobs() []job.JobState {
return m.cache.JobsByMatcher(func(js job.JobState) bool {
return job.IsActiveJob(js) && (js.Type != job.JobType_Anchor)
})
}

0 comments on commit 0a254f4

Please sign in to comment.