Skip to content

Commit

Permalink
chore(cd): more cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
smrz2001 committed Oct 30, 2023
1 parent 1fc0bf9 commit 6f3ba7b
Show file tree
Hide file tree
Showing 10 changed files with 97 additions and 99 deletions.
4 changes: 2 additions & 2 deletions cd/manager/common/aws/ddb/dynamoDb.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,12 +258,12 @@ func (db DynamoDb) iterateEvents(queryInput *dynamodb.QueryInput, iter func(job.
for _, jobState := range jobsPage {
if jobState.Type == job.JobType_Deploy {
// Marshal layout back into `Layout` structure
if layout, found := jobState.Params[job.JobParam_Layout].(map[string]interface{}); found {
if layout, found := jobState.Params[job.DeployJobParam_Layout].(map[string]interface{}); found {
var marshaledLayout manager.Layout
if err = mapstructure.Decode(layout, &marshaledLayout); err != nil {
return err
}
jobState.Params[job.JobParam_Layout] = marshaledLayout
jobState.Params[job.DeployJobParam_Layout] = marshaledLayout
}
}
if !iter(jobState) {
Expand Down
36 changes: 21 additions & 15 deletions cd/manager/common/job/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,27 @@ const (
)

const (
JobParam_Component string = "component"
JobParam_Id string = "id"
JobParam_Sha string = "sha"
JobParam_ShaTag string = "shaTag"
JobParam_Error string = "error"
JobParam_Layout string = "layout"
JobParam_Manual string = "manual"
JobParam_Force string = "force"
JobParam_Start string = "start"
JobParam_Rollback string = "rollback"
JobParam_Delayed string = "delayed"
JobParam_Stalled string = "stalled"
JobParam_Source string = "source"
JobParam_Version string = "version"
JobParam_Overrides string = "overrides"
JobParam_Id string = "id"
JobParam_Error string = "error"
JobParam_Start string = "start"
JobParam_Source string = "source"
)

// Keys of the entries that deploy jobs keep in their `Params` map.
const (
// Component being deployed; read as a string (the constructor's error message lists ceramic, ipfs, cas).
DeployJobParam_Component string = "component"
// Commit hash to deploy; read as a string. Rollback jobs set it to "." so that the job looks up the last
// successfully deployed commit hash from the database.
DeployJobParam_Sha string = "sha"
// String consulted when the latest commit hash has to be resolved from the repository.
DeployJobParam_ShaTag string = "shaTag"
// Environment layout generated for the deployment; stored as a `manager.Layout`.
DeployJobParam_Layout string = "layout"
// Boolean flag recorded on the job when the deployment is considered manual.
DeployJobParam_Manual string = "manual"
// Boolean flag marking a deployment that should be processed as a force deploy.
DeployJobParam_Force string = "force"
// Boolean flag marking a deployment that is itself a rollback.
DeployJobParam_Rollback string = "rollback"
)

// Keys of the entries that anchor jobs keep in their `Params` map.
const (
// Boolean flag set on an anchor job once it has been running for longer than half of the stalled time.
AnchorJobParam_Delayed string = "delayed"
// Boolean flag set on an anchor job once it has been running for longer than the stalled time.
AnchorJobParam_Stalled string = "stalled"
// NOTE(review): presumably identifies the anchor worker version - no usage is visible here, confirm against callers.
AnchorJobParam_Version string = "version"
// Map of string overrides that is read when launching a v5 worker job.
AnchorJobParam_Overrides string = "overrides"
)

// JobState represents the state of a job in the database
Expand Down
3 changes: 2 additions & 1 deletion cd/manager/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ require (
github.com/aws/aws-sdk-go-v2/service/ssm v1.27.12
github.com/disgoorg/disgo v0.13.16
github.com/disgoorg/snowflake/v2 v2.0.0
github.com/google/go-github v17.0.0+incompatible
github.com/google/go-github/v56 v56.0.0
github.com/google/uuid v1.3.0
github.com/joho/godotenv v1.4.0
github.com/mitchellh/mapstructure v1.5.0
golang.org/x/exp v0.0.0-20220325121720-054d8573a5d8
golang.org/x/oauth2 v0.1.0
golang.org/x/text v0.6.0
)
Expand Down
8 changes: 5 additions & 3 deletions cd/manager/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,10 @@ github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg=
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-github v17.0.0+incompatible h1:N0LgJ1j65A7kfXrZnUDaYCs/Sf4rEjNlfyDHW9dolSY=
github.com/google/go-github v17.0.0+incompatible/go.mod h1:zLgOLi98H3fifZn+44m+umXrS52loVEgC2AApnigrVQ=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-github/v56 v56.0.0 h1:TysL7dMa/r7wsQi44BjqlwaHvwlFlqkK8CtBWCX3gb4=
github.com/google/go-github/v56 v56.0.0/go.mod h1:D8cdcX98YWJvi7TLo7zM4/h8ZTx6u6fwGEkCdisopo0=
github.com/google/go-querystring v1.1.0 h1:AnCroh3fv4ZBgVIf1Iwtovgjaw/GiKJo8M8yD/fhyJ8=
github.com/google/go-querystring v1.1.0/go.mod h1:Kcdr2DB4koayq7X8pmAG4sNG59So17icRSOU623lUBU=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
Expand All @@ -90,6 +90,8 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/testify v1.7.2 h1:4jaiDzPyXQvSd7D0EjG45355tLlV3VOECpq10pLC+8s=
github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/exp v0.0.0-20220325121720-054d8573a5d8 h1:Xt4/LzbTwfocTk9ZLEu4onjeFucl88iW+v4j4PWbQuE=
golang.org/x/exp v0.0.0-20220325121720-054d8573a5d8/go.mod h1:lgLbSvA5ygNOMpwM/9anMpWVlVJ7Z+cHWq/eFuinpGE=
golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks=
golang.org/x/net v0.5.0 h1:GyT4nK/YDHSqa1c4753ouYCDajOYKTja9Xb/OHtgvSw=
golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws=
Expand Down
72 changes: 30 additions & 42 deletions cd/manager/jobmanager/jobManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"sync"
"time"

"golang.org/x/exp/maps"

"github.com/google/uuid"

"github.com/3box/pipeline-tools/cd/manager"
Expand Down Expand Up @@ -127,27 +129,11 @@ func (m *JobManager) processJobs() {
}
}
// Find all jobs in progress and advance their state before looking for new jobs
activeJobs := m.cache.JobsByMatcher(job.IsActiveJob)
if len(activeJobs) > 0 {
log.Printf("processJobs: checking %d jobs in progress: %s", len(activeJobs), manager.PrintJob(activeJobs...))
for _, activeJob := range activeJobs {
m.advanceJob(activeJob)
}
}
// Wait for any running job advancement goroutines to finish before kicking off more jobs
m.waitGroup.Wait()
m.advanceJobs(m.cache.JobsByMatcher(job.IsActiveJob))
// Don't start any new jobs if the job manager is paused. Existing jobs will continue to be advanced.
if !m.paused {
// Advance each freshly discovered "queued" job to the "dequeued" stage
queuedJobs := m.db.QueuedJobs()
if len(queuedJobs) > 0 {
log.Printf("processJobs: found queued %d jobs...", len(queuedJobs))
for _, jobState := range queuedJobs {
m.advanceJob(jobState)
}
// Wait for any running job advancement goroutines to finish before processing jobs
m.waitGroup.Wait()
}
m.advanceJobs(m.db.QueuedJobs())
// Always attempt to check if we have anchor jobs, even if none were dequeued. This is because we might have a
// configured minimum number of jobs to run.
processAnchorJobs := true
Expand Down Expand Up @@ -206,6 +192,17 @@ func (m *JobManager) processJobs() {
m.waitGroup.Wait()
}

// advanceJobs advances every job in the given list and then blocks until the job manager's wait group has drained.
// An empty (or nil) list is a no-op: nothing is logged and nothing is waited for.
func (m *JobManager) advanceJobs(jobs []job.JobState) {
	count := len(jobs)
	if count == 0 {
		return
	}
	log.Printf("advanceJobs: advancing %d jobs...", count)
	for idx := range jobs {
		m.advanceJob(jobs[idx])
	}
	// Block here until all running job advancement goroutines have finished
	m.waitGroup.Wait()
}

func (m *JobManager) checkJobInterval(jobType job.JobType, jobStage job.JobStage, intervalEnv string, processFn func(time.Time) error) error {
if interval, found := os.LookupEnv(intervalEnv); found {
if parsedInterval, err := time.ParseDuration(interval); err != nil {
Expand Down Expand Up @@ -241,17 +238,17 @@ func (m *JobManager) processForceDeployJobs(dequeuedJobs []job.JobState) bool {
forceDeploys := make(map[string]job.JobState, 0)
for _, dequeuedJob := range dequeuedJobs {
if dequeuedJob.Type == job.JobType_Deploy {
if dequeuedJobForce, _ := dequeuedJob.Params[job.JobParam_Force].(bool); dequeuedJobForce {
if dequeuedJobForce, _ := dequeuedJob.Params[job.DeployJobParam_Force].(bool); dequeuedJobForce {
// Replace an existing job with a newer one, or add a new job (hence a map).
forceDeploys[dequeuedJob.Params[job.JobParam_Component].(string)] = dequeuedJob
forceDeploys[dequeuedJob.Params[job.DeployJobParam_Component].(string)] = dequeuedJob
}
}
}
if len(forceDeploys) > 0 {
// Skip any dequeued jobs for components being force deployed
for _, dequeuedJob := range dequeuedJobs {
if dequeuedJob.Type == job.JobType_Deploy {
if forceDeploy, found := forceDeploys[dequeuedJob.Params[job.JobParam_Component].(string)]; found && (dequeuedJob.Job != forceDeploy.Job) {
if forceDeploy, found := forceDeploys[dequeuedJob.Params[job.DeployJobParam_Component].(string)]; found && (dequeuedJob.Job != forceDeploy.Job) {
if err := m.updateJobStage(dequeuedJob, job.JobStage_Skipped, nil); err != nil {
// Return `true` from here so that no state is changed and the loop can restart cleanly. Any
// jobs already skipped won't be picked up again, which is ok.
Expand All @@ -265,7 +262,7 @@ func (m *JobManager) processForceDeployJobs(dequeuedJobs []job.JobState) bool {
return job.IsActiveJob(js) && (js.Type == job.JobType_Deploy)
})
for _, activeDeploy := range activeDeploys {
if _, found := forceDeploys[activeDeploy.Params[job.JobParam_Component].(string)]; found {
if _, found := forceDeploys[activeDeploy.Params[job.DeployJobParam_Component].(string)]; found {
if err := m.updateJobStage(activeDeploy, job.JobStage_Canceled, nil); err != nil {
// Return `true` from here so that no state is changed and the loop can restart cleanly. Any jobs
// already skipped won't be picked up again, which is ok.
Expand All @@ -274,10 +271,7 @@ func (m *JobManager) processForceDeployJobs(dequeuedJobs []job.JobState) bool {
}
}
// Now advance all force deploy jobs, order doesn't matter.
for _, deployJob := range forceDeploys {
log.Printf("processForceDeployJobs: starting force deploy job: %s", manager.PrintJob(deployJob))
m.advanceJob(deployJob)
}
m.advanceJobs(maps.Values(forceDeploys))
return true
}
return false
Expand All @@ -290,14 +284,14 @@ func (m *JobManager) processDeployJobs(dequeuedJobs []job.JobState) bool {
// We know the first job is a deploy, so pick out the component for that job, collapse as many back-to-back jobs
// as possible for that component, then run the final job.
deployJob := dequeuedJobs[0]
deployComponent := deployJob.Params[job.JobParam_Component].(string)
deployComponent := deployJob.Params[job.DeployJobParam_Component].(string)
// Collapse similar, back-to-back deployments into a single run and kick it off.
for i := 1; i < len(dequeuedJobs); i++ {
dequeuedJob := dequeuedJobs[i]
// Break out of the loop as soon as we find a test job - we don't want to collapse deploys across them.
if (dequeuedJob.Type == job.JobType_TestE2E) || (dequeuedJob.Type == job.JobType_TestSmoke) {
break
} else if (dequeuedJob.Type == job.JobType_Deploy) && (dequeuedJob.Params[job.JobParam_Component].(string) == deployComponent) {
} else if (dequeuedJob.Type == job.JobType_Deploy) && (dequeuedJob.Params[job.DeployJobParam_Component].(string) == deployComponent) {
// Skip the current deploy job, and replace it with a newer one.
if err := m.updateJobStage(deployJob, job.JobStage_Skipped, nil); err != nil {
// Return `true` from here so that no state is changed and the loop can restart cleanly. Any
Expand Down Expand Up @@ -357,10 +351,7 @@ func (m *JobManager) processVxAnchorJobs(dequeuedJobs []job.JobState, processV5J
}
}
// Now advance all anchor jobs, order doesn't matter.
for _, anchorJob := range dequeuedAnchors {
log.Printf("processVxAnchorJobs: starting anchor job: %s", manager.PrintJob(anchorJob))
m.advanceJob(anchorJob)
}
m.advanceJobs(dequeuedAnchors)
// If not enough anchor jobs were running to satisfy the configured minimum number of workers, add jobs to the queue
// to make up the difference. These jobs should get picked up in a subsequent job manager iteration, properly
// coordinated with other jobs in the queue. It's ok if we ultimately end up with more jobs queued than the
Expand Down Expand Up @@ -413,10 +404,7 @@ func (m *JobManager) processTestJobs(dequeuedJobs []job.JobState) bool {
dequeuedTests[dequeuedJob.Type] = dequeuedJob
}
}
for _, testJob := range dequeuedTests {
log.Printf("processTestJobs: starting test job: %s", manager.PrintJob(testJob))
m.advanceJob(testJob)
}
m.advanceJobs(maps.Values(dequeuedTests))
return len(dequeuedTests) > 0
} else {
log.Printf("processTestJobs: deployment in progress")
Expand Down Expand Up @@ -481,17 +469,17 @@ func (m *JobManager) postProcessJob(jobState job.JobState) {
case job.JobStage_Failed:
{
// Only rollback if this wasn't already a rollback attempt that failed
if rollback, _ := jobState.Params[job.JobParam_Rollback].(bool); !rollback {
if rollback, _ := jobState.Params[job.DeployJobParam_Rollback].(bool); !rollback {
if _, err := m.NewJob(job.JobState{
Type: job.JobType_Deploy,
Params: map[string]interface{}{
job.JobParam_Component: jobState.Params[job.JobParam_Component],
job.JobParam_Rollback: true,
job.DeployJobParam_Component: jobState.Params[job.DeployJobParam_Component],
job.DeployJobParam_Rollback: true,
// Make the job lookup the last successfully deployed commit hash from the database
job.JobParam_Sha: ".",
job.DeployJobParam_Sha: ".",
// No point in waiting for other jobs to complete before redeploying a working image
job.JobParam_Force: true,
job.JobParam_Source: manager.ServiceName,
job.DeployJobParam_Force: true,
job.JobParam_Source: manager.ServiceName,
},
}); err != nil {
log.Printf("postProcessJob: failed to queue rollback after failed deploy: %v, %s", err, manager.PrintJob(jobState))
Expand Down
10 changes: 5 additions & 5 deletions cd/manager/jobs/anchor.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,13 @@ func (a anchorJob) Advance() (job.JobState, error) {
return a.advance(job.JobStage_Failed, now, err)
} else if stopped {
return a.advance(job.JobStage_Completed, now, nil)
} else if delayed, _ := a.state.Params[job.JobParam_Delayed].(bool); !delayed && job.IsTimedOut(a.state, AnchorStalledTime/2) {
} else if delayed, _ := a.state.Params[job.AnchorJobParam_Delayed].(bool); !delayed && job.IsTimedOut(a.state, AnchorStalledTime/2) {
// If the job has been running for > 1.5 hours, mark it "delayed".
a.state.Params[job.JobParam_Delayed] = true
a.state.Params[job.AnchorJobParam_Delayed] = true
return a.advance(job.JobStage_Waiting, now, nil)
} else if stalled, _ := a.state.Params[job.JobParam_Stalled].(bool); !stalled && job.IsTimedOut(a.state, AnchorStalledTime) {
} else if stalled, _ := a.state.Params[job.AnchorJobParam_Stalled].(bool); !stalled && job.IsTimedOut(a.state, AnchorStalledTime) {
// If the job has been running for > 3 hours, mark it "stalled".
a.state.Params[job.JobParam_Stalled] = true
a.state.Params[job.AnchorJobParam_Stalled] = true
return a.advance(job.JobStage_Waiting, now, nil)
} else {
// Return so we come back again to check
Expand All @@ -87,7 +87,7 @@ func (a anchorJob) launchWorker() (string, error) {
var overrides map[string]string = nil
// Check if this is a CASv5 anchor job
if manager.IsV5WorkerJob(a.state) {
if parsedOverrides, found := a.state.Params[job.JobParam_Overrides].(map[string]interface{}); found {
if parsedOverrides, found := a.state.Params[job.AnchorJobParam_Overrides].(map[string]interface{}); found {
overrides = make(map[string]string, len(parsedOverrides))
for k, v := range parsedOverrides {
overrides[k] = v.(string)
Expand Down
20 changes: 10 additions & 10 deletions cd/manager/jobs/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ type deployJob struct {
}

// DeployJob builds the deploy job state machine for the given job state.
//
// The job's params must hold a string component and a string sha; if either is missing (or is not a string), an error
// is returned instead of a job. The boolean manual and rollback params are optional and are treated as false when
// absent.
//
// NOTE(review): this span is taken from a diff view, so every changed statement appears twice - first in its old
// `JobParam_*` form, then in its new `DeployJobParam_*` form.
func DeployJob(jobState job.JobState, db manager.Database, notifs manager.Notifs, d manager.Deployment, repo manager.Repository) (manager.JobSm, error) {
if component, found := jobState.Params[job.JobParam_Component].(string); !found {
if component, found := jobState.Params[job.DeployJobParam_Component].(string); !found {
return nil, fmt.Errorf("deployJob: missing component (ceramic, ipfs, cas)")
} else if sha, found := jobState.Params[job.JobParam_Sha].(string); !found {
} else if sha, found := jobState.Params[job.DeployJobParam_Sha].(string); !found {
return nil, fmt.Errorf("deployJob: missing sha")
} else {
// A failed type assertion leaves these flags at their zero value (false)
manual, _ := jobState.Params[job.JobParam_Manual].(bool)
rollback, _ := jobState.Params[job.JobParam_Rollback].(bool)
manual, _ := jobState.Params[job.DeployJobParam_Manual].(bool)
rollback, _ := jobState.Params[job.DeployJobParam_Rollback].(bool)
return &deployJob{baseJob{jobState, db, notifs}, manager.DeployComponent(component), sha, manual, rollback, d, repo}, nil
}
}
Expand All @@ -51,7 +51,7 @@ func (d deployJob) Advance() (job.JobState, error) {
} else if envLayout, err := d.d.GenerateEnvLayout(d.component); err != nil {
return d.advance(job.JobStage_Failed, now, err)
} else {
d.state.Params[job.JobParam_Layout] = *envLayout
d.state.Params[job.DeployJobParam_Layout] = *envLayout
// Don't update the timestamp here so that the "dequeued" event remains at the same position on the
// timeline as the "queued" event.
return d.advance(job.JobStage_Dequeued, d.state.Ts, nil)
Expand Down Expand Up @@ -111,7 +111,7 @@ func (d deployJob) prepareJob(deployHashes map[manager.DeployComponent]string) e
//
// The last two cases will only happen when redeploying manually, so we can note that in the notification.
if d.sha == manager.BuildHashLatest {
shaTag, _ := d.state.Params[job.JobParam_ShaTag].(string)
shaTag, _ := d.state.Params[job.DeployJobParam_ShaTag].(string)
if latestSha, err := d.repo.GetLatestCommitHash(
manager.ComponentRepo(d.component),
manager.EnvBranch(d.component, manager.EnvType(os.Getenv("ENV"))),
Expand All @@ -132,22 +132,22 @@ func (d deployJob) prepareJob(deployHashes map[manager.DeployComponent]string) e
}
d.manual = true
}
d.state.Params[job.JobParam_Sha] = d.sha
d.state.Params[job.DeployJobParam_Sha] = d.sha
if d.manual {
d.state.Params[job.JobParam_Manual] = true
d.state.Params[job.DeployJobParam_Manual] = true
}
return nil
}

// updateEnv passes the environment layout stored in the job's params, together with the given commit hash, to the
// deployment's `UpdateEnv`. An error is returned when the params do not hold a `manager.Layout` under the layout key.
//
// NOTE(review): this span is taken from a diff view, so the changed statement appears twice - first in its old
// `JobParam_*` form, then in its new `DeployJobParam_*` form.
func (d deployJob) updateEnv(commitHash string) error {
if layout, found := d.state.Params[job.JobParam_Layout].(manager.Layout); found {
if layout, found := d.state.Params[job.DeployJobParam_Layout].(manager.Layout); found {
return d.d.UpdateEnv(&layout, commitHash)
}
return fmt.Errorf("updateEnv: missing env layout")
}

func (d deployJob) checkEnv() (bool, error) {
if layout, found := d.state.Params[job.JobParam_Layout].(manager.Layout); !found {
if layout, found := d.state.Params[job.DeployJobParam_Layout].(manager.Layout); !found {
return false, fmt.Errorf("checkEnv: missing env layout")
} else if deployed, err := d.d.CheckEnv(&layout); err != nil {
return false, err
Expand Down
Loading

0 comments on commit 6f3ba7b

Please sign in to comment.