Skip to content

Commit

Permalink
feat(cd): additional refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
smrz2001 committed Oct 31, 2023
1 parent 1fc0bf9 commit d7bc47e
Show file tree
Hide file tree
Showing 19 changed files with 410 additions and 296 deletions.
4 changes: 2 additions & 2 deletions cd/manager/common/aws/ddb/dynamoDb.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,12 +258,12 @@ func (db DynamoDb) iterateEvents(queryInput *dynamodb.QueryInput, iter func(job.
for _, jobState := range jobsPage {
if jobState.Type == job.JobType_Deploy {
// Marshal layout back into `Layout` structure
if layout, found := jobState.Params[job.JobParam_Layout].(map[string]interface{}); found {
if layout, found := jobState.Params[job.DeployJobParam_Layout].(map[string]interface{}); found {
var marshaledLayout manager.Layout
if err = mapstructure.Decode(layout, &marshaledLayout); err != nil {
return err
}
jobState.Params[job.JobParam_Layout] = marshaledLayout
jobState.Params[job.DeployJobParam_Layout] = marshaledLayout
}
}
if !iter(jobState) {
Expand Down
15 changes: 0 additions & 15 deletions cd/manager/common/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,6 @@ import (
"github.com/3box/pipeline-tools/cd/manager/common/aws/utils"
)

func JobName(jt JobType) string {
switch jt {
case JobType_Deploy:
return JobName_Deploy
case JobType_Anchor:
return JobName_Anchor
case JobType_TestE2E:
return JobName_TestE2E
case JobType_TestSmoke:
return JobName_TestSmoke
default:
return ""
}
}

func IsFinishedJob(jobState JobState) bool {
return (jobState.Stage == JobStage_Skipped) || (jobState.Stage == JobStage_Canceled) || (jobState.Stage == JobStage_Failed) || (jobState.Stage == JobStage_Completed)
}
Expand Down
43 changes: 21 additions & 22 deletions cd/manager/common/job/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,6 @@ const (
JobType_TestSmoke JobType = "test_smoke"
)

const (
JobName_Deploy = "Deployment"
JobName_Anchor = "Anchor Worker"
JobName_TestE2E = "E2E Tests"
JobName_TestSmoke = "Smoke Tests"
)

type JobStage string

const (
Expand All @@ -36,21 +29,27 @@ const (
)

const (
JobParam_Component string = "component"
JobParam_Id string = "id"
JobParam_Sha string = "sha"
JobParam_ShaTag string = "shaTag"
JobParam_Error string = "error"
JobParam_Layout string = "layout"
JobParam_Manual string = "manual"
JobParam_Force string = "force"
JobParam_Start string = "start"
JobParam_Rollback string = "rollback"
JobParam_Delayed string = "delayed"
JobParam_Stalled string = "stalled"
JobParam_Source string = "source"
JobParam_Version string = "version"
JobParam_Overrides string = "overrides"
JobParam_Id string = "id"
JobParam_Error string = "error"
JobParam_Start string = "start"
JobParam_Source string = "source"
)

const (
DeployJobParam_Component string = "component"
DeployJobParam_Sha string = "sha"
DeployJobParam_ShaTag string = "shaTag"
DeployJobParam_Layout string = "layout"
DeployJobParam_Manual string = "manual"
DeployJobParam_Force string = "force"
DeployJobParam_Rollback string = "rollback"
)

const (
AnchorJobParam_Delayed string = "delayed"
AnchorJobParam_Stalled string = "stalled"
AnchorJobParam_Version string = "version"
AnchorJobParam_Overrides string = "overrides"
)

// JobState represents the state of a job in the database
Expand Down
3 changes: 2 additions & 1 deletion cd/manager/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ require (
github.com/aws/aws-sdk-go-v2/service/ssm v1.27.12
github.com/disgoorg/disgo v0.13.16
github.com/disgoorg/snowflake/v2 v2.0.0
github.com/google/go-github v17.0.0+incompatible
github.com/google/go-github/v56 v56.0.0
github.com/google/uuid v1.3.0
github.com/joho/godotenv v1.4.0
github.com/mitchellh/mapstructure v1.5.0
golang.org/x/exp v0.0.0-20220325121720-054d8573a5d8
golang.org/x/oauth2 v0.1.0
golang.org/x/text v0.6.0
)
Expand Down
8 changes: 5 additions & 3 deletions cd/manager/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,10 @@ github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg=
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-github v17.0.0+incompatible h1:N0LgJ1j65A7kfXrZnUDaYCs/Sf4rEjNlfyDHW9dolSY=
github.com/google/go-github v17.0.0+incompatible/go.mod h1:zLgOLi98H3fifZn+44m+umXrS52loVEgC2AApnigrVQ=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-github/v56 v56.0.0 h1:TysL7dMa/r7wsQi44BjqlwaHvwlFlqkK8CtBWCX3gb4=
github.com/google/go-github/v56 v56.0.0/go.mod h1:D8cdcX98YWJvi7TLo7zM4/h8ZTx6u6fwGEkCdisopo0=
github.com/google/go-querystring v1.1.0 h1:AnCroh3fv4ZBgVIf1Iwtovgjaw/GiKJo8M8yD/fhyJ8=
github.com/google/go-querystring v1.1.0/go.mod h1:Kcdr2DB4koayq7X8pmAG4sNG59So17icRSOU623lUBU=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
Expand All @@ -90,6 +90,8 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/testify v1.7.2 h1:4jaiDzPyXQvSd7D0EjG45355tLlV3VOECpq10pLC+8s=
github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/exp v0.0.0-20220325121720-054d8573a5d8 h1:Xt4/LzbTwfocTk9ZLEu4onjeFucl88iW+v4j4PWbQuE=
golang.org/x/exp v0.0.0-20220325121720-054d8573a5d8/go.mod h1:lgLbSvA5ygNOMpwM/9anMpWVlVJ7Z+cHWq/eFuinpGE=
golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks=
golang.org/x/net v0.5.0 h1:GyT4nK/YDHSqa1c4753ouYCDajOYKTja9Xb/OHtgvSw=
golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws=
Expand Down
88 changes: 37 additions & 51 deletions cd/manager/jobmanager/jobManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"sync"
"time"

"golang.org/x/exp/maps"

"github.com/google/uuid"

"github.com/3box/pipeline-tools/cd/manager"
Expand Down Expand Up @@ -52,7 +54,7 @@ func NewJobManager(cache manager.Cache, db manager.Database, d manager.Deploymen
return &JobManager{cache, db, d, apiGw, repo, notifs, maxAnchorJobs, minAnchorJobs, paused, manager.EnvType(os.Getenv("ENV")), new(sync.WaitGroup)}, nil
}

func (m *JobManager) NewJob(jobState job.JobState) (string, error) {
func (m *JobManager) NewJob(jobState job.JobState) (job.JobState, error) {
jobState.Stage = job.JobStage_Queued
// Only set the job ID/time if not already set by the caller
if len(jobState.Job) == 0 {
Expand All @@ -64,14 +66,14 @@ func (m *JobManager) NewJob(jobState job.JobState) (string, error) {
if jobState.Params == nil {
jobState.Params = make(map[string]interface{}, 0)
}
return jobState.Job, m.db.QueueJob(jobState)
return jobState, m.db.QueueJob(jobState)
}

func (m *JobManager) CheckJob(jobId string) string {
func (m *JobManager) CheckJob(jobId string) job.JobState {
if cachedJob, found := m.cache.JobById(jobId); found {
return string(cachedJob.Stage)
return cachedJob
}
return ""
return job.JobState{}
}

func (m *JobManager) ProcessJobs(shutdownCh chan bool) {
Expand Down Expand Up @@ -127,27 +129,11 @@ func (m *JobManager) processJobs() {
}
}
// Find all jobs in progress and advance their state before looking for new jobs
activeJobs := m.cache.JobsByMatcher(job.IsActiveJob)
if len(activeJobs) > 0 {
log.Printf("processJobs: checking %d jobs in progress: %s", len(activeJobs), manager.PrintJob(activeJobs...))
for _, activeJob := range activeJobs {
m.advanceJob(activeJob)
}
}
// Wait for any running job advancement goroutines to finish before kicking off more jobs
m.waitGroup.Wait()
m.advanceJobs(m.cache.JobsByMatcher(job.IsActiveJob))
// Don't start any new jobs if the job manager is paused. Existing jobs will continue to be advanced.
if !m.paused {
// Advance each freshly discovered "queued" job to the "dequeued" stage
queuedJobs := m.db.QueuedJobs()
if len(queuedJobs) > 0 {
log.Printf("processJobs: found queued %d jobs...", len(queuedJobs))
for _, jobState := range queuedJobs {
m.advanceJob(jobState)
}
// Wait for any running job advancement goroutines to finish before processing jobs
m.waitGroup.Wait()
}
m.advanceJobs(m.db.QueuedJobs())
// Always attempt to check if we have anchor jobs, even if none were dequeued. This is because we might have a
// configured minimum number of jobs to run.
processAnchorJobs := true
Expand All @@ -156,9 +142,9 @@ func (m *JobManager) processJobs() {
if len(dequeuedJobs) > 0 {
// Try to start multiple jobs and collapse similar ones:
// - one deploy at a time
// - any number of anchor workers (compatible with with smoke/E2E tests)
// - one smoke test at a time (compatible with anchor workers, E2E tests)
// - one E2E test at a time (compatible with anchor workers, smoke tests)
// - any number of anchor workers (compatible with non-deploy jobs)
// - one smoke test at a time (compatible with non-deploy jobs)
// - one E2E test at a time (compatible with non-deploy jobs)
//
// Loop over compatible dequeued jobs until we find an incompatible one and need to wait for existing jobs to
// complete.
Expand Down Expand Up @@ -206,6 +192,16 @@ func (m *JobManager) processJobs() {
m.waitGroup.Wait()
}

func (m *JobManager) advanceJobs(jobs []job.JobState) {
if len(jobs) > 0 {
for _, jobState := range jobs {
m.advanceJob(jobState)
}
// Wait for any running job advancement goroutines to finish
m.waitGroup.Wait()
}
}

func (m *JobManager) checkJobInterval(jobType job.JobType, jobStage job.JobStage, intervalEnv string, processFn func(time.Time) error) error {
if interval, found := os.LookupEnv(intervalEnv); found {
if parsedInterval, err := time.ParseDuration(interval); err != nil {
Expand Down Expand Up @@ -241,17 +237,17 @@ func (m *JobManager) processForceDeployJobs(dequeuedJobs []job.JobState) bool {
forceDeploys := make(map[string]job.JobState, 0)
for _, dequeuedJob := range dequeuedJobs {
if dequeuedJob.Type == job.JobType_Deploy {
if dequeuedJobForce, _ := dequeuedJob.Params[job.JobParam_Force].(bool); dequeuedJobForce {
if dequeuedJobForce, _ := dequeuedJob.Params[job.DeployJobParam_Force].(bool); dequeuedJobForce {
// Replace an existing job with a newer one, or add a new job (hence a map).
forceDeploys[dequeuedJob.Params[job.JobParam_Component].(string)] = dequeuedJob
forceDeploys[dequeuedJob.Params[job.DeployJobParam_Component].(string)] = dequeuedJob
}
}
}
if len(forceDeploys) > 0 {
// Skip any dequeued jobs for components being force deployed
for _, dequeuedJob := range dequeuedJobs {
if dequeuedJob.Type == job.JobType_Deploy {
if forceDeploy, found := forceDeploys[dequeuedJob.Params[job.JobParam_Component].(string)]; found && (dequeuedJob.Job != forceDeploy.Job) {
if forceDeploy, found := forceDeploys[dequeuedJob.Params[job.DeployJobParam_Component].(string)]; found && (dequeuedJob.Job != forceDeploy.Job) {
if err := m.updateJobStage(dequeuedJob, job.JobStage_Skipped, nil); err != nil {
// Return `true` from here so that no state is changed and the loop can restart cleanly. Any
// jobs already skipped won't be picked up again, which is ok.
Expand All @@ -265,7 +261,7 @@ func (m *JobManager) processForceDeployJobs(dequeuedJobs []job.JobState) bool {
return job.IsActiveJob(js) && (js.Type == job.JobType_Deploy)
})
for _, activeDeploy := range activeDeploys {
if _, found := forceDeploys[activeDeploy.Params[job.JobParam_Component].(string)]; found {
if _, found := forceDeploys[activeDeploy.Params[job.DeployJobParam_Component].(string)]; found {
if err := m.updateJobStage(activeDeploy, job.JobStage_Canceled, nil); err != nil {
// Return `true` from here so that no state is changed and the loop can restart cleanly. Any jobs
// already skipped won't be picked up again, which is ok.
Expand All @@ -274,10 +270,7 @@ func (m *JobManager) processForceDeployJobs(dequeuedJobs []job.JobState) bool {
}
}
// Now advance all force deploy jobs, order doesn't matter.
for _, deployJob := range forceDeploys {
log.Printf("processForceDeployJobs: starting force deploy job: %s", manager.PrintJob(deployJob))
m.advanceJob(deployJob)
}
m.advanceJobs(maps.Values(forceDeploys))
return true
}
return false
Expand All @@ -290,14 +283,14 @@ func (m *JobManager) processDeployJobs(dequeuedJobs []job.JobState) bool {
// We know the first job is a deploy, so pick out the component for that job, collapse as many back-to-back jobs
// as possible for that component, then run the final job.
deployJob := dequeuedJobs[0]
deployComponent := deployJob.Params[job.JobParam_Component].(string)
deployComponent := deployJob.Params[job.DeployJobParam_Component].(string)
// Collapse similar, back-to-back deployments into a single run and kick it off.
for i := 1; i < len(dequeuedJobs); i++ {
dequeuedJob := dequeuedJobs[i]
// Break out of the loop as soon as we find a test job - we don't want to collapse deploys across them.
if (dequeuedJob.Type == job.JobType_TestE2E) || (dequeuedJob.Type == job.JobType_TestSmoke) {
break
} else if (dequeuedJob.Type == job.JobType_Deploy) && (dequeuedJob.Params[job.JobParam_Component].(string) == deployComponent) {
} else if (dequeuedJob.Type == job.JobType_Deploy) && (dequeuedJob.Params[job.DeployJobParam_Component].(string) == deployComponent) {
// Skip the current deploy job, and replace it with a newer one.
if err := m.updateJobStage(deployJob, job.JobStage_Skipped, nil); err != nil {
// Return `true` from here so that no state is changed and the loop can restart cleanly. Any
Expand All @@ -307,7 +300,6 @@ func (m *JobManager) processDeployJobs(dequeuedJobs []job.JobState) bool {
deployJob = dequeuedJob
}
}
log.Printf("processDeployJobs: starting deploy job: %s", manager.PrintJob(deployJob))
m.advanceJob(deployJob)
return true
} else {
Expand Down Expand Up @@ -357,10 +349,7 @@ func (m *JobManager) processVxAnchorJobs(dequeuedJobs []job.JobState, processV5J
}
}
// Now advance all anchor jobs, order doesn't matter.
for _, anchorJob := range dequeuedAnchors {
log.Printf("processVxAnchorJobs: starting anchor job: %s", manager.PrintJob(anchorJob))
m.advanceJob(anchorJob)
}
m.advanceJobs(dequeuedAnchors)
// If not enough anchor jobs were running to satisfy the configured minimum number of workers, add jobs to the queue
// to make up the difference. These jobs should get picked up in a subsequent job manager iteration, properly
// coordinated with other jobs in the queue. It's ok if we ultimately end up with more jobs queued than the
Expand Down Expand Up @@ -413,10 +402,7 @@ func (m *JobManager) processTestJobs(dequeuedJobs []job.JobState) bool {
dequeuedTests[dequeuedJob.Type] = dequeuedJob
}
}
for _, testJob := range dequeuedTests {
log.Printf("processTestJobs: starting test job: %s", manager.PrintJob(testJob))
m.advanceJob(testJob)
}
m.advanceJobs(maps.Values(dequeuedTests))
return len(dequeuedTests) > 0
} else {
log.Printf("processTestJobs: deployment in progress")
Expand Down Expand Up @@ -481,17 +467,17 @@ func (m *JobManager) postProcessJob(jobState job.JobState) {
case job.JobStage_Failed:
{
// Only rollback if this wasn't already a rollback attempt that failed
if rollback, _ := jobState.Params[job.JobParam_Rollback].(bool); !rollback {
if rollback, _ := jobState.Params[job.DeployJobParam_Rollback].(bool); !rollback {
if _, err := m.NewJob(job.JobState{
Type: job.JobType_Deploy,
Params: map[string]interface{}{
job.JobParam_Component: jobState.Params[job.JobParam_Component],
job.JobParam_Rollback: true,
job.DeployJobParam_Component: jobState.Params[job.DeployJobParam_Component],
job.DeployJobParam_Rollback: true,
// Make the job lookup the last successfully deployed commit hash from the database
job.JobParam_Sha: ".",
job.DeployJobParam_Sha: ".",
// No point in waiting for other jobs to complete before redeploying a working image
job.JobParam_Force: true,
job.JobParam_Source: manager.ServiceName,
job.DeployJobParam_Force: true,
job.JobParam_Source: manager.ServiceName,
},
}); err != nil {
log.Printf("postProcessJob: failed to queue rollback after failed deploy: %v, %s", err, manager.PrintJob(jobState))
Expand Down
18 changes: 9 additions & 9 deletions cd/manager/jobs/anchor.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ func (a anchorJob) Advance() (job.JobState, error) {
{
// No preparation needed so advance the job directly to "dequeued".
//
// Don't update the timestamp here so that the "dequeued" event remains at the same position on the timeline
// as the "queued" event.
return a.advance(job.JobStage_Dequeued, a.state.Ts, nil)
// Advance the timestamp by a tiny amount so that the "dequeued" event remains at the same position on the
// timeline as the "queued" event but still ahead of it.
return a.advance(job.JobStage_Dequeued, a.state.Ts.Add(time.Nanosecond), nil)
}
case job.JobStage_Dequeued:
{
Expand All @@ -42,7 +42,7 @@ func (a anchorJob) Advance() (job.JobState, error) {
} else {
// Record the worker task identifier and its start time
a.state.Params[job.JobParam_Id] = taskId
a.state.Params[job.JobParam_Start] = time.Now().UnixNano()
a.state.Params[job.JobParam_Start] = float64(time.Now().UnixNano())
return a.advance(job.JobStage_Started, now, nil)
}
}
Expand All @@ -63,13 +63,13 @@ func (a anchorJob) Advance() (job.JobState, error) {
return a.advance(job.JobStage_Failed, now, err)
} else if stopped {
return a.advance(job.JobStage_Completed, now, nil)
} else if delayed, _ := a.state.Params[job.JobParam_Delayed].(bool); !delayed && job.IsTimedOut(a.state, AnchorStalledTime/2) {
} else if delayed, _ := a.state.Params[job.AnchorJobParam_Delayed].(bool); !delayed && job.IsTimedOut(a.state, AnchorStalledTime/2) {
// If the job has been running for > 1.5 hours, mark it "delayed".
a.state.Params[job.JobParam_Delayed] = true
a.state.Params[job.AnchorJobParam_Delayed] = true
return a.advance(job.JobStage_Waiting, now, nil)
} else if stalled, _ := a.state.Params[job.JobParam_Stalled].(bool); !stalled && job.IsTimedOut(a.state, AnchorStalledTime) {
} else if stalled, _ := a.state.Params[job.AnchorJobParam_Stalled].(bool); !stalled && job.IsTimedOut(a.state, AnchorStalledTime) {
// If the job has been running for > 3 hours, mark it "stalled".
a.state.Params[job.JobParam_Stalled] = true
a.state.Params[job.AnchorJobParam_Stalled] = true
return a.advance(job.JobStage_Waiting, now, nil)
} else {
// Return so we come back again to check
Expand All @@ -87,7 +87,7 @@ func (a anchorJob) launchWorker() (string, error) {
var overrides map[string]string = nil
// Check if this is a CASv5 anchor job
if manager.IsV5WorkerJob(a.state) {
if parsedOverrides, found := a.state.Params[job.JobParam_Overrides].(map[string]interface{}); found {
if parsedOverrides, found := a.state.Params[job.AnchorJobParam_Overrides].(map[string]interface{}); found {
overrides = make(map[string]string, len(parsedOverrides))
for k, v := range parsedOverrides {
overrides[k] = v.(string)
Expand Down
Loading

0 comments on commit d7bc47e

Please sign in to comment.