diff --git a/cd/manager/common/aws/dynamoDb.go b/cd/manager/common/aws/dynamoDb.go index a125cc6..90881ea 100644 --- a/cd/manager/common/aws/dynamoDb.go +++ b/cd/manager/common/aws/dynamoDb.go @@ -235,6 +235,8 @@ func (db DynamoDb) InitializeJobs() error { return err } else if err = db.loadJobs(manager.JobStage_Started, ttlCursor); err != nil { return err + } else if err = db.loadJobs(manager.JobStage_Dequeued, ttlCursor); err != nil { + return err } else { return db.loadJobs(manager.JobStage_Skipped, ttlCursor) } @@ -254,12 +256,12 @@ func (db DynamoDb) QueueJob(jobState manager.JobState) error { // just a hash-map from job IDs to job state for ACTIVE jobs (jobs are not added to the cache until they are in // progress). This also means that we don't need to write jobs to the database if they're already in the cache. if _, found := db.cache.JobById(jobState.Job); !found { - return db.writeJob(jobState) + return db.WriteJob(jobState) } return nil } -func (db DynamoDb) DequeueJobs() []manager.JobState { +func (db DynamoDb) GetQueuedJobs() []manager.JobState { // If available, use the timestamp of the previously found first job not already in processing as the start of the // current database search. We can't know for sure that all subsequent jobs are unprocessed (e.g. force deploys or // anchors could mess up that assumption), but what we can say for sure is that all prior jobs have at least entered @@ -287,7 +289,7 @@ func (db DynamoDb) DequeueJobs() []manager.JobState { // Return true so that we keep on iterating. return true }); err != nil { - log.Printf("dequeueJobs: failed iteration through jobs: %v", err) + log.Printf("queuedJobs: failed iteration through jobs: %v", err) } // If the cursor is still unset, then we found no jobs that weren't already in processing or done. In that case, set // the cursor to "now" so we know to search from this point in time onwards. There's no point looking up jobs from @@ -298,6 +300,21 @@ func (db DynamoDb) DequeueJobs() []manager.JobState { return jobs } +func (db DynamoDb) GetDequeuedJobs() []manager.JobState { + jobs := make([]manager.JobState, 0, 0) + if err := db.iterateByStage(manager.JobStage_Dequeued, db.cursor, true, func(jobState manager.JobState) bool { + // Append the job if it's in the cache but hasn't been started yet + if cachedJob, found := db.cache.JobById(jobState.Job); found && cachedJob.Stage == manager.JobStage_Dequeued { + jobs = append(jobs, jobState) + } + // Return true so that we keep on iterating. + return true + }); err != nil { + log.Printf("dequeuedJobs: failed iteration through jobs: %v", err) + } + return jobs +} + func (db DynamoDb) IterateByType(jobType manager.JobType, asc bool, iter func(manager.JobState) bool) error { return db.iterateByType(jobType, time.Now().AddDate(0, 0, -manager.DefaultTtlDays), asc, iter) } @@ -395,15 +412,15 @@ func (db DynamoDb) iterateEvents(queryInput *dynamodb.QueryInput, iter func(mana return nil } -func (db DynamoDb) WriteJob(jobState manager.JobState) error { - if err := db.writeJob(jobState); err != nil { +func (db DynamoDb) AdvanceJob(jobState manager.JobState) error { + if err := db.WriteJob(jobState); err != nil { return err } db.cache.WriteJob(jobState) return nil } -func (db DynamoDb) writeJob(jobState manager.JobState) error { +func (db DynamoDb) WriteJob(jobState manager.JobState) error { // Generate a new UUID for every job update jobState.Id = uuid.New().String() // Set entry expiration diff --git a/cd/manager/jobmanager/jobManager.go b/cd/manager/jobmanager/jobManager.go index be16b65..853f9bf 100644 --- a/cd/manager/jobmanager/jobManager.go +++ b/cd/manager/jobmanager/jobManager.go @@ -144,7 +144,17 @@ func (m *JobManager) processJobs() { // Always attempt to check if we have anchor jobs, even if none were dequeued. This is because we might have a // configured minimum number of jobs to run. processAnchorJobs := true - // Try to dequeue multiple jobs and collapse similar ones: + // Advance each freshly discovered "queued" job to the "dequeued" stage + queuedJobs := m.db.GetQueuedJobs() + if len(queuedJobs) > 0 { + log.Printf("processJobs: found queued %d jobs...", len(queuedJobs)) + for _, jobState := range queuedJobs { + m.advanceJob(jobState) + } + // Wait for any running job advancement goroutines to finish before processing jobs + m.waitGroup.Wait() + } + // Try to start multiple jobs and collapse similar ones: // - one deploy at a time // - any number of anchor workers (compatible with with smoke/E2E tests) // - one smoke test at a time (compatible with anchor workers, E2E tests) @@ -152,52 +162,37 @@ func (m *JobManager) processJobs() { // // Loop over compatible dequeued jobs until we find an incompatible one and need to wait for existing jobs to // complete. - dequeuedJobs := m.db.DequeueJobs() + dequeuedJobs := m.db.GetDequeuedJobs() if len(dequeuedJobs) > 0 { log.Printf("processJobs: dequeued %d jobs...", len(dequeuedJobs)) - // Preprocess dequeued jobs, and filter successfully prepared ones. - // Ref: https://github.com/golang/go/wiki/SliceTricks#filtering-without-allocating - tempSlice := dequeuedJobs[:0] - for _, jobState := range dequeuedJobs { - if _, err := m.prepareJob(jobState); err != nil { - log.Printf("processJobs: job generation failed: %v, %s", err, manager.PrintJob(jobState)) - } else { - tempSlice = append(tempSlice, jobState) - } - } - dequeuedJobs = tempSlice - // Recheck the length of `dequeuedJobs` in case any job(s) failed preprocessing and got filtered out - if len(dequeuedJobs) > 0 { - // Check for any force deploy jobs, and only look at the remaining jobs if no deployments were kicked - // off. - if m.processForceDeployJobs(dequeuedJobs) { - processAnchorJobs = false - } else - // Decide how to proceed based on the first job from the list - if dequeuedJobs[0].Type == manager.JobType_Deploy { - m.processDeployJobs(dequeuedJobs) - // There are two scenarios for anchor jobs on encountering a deploy job at the head of the queue: - // - Anchor jobs are started if no deployment was *started*, even if this deploy job was ahead of - // anchor jobs in the queue. - // - Anchor jobs are not started since a deploy job was *dequeued* ahead of them. (This would be the - // normal behavior for a job queue, i.e. jobs get processed in the order they were scheduled.) - // - // The first scenario only applies to the QA environment that is used for running the E2E tests. E2E - // tests need anchor jobs to run, but if all jobs are processed sequentially, anchor jobs required - // for processing test streams can get blocked by deploy jobs, which are in turn blocked by the E2E - // tests themselves. Letting anchor jobs "skip the queue" prevents this "deadlock". - // - // Testing for this scenario can be simplified by checking whether E2E tests were in progress. So, - // anchor jobs will only be able to "skip the queue" if E2E tests were running but fallback to - // sequential processing otherwise. Since E2E tests only run in QA, all other environments (and QA - // for all other scenarios besides active E2E tests) will have the default (sequential) behavior. - e2eTestJobs := m.cache.JobsByMatcher(func(js manager.JobState) bool { - return manager.IsActiveJob(js) && (js.Type == manager.JobType_TestE2E) - }) - processAnchorJobs = len(e2eTestJobs) > 0 - } else { - m.processTestJobs(dequeuedJobs) - } + // Check for any force deploy jobs, and only look at the remaining jobs if no deployments were kicked off. + if m.processForceDeployJobs(dequeuedJobs) { + processAnchorJobs = false + } else + // Decide how to proceed based on the first job from the list + if dequeuedJobs[0].Type == manager.JobType_Deploy { + m.processDeployJobs(dequeuedJobs) + // There are two scenarios for anchor jobs on encountering a deploy job at the head of the queue: + // - Anchor jobs are started if no deployment was *started*, even if this deploy job was ahead of + // anchor jobs in the queue. + // - Anchor jobs are not started since a deploy job was *dequeued* ahead of them. (This would be the + // normal behavior for a job queue, i.e. jobs get processed in the order they were scheduled.) + // + // The first scenario only applies to the QA environment that is used for running the E2E tests. E2E + // tests need anchor jobs to run, but if all jobs are processed sequentially, anchor jobs required + // for processing test streams can get blocked by deploy jobs, which are in turn blocked by the E2E + // tests themselves. Letting anchor jobs "skip the queue" prevents this "deadlock". + // + // Testing for this scenario can be simplified by checking whether E2E tests were in progress. So, + // anchor jobs will only be able to "skip the queue" if E2E tests were running but fallback to + // sequential processing otherwise. Since E2E tests only run in QA, all other environments (and QA + // for all other scenarios besides active E2E tests) will have the default (sequential) behavior. + e2eTestJobs := m.cache.JobsByMatcher(func(js manager.JobState) bool { + return manager.IsActiveJob(js) && (js.Type == manager.JobType_TestE2E) + }) + processAnchorJobs = len(e2eTestJobs) > 0 + } else { + m.processTestJobs(dequeuedJobs) } } // If no deploy jobs were launched, process pending anchor jobs. We don't want to hold on to anchor jobs queued @@ -564,9 +559,11 @@ func (m *JobManager) prepareJob(jobState manager.JobState) (manager.Job, error) } func (m *JobManager) updateJobStage(jobState manager.JobState, jobStage manager.JobStage) error { + // Update job stage and timestamp jobState.Stage = jobStage + jobState.Ts = time.Now() // Update the job in the database before sending any notification - we should just come back and try again. - if err := m.db.WriteJob(jobState); err != nil { + if err := m.db.AdvanceJob(jobState); err != nil { log.Printf("updateJobStage: failed to update %s job: %v, %s", jobStage, err, manager.PrintJob(jobState)) return err } diff --git a/cd/manager/jobs/anchor.go b/cd/manager/jobs/anchor.go index 23c9e79..8d4d8cc 100644 --- a/cd/manager/jobs/anchor.go +++ b/cd/manager/jobs/anchor.go @@ -28,6 +28,12 @@ func AnchorJob(db manager.Database, d manager.Deployment, notifs manager.Notifs, func (a anchorJob) AdvanceJob() (manager.JobState, error) { if a.state.Stage == manager.JobStage_Queued { + // No preparation needed so advance the job directly to "dequeued" + a.state.Stage = manager.JobStage_Dequeued + // Don't update the timestamp here so that the "dequeued" event remains at the same position on the timeline as + // the "queued" event. + return a.state, a.db.AdvanceJob(a.state) + } else if a.state.Stage == manager.JobStage_Dequeued { var overrides map[string]string = nil // Check if this is a CASv5 anchor job if manager.IsV5WorkerJob(a.state) { @@ -92,8 +98,7 @@ func (a anchorJob) AdvanceJob() (manager.JobState, error) { // There's nothing left to do so we shouldn't have reached here return a.state, fmt.Errorf("anchorJob: unexpected state: %s", manager.PrintJob(a.state)) } - // Advance the timestamp a.state.Ts = time.Now() a.notifs.NotifyJob(a.state) - return a.state, a.db.WriteJob(a.state) + return a.state, a.db.AdvanceJob(a.state) } diff --git a/cd/manager/jobs/deploy.go b/cd/manager/jobs/deploy.go index a9e7eca..104abfd 100644 --- a/cd/manager/jobs/deploy.go +++ b/cd/manager/jobs/deploy.go @@ -29,80 +29,42 @@ func DeployJob(db manager.Database, d manager.Deployment, repo manager.Repositor } else if sha, found := jobState.Params[manager.JobParam_Sha].(string); !found { return nil, fmt.Errorf("deployJob: missing sha") } else { - c := manager.DeployComponent(component) manual, _ := jobState.Params[manager.JobParam_Manual].(bool) rollback, _ := jobState.Params[manager.JobParam_Rollback].(bool) - // If "layout" is absent, this job has been dequeued for the first time and we need to do some preprocessing. - if _, found := jobState.Params[manager.JobParam_Layout]; !found { - if rollback { - // Use the latest successfully deployed commit hash when rolling back - deployHashes, err := db.GetDeployHashes() - if err != nil { - return nil, err - } - sha = deployHashes[c] - } else - // - If the specified commit hash is "latest", fetch the latest branch commit hash from GitHub. - // - Else if it's a valid hash, use it. - // - Else use the latest build hash from the database. - // - // The last two cases will only happen when redeploying manually, so we can note that in the notification. - if sha == manager.BuildHashLatest { - shaTag, _ := jobState.Params[manager.JobParam_ShaTag].(string) - latestSha, err := repo.GetLatestCommitHash( - manager.ComponentRepo(c), - manager.EnvBranch(c, manager.EnvType(os.Getenv("ENV"))), - shaTag, - ) - if err != nil { - return nil, err - } - sha = latestSha - } else { - if !manager.IsValidSha(sha) { - // Get the latest build commit hash from the database when making a fresh deployment - buildHashes, err := db.GetBuildHashes() - if err != nil { - return nil, err - } - sha = buildHashes[c] - } - manual = true - } - jobState.Params[manager.JobParam_Sha] = sha - if manual { - jobState.Params[manager.JobParam_Manual] = true - } - if envLayout, err := d.GenerateEnvLayout(c); err != nil { - return nil, err - } else { - jobState.Params[manager.JobParam_Layout] = *envLayout - } - // Advance the timestamp - jobState.Ts = time.Now() - if err := db.WriteJob(jobState); err != nil { - return nil, err - } - // Send notification for job dequeued for the first time - notifs.NotifyJob(jobState) - } - return &deployJob{jobState, db, d, repo, notifs, c, sha, manual, rollback}, nil + return &deployJob{jobState, db, d, repo, notifs, manager.DeployComponent(component), sha, manual, rollback}, nil } } -func (d deployJob) AdvanceJob() (manager.JobState, error) { +func (d *deployJob) AdvanceJob() (manager.JobState, error) { if d.state.Stage == manager.JobStage_Queued { if deployHashes, err := d.db.GetDeployHashes(); err != nil { d.state.Stage = manager.JobStage_Failed d.state.Params[manager.JobParam_Error] = err.Error() log.Printf("deployJob: error fetching deploy hashes: %v, %s", err, manager.PrintJob(d.state)) + } else if err := d.prepareJob(deployHashes); err != nil { + d.state.Stage = manager.JobStage_Failed + d.state.Params[manager.JobParam_Error] = err.Error() + log.Printf("deployJob: error preparing job: %v, %s", err, manager.PrintJob(d.state)) } else if !d.manual && !d.rollback && (d.sha == deployHashes[d.component]) { // Skip automated jobs if the commit hash being deployed is the same as the commit hash already deployed. We // don't do this for manual jobs because deploying an already deployed hash might be intentional, or for // rollbacks because we WANT to redeploy the last successfully deployed hash. d.state.Stage = manager.JobStage_Skipped log.Printf("deployJob: commit hash same as deployed hash: %s", manager.PrintJob(d.state)) - } else if err := d.updateEnv(d.sha); err != nil { + } else if envLayout, err := d.d.GenerateEnvLayout(d.component); err != nil { + d.state.Stage = manager.JobStage_Failed + d.state.Params[manager.JobParam_Error] = err.Error() + log.Printf("deployJob: error preparing job: %v, %s", err, manager.PrintJob(d.state)) + } else { + d.state.Stage = manager.JobStage_Dequeued + d.state.Params[manager.JobParam_Layout] = *envLayout + } + // Don't update the timestamp here so that the "dequeued" event remains at the same position on the timeline as + // the "queued" event. + d.notifs.NotifyJob(d.state) + return d.state, d.db.AdvanceJob(d.state) + } else if d.state.Stage == manager.JobStage_Dequeued { + if err := d.updateEnv(d.sha); err != nil { d.state.Stage = manager.JobStage_Failed d.state.Params[manager.JobParam_Error] = err.Error() log.Printf("deployJob: error updating service: %v, %s", err, manager.PrintJob(d.state)) @@ -140,20 +102,58 @@ func (d deployJob) AdvanceJob() (manager.JobState, error) { // There's nothing left to do so we shouldn't have reached here return d.state, fmt.Errorf("deployJob: unexpected state: %s", manager.PrintJob(d.state)) } - // Advance the timestamp d.state.Ts = time.Now() d.notifs.NotifyJob(d.state) - return d.state, d.db.WriteJob(d.state) + return d.state, d.db.AdvanceJob(d.state) +} + +func (d *deployJob) prepareJob(deployHashes map[manager.DeployComponent]string) error { + if d.rollback { + // Use the latest successfully deployed commit hash when rolling back + d.sha = deployHashes[d.component] + } else + // - If the specified commit hash is "latest", fetch the latest branch commit hash from GitHub. + // - Else if it's a valid hash, use it. + // - Else use the latest build hash from the database. + // + // The last two cases will only happen when redeploying manually, so we can note that in the notification. + if d.sha == manager.BuildHashLatest { + shaTag, _ := d.state.Params[manager.JobParam_ShaTag].(string) + if latestSha, err := d.repo.GetLatestCommitHash( + manager.ComponentRepo(d.component), + manager.EnvBranch(d.component, manager.EnvType(os.Getenv("ENV"))), + shaTag, + ); err != nil { + return err + } else { + d.sha = latestSha + } + } else { + if !manager.IsValidSha(d.sha) { + // Get the latest build commit hash from the database when making a fresh deployment + if buildHashes, err := d.db.GetBuildHashes(); err != nil { + return err + } else { + d.sha = buildHashes[d.component] + } + } + d.manual = true + } + d.state.Params[manager.JobParam_Sha] = d.sha + if d.manual { + d.state.Params[manager.JobParam_Manual] = true + } + return nil } -func (d deployJob) updateEnv(commitHash string) error { +func (d *deployJob) updateEnv(commitHash string) error { if layout, found := d.state.Params[manager.JobParam_Layout].(manager.Layout); found { return d.d.UpdateEnv(&layout, commitHash) } return fmt.Errorf("updateEnv: missing env layout") } -func (d deployJob) checkEnv() (bool, error) { +func (d *deployJob) checkEnv() (bool, error) { if layout, found := d.state.Params[manager.JobParam_Layout].(manager.Layout); !found { return false, fmt.Errorf("checkEnv: missing env layout") } else if deployed, err := d.d.CheckEnv(&layout); err != nil { diff --git a/cd/manager/jobs/e2e.go b/cd/manager/jobs/e2e.go index 180c042..bc417f2 100644 --- a/cd/manager/jobs/e2e.go +++ b/cd/manager/jobs/e2e.go @@ -27,6 +27,12 @@ func E2eTestJob(db manager.Database, d manager.Deployment, notifs manager.Notifs func (e e2eTestJob) AdvanceJob() (manager.JobState, error) { if e.state.Stage == manager.JobStage_Queued { + // No preparation needed so advance the job directly to "dequeued" + e.state.Stage = manager.JobStage_Dequeued + // Don't update the timestamp here so that the "dequeued" event remains at the same position on the timeline as + // the "queued" event. + return e.state, e.db.AdvanceJob(e.state) + } else if e.state.Stage == manager.JobStage_Dequeued { if err := e.startE2eTests(); err != nil { e.state.Stage = manager.JobStage_Failed e.state.Params[manager.JobParam_Error] = err.Error() @@ -71,10 +77,9 @@ func (e e2eTestJob) AdvanceJob() (manager.JobState, error) { // There's nothing left to do so we shouldn't have reached here return e.state, fmt.Errorf("e2eJob: unexpected state: %s", manager.PrintJob(e.state)) } - // Advance the timestamp e.state.Ts = time.Now() e.notifs.NotifyJob(e.state) - return e.state, e.db.WriteJob(e.state) + return e.state, e.db.AdvanceJob(e.state) } func (e e2eTestJob) startE2eTests() error { diff --git a/cd/manager/jobs/smoke.go b/cd/manager/jobs/smoke.go index f7c05a7..ef41074 100644 --- a/cd/manager/jobs/smoke.go +++ b/cd/manager/jobs/smoke.go @@ -33,6 +33,12 @@ func SmokeTestJob(db manager.Database, d manager.Deployment, notifs manager.Noti func (s smokeTestJob) AdvanceJob() (manager.JobState, error) { if s.state.Stage == manager.JobStage_Queued { + // No preparation needed so advance the job directly to "dequeued" + s.state.Stage = manager.JobStage_Dequeued + // Don't update the timestamp here so that the "dequeued" event remains at the same position on the timeline as + // the "queued" event. + return s.state, s.db.AdvanceJob(s.state) + } else if s.state.Stage == manager.JobStage_Dequeued { // Launch smoke test if id, err := s.d.LaunchTask( ClusterName, @@ -83,8 +89,7 @@ func (s smokeTestJob) AdvanceJob() (manager.JobState, error) { // There's nothing left to do so we shouldn't have reached here return s.state, fmt.Errorf("smokeTestJob: unexpected state: %s", manager.PrintJob(s.state)) } - // Advance the timestamp s.state.Ts = time.Now() s.notifs.NotifyJob(s.state) - return s.state, s.db.WriteJob(s.state) + return s.state, s.db.AdvanceJob(s.state) } diff --git a/cd/manager/models.go b/cd/manager/models.go index 979943d..b8c1093 100644 --- a/cd/manager/models.go +++ b/cd/manager/models.go @@ -28,6 +28,7 @@ type JobStage string const ( JobStage_Queued JobStage = "queued" + JobStage_Dequeued JobStage = "dequeued" JobStage_Skipped JobStage = "skipped" JobStage_Started JobStage = "started" JobStage_Waiting JobStage = "waiting" @@ -223,7 +224,9 @@ type ApiGw interface { type Database interface { InitializeJobs() error QueueJob(JobState) error - DequeueJobs() []JobState + GetQueuedJobs() []JobState + GetDequeuedJobs() []JobState + AdvanceJob(JobState) error WriteJob(JobState) error IterateByType(JobType, bool, func(JobState) bool) error UpdateBuildHash(DeployComponent, string) error diff --git a/cd/manager/notifs/discord.go b/cd/manager/notifs/discord.go index 1597869..e12cb4c 100644 --- a/cd/manager/notifs/discord.go +++ b/cd/manager/notifs/discord.go @@ -224,7 +224,7 @@ func (n JobNotifs) getNotifFields(jobState manager.JobState) []discord.EmbedFiel func (n JobNotifs) getNotifColor(jobState manager.JobState) DiscordColor { switch jobState.Stage { - case manager.JobStage_Queued: + case manager.JobStage_Dequeued: return DiscordColor_Info case manager.JobStage_Skipped: return DiscordColor_Warning