fix(cd): add new stage for dequeued jobs that need preparation
smrz2001 committed Oct 24, 2023
1 parent b58d8cf commit af3a326
Showing 8 changed files with 154 additions and 122 deletions.
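This commit inserts an intermediate "dequeued" stage between "queued" and "started": jobs are first picked up and prepared (e.g. a deployment resolves its commit hash and environment layout), and only then started. For orientation, here is a minimal sketch of the stage set as implied by the identifiers in this diff; the string values and the exact constant list are assumptions, not part of the commit:

type JobStage string

const (
    JobStage_Queued    JobStage = "queued"    // written by the scheduler, not yet picked up
    JobStage_Dequeued  JobStage = "dequeued"  // new: picked up and prepared, but not yet started
    JobStage_Started   JobStage = "started"   // work has been kicked off
    JobStage_Completed JobStage = "completed" // finished successfully
    JobStage_Failed    JobStage = "failed"    // finished with an error
    JobStage_Skipped   JobStage = "skipped"   // intentionally not run
)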
29 changes: 23 additions & 6 deletions cd/manager/common/aws/dynamoDb.go
@@ -235,6 +235,8 @@ func (db DynamoDb) InitializeJobs() error {
return err
} else if err = db.loadJobs(manager.JobStage_Started, ttlCursor); err != nil {
return err
} else if err = db.loadJobs(manager.JobStage_Dequeued, ttlCursor); err != nil {
return err
} else {
return db.loadJobs(manager.JobStage_Skipped, ttlCursor)
}
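// Note: loadJobs itself is not shown in this diff. A plausible sketch of its behavior, assuming
// the iterateByStage signature used further down in this file: scan one stage back to the TTL
// cutoff and repopulate the in-memory cache so in-flight (now including "dequeued") jobs can be
// resumed after a restart.
func (db DynamoDb) loadJobs(stage manager.JobStage, since time.Time) error {
    return db.iterateByStage(stage, since, true, func(js manager.JobState) bool {
        db.cache.WriteJob(js) // rebuild cache state for this stage
        return true           // keep iterating
    })
}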
@@ -254,12 +256,12 @@ func (db DynamoDb) QueueJob(jobState manager.JobState) error {
// just a hash-map from job IDs to job state for ACTIVE jobs (jobs are not added to the cache until they are in
// progress). This also means that we don't need to write jobs to the database if they're already in the cache.
if _, found := db.cache.JobById(jobState.Job); !found {
return db.writeJob(jobState)
return db.WriteJob(jobState)
}
return nil
}
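// The comment above describes the cache as a map of ACTIVE jobs keyed by job ID, which is why a
// queued job that is already cached needs no further writes. Sketch of the cache surface relied
// on in this diff (hypothetical interface name; only the methods that actually appear here):
type JobCache interface {
    WriteJob(manager.JobState)                                    // insert or update an active job
    JobById(id string) (manager.JobState, bool)                   // look up an active job by ID
    JobsByMatcher(func(manager.JobState) bool) []manager.JobState // filter active jobs by predicate
}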

func (db DynamoDb) DequeueJobs() []manager.JobState {
func (db DynamoDb) GetQueuedJobs() []manager.JobState {
// If available, use the timestamp of the previously found first job not already in processing as the start of the
// current database search. We can't know for sure that all subsequent jobs are unprocessed (e.g. force deploys or
// anchors could mess up that assumption), but what we can say for sure is that all prior jobs have at least entered
@@ -287,7 +289,7 @@ func (db DynamoDb) DequeueJobs() []manager.JobState {
// Return true so that we keep on iterating.
return true
}); err != nil {
log.Printf("dequeueJobs: failed iteration through jobs: %v", err)
log.Printf("queuedJobs: failed iteration through jobs: %v", err)
}
// If the cursor is still unset, then we found no jobs that weren't already in processing or done. In that case, set
// the cursor to "now" so we know to search from this point in time onwards. There's no point looking up jobs from
@@ -298,6 +300,21 @@ func (db DynamoDb) DequeueJobs() []manager.JobState {
return jobs
}

func (db DynamoDb) GetDequeuedJobs() []manager.JobState {
jobs := make([]manager.JobState, 0)
if err := db.iterateByStage(manager.JobStage_Dequeued, db.cursor, true, func(jobState manager.JobState) bool {
// Append the job if it's in the cache but hasn't been started yet
if cachedJob, found := db.cache.JobById(jobState.Job); found && cachedJob.Stage == manager.JobStage_Dequeued {
jobs = append(jobs, jobState)
}
// Return true so that we keep on iterating.
return true
}); err != nil {
log.Printf("dequeuedJobs: failed iteration through jobs: %v", err)
}
return jobs
}
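// Taken together, the two getters split pickup into two phases: GetQueuedJobs scans the database
// from the time cursor for jobs the manager has not cached yet, while GetDequeuedJobs returns jobs
// already picked up but not yet started. Rough caller-side sketch (assumed; the real loop lives in
// jobManager.go):
//
//	for _, js := range db.GetQueuedJobs() { // phase 1: discover fresh jobs
//		js.Stage = manager.JobStage_Dequeued
//		_ = db.AdvanceJob(js) // persist the new stage and cache the job
//	}
//	for _, js := range db.GetDequeuedJobs() { // phase 2: only start work from "dequeued" jobs
//		startJob(js) // hypothetical stand-in for the manager's processing
//	}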

func (db DynamoDb) IterateByType(jobType manager.JobType, asc bool, iter func(manager.JobState) bool) error {
return db.iterateByType(jobType, time.Now().AddDate(0, 0, -manager.DefaultTtlDays), asc, iter)
}
@@ -395,15 +412,15 @@ func (db DynamoDb) iterateEvents(queryInput *dynamodb.QueryInput, iter func(mana
return nil
}

func (db DynamoDb) WriteJob(jobState manager.JobState) error {
if err := db.writeJob(jobState); err != nil {
func (db DynamoDb) AdvanceJob(jobState manager.JobState) error {
if err := db.WriteJob(jobState); err != nil {
return err
}
db.cache.WriteJob(jobState)
return nil
}

func (db DynamoDb) writeJob(jobState manager.JobState) error {
func (db DynamoDb) WriteJob(jobState manager.JobState) error {
// Generate a new UUID for every job update
jobState.Id = uuid.New().String()
// Set entry expiration
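The net effect on the database API can be summarized as an interface fragment (assumed shape; the real manager.Database interface lives in cd/manager and has more methods than shown here):

type Database interface {
    InitializeJobs() error
    QueueJob(JobState) error
    GetQueuedJobs() []JobState   // was DequeueJobs: queued jobs not yet picked up
    GetDequeuedJobs() []JobState // new: jobs picked up but not yet started
    WriteJob(JobState) error     // persist only (previously the unexported writeJob)
    AdvanceJob(JobState) error   // persist and update the in-memory cache (previously WriteJob)
}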
89 changes: 43 additions & 46 deletions cd/manager/jobmanager/jobManager.go
@@ -144,60 +144,55 @@ func (m *JobManager) processJobs() {
// Always attempt to check if we have anchor jobs, even if none were dequeued. This is because we might have a
// configured minimum number of jobs to run.
processAnchorJobs := true
// Try to dequeue multiple jobs and collapse similar ones:
// Advance each freshly discovered "queued" job to the "dequeued" stage
queuedJobs := m.db.GetQueuedJobs()
if len(queuedJobs) > 0 {
log.Printf("processJobs: found %d queued jobs...", len(queuedJobs))
for _, jobState := range queuedJobs {
m.advanceJob(jobState)
}
// Wait for any running job advancement goroutines to finish before processing jobs
m.waitGroup.Wait()
}
// Try to start multiple jobs and collapse similar ones:
// - one deploy at a time
// - any number of anchor workers (compatible with smoke/E2E tests)
// - one smoke test at a time (compatible with anchor workers, E2E tests)
// - one E2E test at a time (compatible with anchor workers, smoke tests)
//
// Loop over compatible dequeued jobs until we find an incompatible one and need to wait for existing jobs to
// complete.
dequeuedJobs := m.db.DequeueJobs()
dequeuedJobs := m.db.GetDequeuedJobs()
if len(dequeuedJobs) > 0 {
log.Printf("processJobs: dequeued %d jobs...", len(dequeuedJobs))
// Preprocess dequeued jobs, and filter successfully prepared ones.
// Ref: https://github.com/golang/go/wiki/SliceTricks#filtering-without-allocating
tempSlice := dequeuedJobs[:0]
for _, jobState := range dequeuedJobs {
if _, err := m.prepareJob(jobState); err != nil {
log.Printf("processJobs: job generation failed: %v, %s", err, manager.PrintJob(jobState))
} else {
tempSlice = append(tempSlice, jobState)
}
}
dequeuedJobs = tempSlice
// Recheck the length of `dequeuedJobs` in case any job(s) failed preprocessing and got filtered out
if len(dequeuedJobs) > 0 {
// Check for any force deploy jobs, and only look at the remaining jobs if no deployments were kicked
// off.
if m.processForceDeployJobs(dequeuedJobs) {
processAnchorJobs = false
} else
// Decide how to proceed based on the first job from the list
if dequeuedJobs[0].Type == manager.JobType_Deploy {
m.processDeployJobs(dequeuedJobs)
// There are two scenarios for anchor jobs on encountering a deploy job at the head of the queue:
// - Anchor jobs are started if no deployment was *started*, even if this deploy job was ahead of
// anchor jobs in the queue.
// - Anchor jobs are not started since a deploy job was *dequeued* ahead of them. (This would be the
// normal behavior for a job queue, i.e. jobs get processed in the order they were scheduled.)
//
// The first scenario only applies to the QA environment that is used for running the E2E tests. E2E
// tests need anchor jobs to run, but if all jobs are processed sequentially, anchor jobs required
// for processing test streams can get blocked by deploy jobs, which are in turn blocked by the E2E
// tests themselves. Letting anchor jobs "skip the queue" prevents this "deadlock".
//
// Testing for this scenario can be simplified by checking whether E2E tests were in progress. So,
// anchor jobs will only be able to "skip the queue" if E2E tests were running but fall back to
// sequential processing otherwise. Since E2E tests only run in QA, all other environments (and QA
// for all other scenarios besides active E2E tests) will have the default (sequential) behavior.
e2eTestJobs := m.cache.JobsByMatcher(func(js manager.JobState) bool {
return manager.IsActiveJob(js) && (js.Type == manager.JobType_TestE2E)
})
processAnchorJobs = len(e2eTestJobs) > 0
} else {
m.processTestJobs(dequeuedJobs)
}
// Check for any force deploy jobs, and only look at the remaining jobs if no deployments were kicked off.
if m.processForceDeployJobs(dequeuedJobs) {
processAnchorJobs = false
} else
// Decide how to proceed based on the first job from the list
if dequeuedJobs[0].Type == manager.JobType_Deploy {
m.processDeployJobs(dequeuedJobs)
// There are two scenarios for anchor jobs on encountering a deploy job at the head of the queue:
// - Anchor jobs are started if no deployment was *started*, even if this deploy job was ahead of
// anchor jobs in the queue.
// - Anchor jobs are not started since a deploy job was *dequeued* ahead of them. (This would be the
// normal behavior for a job queue, i.e. jobs get processed in the order they were scheduled.)
//
// The first scenario only applies to the QA environment that is used for running the E2E tests. E2E
// tests need anchor jobs to run, but if all jobs are processed sequentially, anchor jobs required
// for processing test streams can get blocked by deploy jobs, which are in turn blocked by the E2E
// tests themselves. Letting anchor jobs "skip the queue" prevents this "deadlock".
//
// Testing for this scenario can be simplified by checking whether E2E tests were in progress. So,
// anchor jobs will only be able to "skip the queue" if E2E tests were running but fall back to
// sequential processing otherwise. Since E2E tests only run in QA, all other environments (and QA
// for all other scenarios besides active E2E tests) will have the default (sequential) behavior.
e2eTestJobs := m.cache.JobsByMatcher(func(js manager.JobState) bool {
return manager.IsActiveJob(js) && (js.Type == manager.JobType_TestE2E)
})
processAnchorJobs = len(e2eTestJobs) > 0
} else {
m.processTestJobs(dequeuedJobs)
}
}
// If no deploy jobs were launched, process pending anchor jobs. We don't want to hold on to anchor jobs queued
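// The advanceJob helper used in the promotion phase above is not part of this diff. A plausible
// sketch, assuming it builds the job via prepareJob and advances it in a goroutine tracked by the
// manager's wait group:
func (m *JobManager) advanceJob(jobState manager.JobState) {
    m.waitGroup.Add(1)
    go func() {
        defer m.waitGroup.Done()
        if job, err := m.prepareJob(jobState); err != nil {
            log.Printf("advanceJob: job preparation failed: %v, %s", err, manager.PrintJob(jobState))
        } else if _, err = job.AdvanceJob(); err != nil {
            log.Printf("advanceJob: job advancement failed: %v, %s", err, manager.PrintJob(jobState))
        }
    }()
}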
@@ -564,9 +559,11 @@ func (m *JobManager) prepareJob(jobState manager.JobState) (manager.Job, error)
}

func (m *JobManager) updateJobStage(jobState manager.JobState, jobStage manager.JobStage) error {
// Update job stage and timestamp
jobState.Stage = jobStage
jobState.Ts = time.Now()
// Update the job in the database before sending any notification - if the write fails, we'll just come back and try again.
if err := m.db.WriteJob(jobState); err != nil {
if err := m.db.AdvanceJob(jobState); err != nil {
log.Printf("updateJobStage: failed to update %s job: %v, %s", jobStage, err, manager.PrintJob(jobState))
return err
}
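Note the split in timestamp handling: manager-driven transitions that go through updateJobStage bump Ts, while the jobs' own queued-to-dequeued promotion deliberately reuses the original timestamp. A hypothetical timeline of the resulting events (times made up for illustration):

    10:00:00  queued     Ts = 10:00:00  (job written by the scheduler)
    10:00:05  dequeued   Ts = 10:00:00  (same Ts, so the event lines up with "queued" on the timeline)
    10:00:10  started    Ts = 10:00:10  (manager-driven transition via updateJobStage)
    10:05:42  completed  Ts = 10:05:42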
9 changes: 7 additions & 2 deletions cd/manager/jobs/anchor.go
@@ -28,6 +28,12 @@ func AnchorJob(db manager.Database, d manager.Deployment, notifs manager.Notifs,

func (a anchorJob) AdvanceJob() (manager.JobState, error) {
if a.state.Stage == manager.JobStage_Queued {
// No preparation needed so advance the job directly to "dequeued"
a.state.Stage = manager.JobStage_Dequeued
// Don't update the timestamp here so that the "dequeued" event remains at the same position on the timeline as
// the "queued" event.
return a.state, a.db.AdvanceJob(a.state)
} else if a.state.Stage == manager.JobStage_Dequeued {
var overrides map[string]string = nil
// Check if this is a CASv5 anchor job
if manager.IsV5WorkerJob(a.state) {
Expand Down Expand Up @@ -92,8 +98,7 @@ func (a anchorJob) AdvanceJob() (manager.JobState, error) {
// There's nothing left to do so we shouldn't have reached here
return a.state, fmt.Errorf("anchorJob: unexpected state: %s", manager.PrintJob(a.state))
}
// Advance the timestamp
a.state.Ts = time.Now()
a.notifs.NotifyJob(a.state)
return a.state, a.db.WriteJob(a.state)
return a.state, a.db.AdvanceJob(a.state)
}
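anchor.go (and e2e.go below) now share the same two-step shape: a job that needs no preparation promotes itself from "queued" to "dequeued" on its first advancement and does the real work on the next one. An illustrative skeleton (hypothetical job type, not code from this repository):

func (j exampleJob) AdvanceJob() (manager.JobState, error) {
    if j.state.Stage == manager.JobStage_Queued {
        // First advancement: promote to "dequeued" without touching the timestamp.
        j.state.Stage = manager.JobStage_Dequeued
        return j.state, j.db.AdvanceJob(j.state)
    } else if j.state.Stage == manager.JobStage_Dequeued {
        // Second advancement: kick off the real work and advance the timestamp.
        j.state.Stage = manager.JobStage_Started
        j.state.Ts = time.Now()
    }
    j.notifs.NotifyJob(j.state)
    return j.state, j.db.AdvanceJob(j.state)
}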
124 changes: 62 additions & 62 deletions cd/manager/jobs/deploy.go
@@ -29,80 +29,42 @@ func DeployJob(db manager.Database, d manager.Deployment, repo manager.Repositor
} else if sha, found := jobState.Params[manager.JobParam_Sha].(string); !found {
return nil, fmt.Errorf("deployJob: missing sha")
} else {
c := manager.DeployComponent(component)
manual, _ := jobState.Params[manager.JobParam_Manual].(bool)
rollback, _ := jobState.Params[manager.JobParam_Rollback].(bool)
// If "layout" is absent, this job has been dequeued for the first time and we need to do some preprocessing.
if _, found := jobState.Params[manager.JobParam_Layout]; !found {
if rollback {
// Use the latest successfully deployed commit hash when rolling back
deployHashes, err := db.GetDeployHashes()
if err != nil {
return nil, err
}
sha = deployHashes[c]
} else
// - If the specified commit hash is "latest", fetch the latest branch commit hash from GitHub.
// - Else if it's a valid hash, use it.
// - Else use the latest build hash from the database.
//
// The last two cases will only happen when redeploying manually, so we can note that in the notification.
if sha == manager.BuildHashLatest {
shaTag, _ := jobState.Params[manager.JobParam_ShaTag].(string)
latestSha, err := repo.GetLatestCommitHash(
manager.ComponentRepo(c),
manager.EnvBranch(c, manager.EnvType(os.Getenv("ENV"))),
shaTag,
)
if err != nil {
return nil, err
}
sha = latestSha
} else {
if !manager.IsValidSha(sha) {
// Get the latest build commit hash from the database when making a fresh deployment
buildHashes, err := db.GetBuildHashes()
if err != nil {
return nil, err
}
sha = buildHashes[c]
}
manual = true
}
jobState.Params[manager.JobParam_Sha] = sha
if manual {
jobState.Params[manager.JobParam_Manual] = true
}
if envLayout, err := d.GenerateEnvLayout(c); err != nil {
return nil, err
} else {
jobState.Params[manager.JobParam_Layout] = *envLayout
}
// Advance the timestamp
jobState.Ts = time.Now()
if err := db.WriteJob(jobState); err != nil {
return nil, err
}
// Send notification for job dequeued for the first time
notifs.NotifyJob(jobState)
}
return &deployJob{jobState, db, d, repo, notifs, c, sha, manual, rollback}, nil
return &deployJob{jobState, db, d, repo, notifs, manager.DeployComponent(component), sha, manual, rollback}, nil
}
}

func (d deployJob) AdvanceJob() (manager.JobState, error) {
func (d *deployJob) AdvanceJob() (manager.JobState, error) {
if d.state.Stage == manager.JobStage_Queued {
if deployHashes, err := d.db.GetDeployHashes(); err != nil {
d.state.Stage = manager.JobStage_Failed
d.state.Params[manager.JobParam_Error] = err.Error()
log.Printf("deployJob: error fetching deploy hashes: %v, %s", err, manager.PrintJob(d.state))
} else if err := d.prepareJob(deployHashes); err != nil {
d.state.Stage = manager.JobStage_Failed
d.state.Params[manager.JobParam_Error] = err.Error()
log.Printf("deployJob: error preparing job: %v, %s", err, manager.PrintJob(d.state))
} else if !d.manual && !d.rollback && (d.sha == deployHashes[d.component]) {
// Skip automated jobs if the commit hash being deployed is the same as the commit hash already deployed. We
// don't do this for manual jobs because deploying an already deployed hash might be intentional, or for
// rollbacks because we WANT to redeploy the last successfully deployed hash.
d.state.Stage = manager.JobStage_Skipped
log.Printf("deployJob: commit hash same as deployed hash: %s", manager.PrintJob(d.state))
} else if err := d.updateEnv(d.sha); err != nil {
} else if envLayout, err := d.d.GenerateEnvLayout(d.component); err != nil {
d.state.Stage = manager.JobStage_Failed
d.state.Params[manager.JobParam_Error] = err.Error()
log.Printf("deployJob: error preparing job: %v, %s", err, manager.PrintJob(d.state))
} else {
d.state.Stage = manager.JobStage_Dequeued
d.state.Params[manager.JobParam_Layout] = *envLayout
}
// Don't update the timestamp here so that the "dequeued" event remains at the same position on the timeline as
// the "queued" event.
d.notifs.NotifyJob(d.state)
return d.state, d.db.AdvanceJob(d.state)
} else if d.state.Stage == manager.JobStage_Dequeued {
if err := d.updateEnv(d.sha); err != nil {
d.state.Stage = manager.JobStage_Failed
d.state.Params[manager.JobParam_Error] = err.Error()
log.Printf("deployJob: error updating service: %v, %s", err, manager.PrintJob(d.state))
@@ -140,20 +102,58 @@ func (d deployJob) AdvanceJob() (manager.JobState, error) {
// There's nothing left to do so we shouldn't have reached here
return d.state, fmt.Errorf("deployJob: unexpected state: %s", manager.PrintJob(d.state))
}
// Advance the timestamp
d.state.Ts = time.Now()
d.notifs.NotifyJob(d.state)
return d.state, d.db.WriteJob(d.state)
return d.state, d.db.AdvanceJob(d.state)
}

func (d *deployJob) prepareJob(deployHashes map[manager.DeployComponent]string) error {
if d.rollback {
// Use the latest successfully deployed commit hash when rolling back
d.sha = deployHashes[d.component]
} else
// - If the specified commit hash is "latest", fetch the latest branch commit hash from GitHub.
// - Else if it's a valid hash, use it.
// - Else use the latest build hash from the database.
//
// The last two cases will only happen when redeploying manually, so we can note that in the notification.
if d.sha == manager.BuildHashLatest {
shaTag, _ := d.state.Params[manager.JobParam_ShaTag].(string)
if latestSha, err := d.repo.GetLatestCommitHash(
manager.ComponentRepo(d.component),
manager.EnvBranch(d.component, manager.EnvType(os.Getenv("ENV"))),
shaTag,
); err != nil {
return err
} else {
d.sha = latestSha
}
} else {
if !manager.IsValidSha(d.sha) {
// Get the latest build commit hash from the database when making a fresh deployment
if buildHashes, err := d.db.GetBuildHashes(); err != nil {
return err
} else {
d.sha = buildHashes[d.component]
}
}
d.manual = true
}
d.state.Params[manager.JobParam_Sha] = d.sha
if d.manual {
d.state.Params[manager.JobParam_Manual] = true
}
return nil
}
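// To illustrate the resolution rules above with hypothetical inputs (values made up for this note):
//
//	rollback = true                        -> last successfully deployed hash for the component
//	sha = "latest"                         -> latest commit hash for the env branch, fetched from GitHub
//	sha = a valid 40-character commit hash -> used as-is, and the job is marked as manual
//	sha = anything else (e.g. "banana")    -> latest build hash from the database, job marked as manual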

func (d deployJob) updateEnv(commitHash string) error {
func (d *deployJob) updateEnv(commitHash string) error {
if layout, found := d.state.Params[manager.JobParam_Layout].(manager.Layout); found {
return d.d.UpdateEnv(&layout, commitHash)
}
return fmt.Errorf("updateEnv: missing env layout")
}

func (d deployJob) checkEnv() (bool, error) {
func (d *deployJob) checkEnv() (bool, error) {
if layout, found := d.state.Params[manager.JobParam_Layout].(manager.Layout); !found {
return false, fmt.Errorf("checkEnv: missing env layout")
} else if deployed, err := d.d.CheckEnv(&layout); err != nil {
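One side effect worth noting: the deployJob methods switch from value receivers to pointer receivers because prepareJob now mutates fields on the job (d.sha, d.manual). A generic, standalone illustration of why that matters in Go (unrelated to this repository's types):

package main

import "fmt"

type counter struct{ n int }

func (c counter) incByValue()    { c.n++ } // mutates a copy; the caller's value is unchanged
func (c *counter) incByPointer() { c.n++ } // mutates the caller's value in place

func main() {
    c := counter{}
    c.incByValue()
    fmt.Println(c.n) // 0
    c.incByPointer()
    fmt.Println(c.n) // 1
}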
9 changes: 7 additions & 2 deletions cd/manager/jobs/e2e.go
@@ -27,6 +27,12 @@ func E2eTestJob(db manager.Database, d manager.Deployment, notifs manager.Notifs

func (e e2eTestJob) AdvanceJob() (manager.JobState, error) {
if e.state.Stage == manager.JobStage_Queued {
// No preparation needed so advance the job directly to "dequeued"
e.state.Stage = manager.JobStage_Dequeued
// Don't update the timestamp here so that the "dequeued" event remains at the same position on the timeline as
// the "queued" event.
return e.state, e.db.AdvanceJob(e.state)
} else if e.state.Stage == manager.JobStage_Dequeued {
if err := e.startE2eTests(); err != nil {
e.state.Stage = manager.JobStage_Failed
e.state.Params[manager.JobParam_Error] = err.Error()
@@ -71,10 +77,9 @@ func (e e2eTestJob) AdvanceJob() (manager.JobState, error) {
// There's nothing left to do so we shouldn't have reached here
return e.state, fmt.Errorf("e2eJob: unexpected state: %s", manager.PrintJob(e.state))
}
// Advance the timestamp
e.state.Ts = time.Now()
e.notifs.NotifyJob(e.state)
return e.state, e.db.WriteJob(e.state)
return e.state, e.db.AdvanceJob(e.state)
}

func (e e2eTestJob) startE2eTests() error {
(Diffs for the remaining 3 changed files are not shown.)
