diff --git a/cd/manager/jobmanager/jobManager.go b/cd/manager/jobmanager/jobManager.go index 22c202c..9e9d73b 100644 --- a/cd/manager/jobmanager/jobManager.go +++ b/cd/manager/jobmanager/jobManager.go @@ -262,9 +262,7 @@ func (m *JobManager) processForceDeployJobs(dequeuedJobs []job.JobState) bool { func (m *JobManager) processDeployJobs(dequeuedJobs []job.JobState) bool { // Check if there are any (non-anchor) jobs in progress - activeNonAnchorJobs := m.cache.JobsByMatcher(func(js job.JobState) bool { - return job.IsActiveJob(js) && (js.Type != job.JobType_Anchor) - }) + activeNonAnchorJobs := m.getActiveNonAnchorJobs() if len(activeNonAnchorJobs) == 0 { // We know the first job is a deploy, so pick out the component for that job, collapse as many back-to-back jobs // as possible for that component, then run the final job. @@ -394,7 +392,23 @@ func (m *JobManager) processWorkflowJobs(dequeuedJobs []job.JobState) bool { if dequeuedJob.Type == job.JobType_Deploy { break } else if dequeuedJob.Type == job.JobType_Workflow { - dequeuedWorkflows = append(dequeuedWorkflows, dequeuedJob) + if workflow, err := job.CreateWorkflowJob(dequeuedJob); err != nil { + if err = m.updateJobStage(dequeuedJob, job.JobStage_Failed, err); err != nil { + log.Printf("processWorkflowJobs: job update failed: %v, %s", err, manager.PrintJob(dequeuedJob)) + } + } else if workflow.IsType(job.WorkflowJobLabel_Deploy) { + // If we haven't accumulated any workflow jobs yet, this is the first job in the list. If there were + // no other active jobs, queue it and break from the loop because we can't run any other jobs in + // parallel with deployments. + if (len(dequeuedWorkflows) == 0) && (len(m.getActiveNonAnchorJobs()) == 0) { + dequeuedWorkflows = append(dequeuedWorkflows, dequeuedJob) + } + // If there were some accumulated jobs by this point, we know they must be test jobs. We'll still + // break out of the loop here so that we don't run test jobs across deploy jobs. + break + } else { + dequeuedWorkflows = append(dequeuedWorkflows, dequeuedJob) + } } } m.advanceJobs(dequeuedWorkflows) @@ -534,3 +548,9 @@ func (m *JobManager) getActiveDeploys() []job.JobState { return false }) } + +func (m *JobManager) getActiveNonAnchorJobs() []job.JobState { + return m.cache.JobsByMatcher(func(js job.JobState) bool { + return job.IsActiveJob(js) && (js.Type != job.JobType_Anchor) + }) +}