diff --git a/cd/manager/common/aws/ddb/dynamoDb.go b/cd/manager/common/aws/ddb/dynamoDb.go index 3617e56..f7635bd 100644 --- a/cd/manager/common/aws/ddb/dynamoDb.go +++ b/cd/manager/common/aws/ddb/dynamoDb.go @@ -258,12 +258,12 @@ func (db DynamoDb) iterateEvents(queryInput *dynamodb.QueryInput, iter func(job. for _, jobState := range jobsPage { if jobState.Type == job.JobType_Deploy { // Marshal layout back into `Layout` structure - if layout, found := jobState.Params[job.JobParam_Layout].(map[string]interface{}); found { + if layout, found := jobState.Params[job.DeployJobParam_Layout].(map[string]interface{}); found { var marshaledLayout manager.Layout if err = mapstructure.Decode(layout, &marshaledLayout); err != nil { return err } - jobState.Params[job.JobParam_Layout] = marshaledLayout + jobState.Params[job.DeployJobParam_Layout] = marshaledLayout } } if !iter(jobState) { diff --git a/cd/manager/common/job/job.go b/cd/manager/common/job/job.go index 82e1110..bdf6802 100644 --- a/cd/manager/common/job/job.go +++ b/cd/manager/common/job/job.go @@ -11,21 +11,6 @@ import ( "github.com/3box/pipeline-tools/cd/manager/common/aws/utils" ) -func JobName(jt JobType) string { - switch jt { - case JobType_Deploy: - return JobName_Deploy - case JobType_Anchor: - return JobName_Anchor - case JobType_TestE2E: - return JobName_TestE2E - case JobType_TestSmoke: - return JobName_TestSmoke - default: - return "" - } -} - func IsFinishedJob(jobState JobState) bool { return (jobState.Stage == JobStage_Skipped) || (jobState.Stage == JobStage_Canceled) || (jobState.Stage == JobStage_Failed) || (jobState.Stage == JobStage_Completed) } diff --git a/cd/manager/common/job/models.go b/cd/manager/common/job/models.go index b0dbd31..ff98a2e 100644 --- a/cd/manager/common/job/models.go +++ b/cd/manager/common/job/models.go @@ -15,13 +15,6 @@ const ( JobType_TestSmoke JobType = "test_smoke" ) -const ( - JobName_Deploy = "Deployment" - JobName_Anchor = "Anchor Worker" - JobName_TestE2E = "E2E Tests" - JobName_TestSmoke = "Smoke Tests" -) - type JobStage string const ( @@ -36,21 +29,27 @@ const ( ) const ( - JobParam_Component string = "component" - JobParam_Id string = "id" - JobParam_Sha string = "sha" - JobParam_ShaTag string = "shaTag" - JobParam_Error string = "error" - JobParam_Layout string = "layout" - JobParam_Manual string = "manual" - JobParam_Force string = "force" - JobParam_Start string = "start" - JobParam_Rollback string = "rollback" - JobParam_Delayed string = "delayed" - JobParam_Stalled string = "stalled" - JobParam_Source string = "source" - JobParam_Version string = "version" - JobParam_Overrides string = "overrides" + JobParam_Id string = "id" + JobParam_Error string = "error" + JobParam_Start string = "start" + JobParam_Source string = "source" +) + +const ( + DeployJobParam_Component string = "component" + DeployJobParam_Sha string = "sha" + DeployJobParam_ShaTag string = "shaTag" + DeployJobParam_Layout string = "layout" + DeployJobParam_Manual string = "manual" + DeployJobParam_Force string = "force" + DeployJobParam_Rollback string = "rollback" +) + +const ( + AnchorJobParam_Delayed string = "delayed" + AnchorJobParam_Stalled string = "stalled" + AnchorJobParam_Version string = "version" + AnchorJobParam_Overrides string = "overrides" ) // JobState represents the state of a job in the database diff --git a/cd/manager/go.mod b/cd/manager/go.mod index 50fba6a..849ce65 100644 --- a/cd/manager/go.mod +++ b/cd/manager/go.mod @@ -18,10 +18,11 @@ require ( github.com/aws/aws-sdk-go-v2/service/ssm v1.27.12 github.com/disgoorg/disgo v0.13.16 github.com/disgoorg/snowflake/v2 v2.0.0 - github.com/google/go-github v17.0.0+incompatible + github.com/google/go-github/v56 v56.0.0 github.com/google/uuid v1.3.0 github.com/joho/godotenv v1.4.0 github.com/mitchellh/mapstructure v1.5.0 + golang.org/x/exp v0.0.0-20220325121720-054d8573a5d8 golang.org/x/oauth2 v0.1.0 golang.org/x/text v0.6.0 ) diff --git a/cd/manager/go.sum b/cd/manager/go.sum index e48d1c0..5f34390 100644 --- a/cd/manager/go.sum +++ b/cd/manager/go.sum @@ -66,10 +66,10 @@ github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= -github.com/google/go-github v17.0.0+incompatible h1:N0LgJ1j65A7kfXrZnUDaYCs/Sf4rEjNlfyDHW9dolSY= -github.com/google/go-github v17.0.0+incompatible/go.mod h1:zLgOLi98H3fifZn+44m+umXrS52loVEgC2AApnigrVQ= +github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/google/go-github/v56 v56.0.0 h1:TysL7dMa/r7wsQi44BjqlwaHvwlFlqkK8CtBWCX3gb4= +github.com/google/go-github/v56 v56.0.0/go.mod h1:D8cdcX98YWJvi7TLo7zM4/h8ZTx6u6fwGEkCdisopo0= github.com/google/go-querystring v1.1.0 h1:AnCroh3fv4ZBgVIf1Iwtovgjaw/GiKJo8M8yD/fhyJ8= github.com/google/go-querystring v1.1.0/go.mod h1:Kcdr2DB4koayq7X8pmAG4sNG59So17icRSOU623lUBU= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= @@ -90,6 +90,8 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/testify v1.7.2 h1:4jaiDzPyXQvSd7D0EjG45355tLlV3VOECpq10pLC+8s= github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/exp v0.0.0-20220325121720-054d8573a5d8 h1:Xt4/LzbTwfocTk9ZLEu4onjeFucl88iW+v4j4PWbQuE= +golang.org/x/exp v0.0.0-20220325121720-054d8573a5d8/go.mod h1:lgLbSvA5ygNOMpwM/9anMpWVlVJ7Z+cHWq/eFuinpGE= golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= golang.org/x/net v0.5.0 h1:GyT4nK/YDHSqa1c4753ouYCDajOYKTja9Xb/OHtgvSw= golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws= diff --git a/cd/manager/jobmanager/jobManager.go b/cd/manager/jobmanager/jobManager.go index 196c0b0..20c3efa 100644 --- a/cd/manager/jobmanager/jobManager.go +++ b/cd/manager/jobmanager/jobManager.go @@ -9,6 +9,8 @@ import ( "sync" "time" + "golang.org/x/exp/maps" + "github.com/google/uuid" "github.com/3box/pipeline-tools/cd/manager" @@ -52,7 +54,7 @@ func NewJobManager(cache manager.Cache, db manager.Database, d manager.Deploymen return &JobManager{cache, db, d, apiGw, repo, notifs, maxAnchorJobs, minAnchorJobs, paused, manager.EnvType(os.Getenv("ENV")), new(sync.WaitGroup)}, nil } -func (m *JobManager) NewJob(jobState job.JobState) (string, error) { +func (m *JobManager) NewJob(jobState job.JobState) (job.JobState, error) { jobState.Stage = job.JobStage_Queued // Only set the job ID/time if not already set by the caller if len(jobState.Job) == 0 { @@ -64,14 +66,14 @@ func (m *JobManager) NewJob(jobState job.JobState) (string, error) { if jobState.Params == nil { jobState.Params = make(map[string]interface{}, 0) } - return jobState.Job, m.db.QueueJob(jobState) + return jobState, m.db.QueueJob(jobState) } -func (m *JobManager) CheckJob(jobId string) string { +func (m *JobManager) CheckJob(jobId string) job.JobState { if cachedJob, found := m.cache.JobById(jobId); found { - return string(cachedJob.Stage) + return cachedJob } - return "" + return job.JobState{} } func (m *JobManager) ProcessJobs(shutdownCh chan bool) { @@ -127,27 +129,11 @@ func (m *JobManager) processJobs() { } } // Find all jobs in progress and advance their state before looking for new jobs - activeJobs := m.cache.JobsByMatcher(job.IsActiveJob) - if len(activeJobs) > 0 { - log.Printf("processJobs: checking %d jobs in progress: %s", len(activeJobs), manager.PrintJob(activeJobs...)) - for _, activeJob := range activeJobs { - m.advanceJob(activeJob) - } - } - // Wait for any running job advancement goroutines to finish before kicking off more jobs - m.waitGroup.Wait() + m.advanceJobs(m.cache.JobsByMatcher(job.IsActiveJob)) // Don't start any new jobs if the job manager is paused. Existing jobs will continue to be advanced. if !m.paused { // Advance each freshly discovered "queued" job to the "dequeued" stage - queuedJobs := m.db.QueuedJobs() - if len(queuedJobs) > 0 { - log.Printf("processJobs: found queued %d jobs...", len(queuedJobs)) - for _, jobState := range queuedJobs { - m.advanceJob(jobState) - } - // Wait for any running job advancement goroutines to finish before processing jobs - m.waitGroup.Wait() - } + m.advanceJobs(m.db.QueuedJobs()) // Always attempt to check if we have anchor jobs, even if none were dequeued. This is because we might have a // configured minimum number of jobs to run. processAnchorJobs := true @@ -206,6 +192,16 @@ func (m *JobManager) processJobs() { m.waitGroup.Wait() } +func (m *JobManager) advanceJobs(jobs []job.JobState) { + if len(jobs) > 0 { + for _, jobState := range jobs { + m.advanceJob(jobState) + } + // Wait for any running job advancement goroutines to finish + m.waitGroup.Wait() + } +} + func (m *JobManager) checkJobInterval(jobType job.JobType, jobStage job.JobStage, intervalEnv string, processFn func(time.Time) error) error { if interval, found := os.LookupEnv(intervalEnv); found { if parsedInterval, err := time.ParseDuration(interval); err != nil { @@ -241,9 +237,9 @@ func (m *JobManager) processForceDeployJobs(dequeuedJobs []job.JobState) bool { forceDeploys := make(map[string]job.JobState, 0) for _, dequeuedJob := range dequeuedJobs { if dequeuedJob.Type == job.JobType_Deploy { - if dequeuedJobForce, _ := dequeuedJob.Params[job.JobParam_Force].(bool); dequeuedJobForce { + if dequeuedJobForce, _ := dequeuedJob.Params[job.DeployJobParam_Force].(bool); dequeuedJobForce { // Replace an existing job with a newer one, or add a new job (hence a map). - forceDeploys[dequeuedJob.Params[job.JobParam_Component].(string)] = dequeuedJob + forceDeploys[dequeuedJob.Params[job.DeployJobParam_Component].(string)] = dequeuedJob } } } @@ -251,7 +247,7 @@ func (m *JobManager) processForceDeployJobs(dequeuedJobs []job.JobState) bool { // Skip any dequeued jobs for components being force deployed for _, dequeuedJob := range dequeuedJobs { if dequeuedJob.Type == job.JobType_Deploy { - if forceDeploy, found := forceDeploys[dequeuedJob.Params[job.JobParam_Component].(string)]; found && (dequeuedJob.Job != forceDeploy.Job) { + if forceDeploy, found := forceDeploys[dequeuedJob.Params[job.DeployJobParam_Component].(string)]; found && (dequeuedJob.Job != forceDeploy.Job) { if err := m.updateJobStage(dequeuedJob, job.JobStage_Skipped, nil); err != nil { // Return `true` from here so that no state is changed and the loop can restart cleanly. Any // jobs already skipped won't be picked up again, which is ok. @@ -265,7 +261,7 @@ func (m *JobManager) processForceDeployJobs(dequeuedJobs []job.JobState) bool { return job.IsActiveJob(js) && (js.Type == job.JobType_Deploy) }) for _, activeDeploy := range activeDeploys { - if _, found := forceDeploys[activeDeploy.Params[job.JobParam_Component].(string)]; found { + if _, found := forceDeploys[activeDeploy.Params[job.DeployJobParam_Component].(string)]; found { if err := m.updateJobStage(activeDeploy, job.JobStage_Canceled, nil); err != nil { // Return `true` from here so that no state is changed and the loop can restart cleanly. Any jobs // already skipped won't be picked up again, which is ok. @@ -274,10 +270,7 @@ func (m *JobManager) processForceDeployJobs(dequeuedJobs []job.JobState) bool { } } // Now advance all force deploy jobs, order doesn't matter. - for _, deployJob := range forceDeploys { - log.Printf("processForceDeployJobs: starting force deploy job: %s", manager.PrintJob(deployJob)) - m.advanceJob(deployJob) - } + m.advanceJobs(maps.Values(forceDeploys)) return true } return false @@ -290,14 +283,14 @@ func (m *JobManager) processDeployJobs(dequeuedJobs []job.JobState) bool { // We know the first job is a deploy, so pick out the component for that job, collapse as many back-to-back jobs // as possible for that component, then run the final job. deployJob := dequeuedJobs[0] - deployComponent := deployJob.Params[job.JobParam_Component].(string) + deployComponent := deployJob.Params[job.DeployJobParam_Component].(string) // Collapse similar, back-to-back deployments into a single run and kick it off. for i := 1; i < len(dequeuedJobs); i++ { dequeuedJob := dequeuedJobs[i] // Break out of the loop as soon as we find a test job - we don't want to collapse deploys across them. if (dequeuedJob.Type == job.JobType_TestE2E) || (dequeuedJob.Type == job.JobType_TestSmoke) { break - } else if (dequeuedJob.Type == job.JobType_Deploy) && (dequeuedJob.Params[job.JobParam_Component].(string) == deployComponent) { + } else if (dequeuedJob.Type == job.JobType_Deploy) && (dequeuedJob.Params[job.DeployJobParam_Component].(string) == deployComponent) { // Skip the current deploy job, and replace it with a newer one. if err := m.updateJobStage(deployJob, job.JobStage_Skipped, nil); err != nil { // Return `true` from here so that no state is changed and the loop can restart cleanly. Any @@ -307,7 +300,6 @@ func (m *JobManager) processDeployJobs(dequeuedJobs []job.JobState) bool { deployJob = dequeuedJob } } - log.Printf("processDeployJobs: starting deploy job: %s", manager.PrintJob(deployJob)) m.advanceJob(deployJob) return true } else { @@ -357,10 +349,7 @@ func (m *JobManager) processVxAnchorJobs(dequeuedJobs []job.JobState, processV5J } } // Now advance all anchor jobs, order doesn't matter. - for _, anchorJob := range dequeuedAnchors { - log.Printf("processVxAnchorJobs: starting anchor job: %s", manager.PrintJob(anchorJob)) - m.advanceJob(anchorJob) - } + m.advanceJobs(dequeuedAnchors) // If not enough anchor jobs were running to satisfy the configured minimum number of workers, add jobs to the queue // to make up the difference. These jobs should get picked up in a subsequent job manager iteration, properly // coordinated with other jobs in the queue. It's ok if we ultimately end up with more jobs queued than the @@ -413,10 +402,7 @@ func (m *JobManager) processTestJobs(dequeuedJobs []job.JobState) bool { dequeuedTests[dequeuedJob.Type] = dequeuedJob } } - for _, testJob := range dequeuedTests { - log.Printf("processTestJobs: starting test job: %s", manager.PrintJob(testJob)) - m.advanceJob(testJob) - } + m.advanceJobs(maps.Values(dequeuedTests)) return len(dequeuedTests) > 0 } else { log.Printf("processTestJobs: deployment in progress") @@ -481,17 +467,17 @@ func (m *JobManager) postProcessJob(jobState job.JobState) { case job.JobStage_Failed: { // Only rollback if this wasn't already a rollback attempt that failed - if rollback, _ := jobState.Params[job.JobParam_Rollback].(bool); !rollback { + if rollback, _ := jobState.Params[job.DeployJobParam_Rollback].(bool); !rollback { if _, err := m.NewJob(job.JobState{ Type: job.JobType_Deploy, Params: map[string]interface{}{ - job.JobParam_Component: jobState.Params[job.JobParam_Component], - job.JobParam_Rollback: true, + job.DeployJobParam_Component: jobState.Params[job.DeployJobParam_Component], + job.DeployJobParam_Rollback: true, // Make the job lookup the last successfully deployed commit hash from the database - job.JobParam_Sha: ".", + job.DeployJobParam_Sha: ".", // No point in waiting for other jobs to complete before redeploying a working image - job.JobParam_Force: true, - job.JobParam_Source: manager.ServiceName, + job.DeployJobParam_Force: true, + job.JobParam_Source: manager.ServiceName, }, }); err != nil { log.Printf("postProcessJob: failed to queue rollback after failed deploy: %v, %s", err, manager.PrintJob(jobState)) diff --git a/cd/manager/jobs/anchor.go b/cd/manager/jobs/anchor.go index e5fc494..af8574e 100644 --- a/cd/manager/jobs/anchor.go +++ b/cd/manager/jobs/anchor.go @@ -31,9 +31,9 @@ func (a anchorJob) Advance() (job.JobState, error) { { // No preparation needed so advance the job directly to "dequeued". // - // Don't update the timestamp here so that the "dequeued" event remains at the same position on the timeline - // as the "queued" event. - return a.advance(job.JobStage_Dequeued, a.state.Ts, nil) + // Advance the timestamp by a tiny amount so that the "dequeued" event remains at the same position on the + // timeline as the "queued" event but still ahead of it. + return a.advance(job.JobStage_Dequeued, a.state.Ts.Add(time.Nanosecond), nil) } case job.JobStage_Dequeued: { @@ -42,7 +42,7 @@ func (a anchorJob) Advance() (job.JobState, error) { } else { // Record the worker task identifier and its start time a.state.Params[job.JobParam_Id] = taskId - a.state.Params[job.JobParam_Start] = time.Now().UnixNano() + a.state.Params[job.JobParam_Start] = float64(time.Now().UnixNano()) return a.advance(job.JobStage_Started, now, nil) } } @@ -63,13 +63,13 @@ func (a anchorJob) Advance() (job.JobState, error) { return a.advance(job.JobStage_Failed, now, err) } else if stopped { return a.advance(job.JobStage_Completed, now, nil) - } else if delayed, _ := a.state.Params[job.JobParam_Delayed].(bool); !delayed && job.IsTimedOut(a.state, AnchorStalledTime/2) { + } else if delayed, _ := a.state.Params[job.AnchorJobParam_Delayed].(bool); !delayed && job.IsTimedOut(a.state, AnchorStalledTime/2) { // If the job has been running for > 1.5 hours, mark it "delayed". - a.state.Params[job.JobParam_Delayed] = true + a.state.Params[job.AnchorJobParam_Delayed] = true return a.advance(job.JobStage_Waiting, now, nil) - } else if stalled, _ := a.state.Params[job.JobParam_Stalled].(bool); !stalled && job.IsTimedOut(a.state, AnchorStalledTime) { + } else if stalled, _ := a.state.Params[job.AnchorJobParam_Stalled].(bool); !stalled && job.IsTimedOut(a.state, AnchorStalledTime) { // If the job has been running for > 3 hours, mark it "stalled". - a.state.Params[job.JobParam_Stalled] = true + a.state.Params[job.AnchorJobParam_Stalled] = true return a.advance(job.JobStage_Waiting, now, nil) } else { // Return so we come back again to check @@ -87,7 +87,7 @@ func (a anchorJob) launchWorker() (string, error) { var overrides map[string]string = nil // Check if this is a CASv5 anchor job if manager.IsV5WorkerJob(a.state) { - if parsedOverrides, found := a.state.Params[job.JobParam_Overrides].(map[string]interface{}); found { + if parsedOverrides, found := a.state.Params[job.AnchorJobParam_Overrides].(map[string]interface{}); found { overrides = make(map[string]string, len(parsedOverrides)) for k, v := range parsedOverrides { overrides[k] = v.(string) diff --git a/cd/manager/jobs/deploy.go b/cd/manager/jobs/deploy.go index 860956c..76d7718 100644 --- a/cd/manager/jobs/deploy.go +++ b/cd/manager/jobs/deploy.go @@ -23,13 +23,13 @@ type deployJob struct { } func DeployJob(jobState job.JobState, db manager.Database, notifs manager.Notifs, d manager.Deployment, repo manager.Repository) (manager.JobSm, error) { - if component, found := jobState.Params[job.JobParam_Component].(string); !found { + if component, found := jobState.Params[job.DeployJobParam_Component].(string); !found { return nil, fmt.Errorf("deployJob: missing component (ceramic, ipfs, cas)") - } else if sha, found := jobState.Params[job.JobParam_Sha].(string); !found { + } else if sha, found := jobState.Params[job.DeployJobParam_Sha].(string); !found { return nil, fmt.Errorf("deployJob: missing sha") } else { - manual, _ := jobState.Params[job.JobParam_Manual].(bool) - rollback, _ := jobState.Params[job.JobParam_Rollback].(bool) + manual, _ := jobState.Params[job.DeployJobParam_Manual].(bool) + rollback, _ := jobState.Params[job.DeployJobParam_Rollback].(bool) return &deployJob{baseJob{jobState, db, notifs}, manager.DeployComponent(component), sha, manual, rollback, d, repo}, nil } } @@ -51,10 +51,10 @@ func (d deployJob) Advance() (job.JobState, error) { } else if envLayout, err := d.d.GenerateEnvLayout(d.component); err != nil { return d.advance(job.JobStage_Failed, now, err) } else { - d.state.Params[job.JobParam_Layout] = *envLayout - // Don't update the timestamp here so that the "dequeued" event remains at the same position on the - // timeline as the "queued" event. - return d.advance(job.JobStage_Dequeued, d.state.Ts, nil) + d.state.Params[job.DeployJobParam_Layout] = *envLayout + // Advance the timestamp by a tiny amount so that the "dequeued" event remains at the same position on + // the timeline as the "queued" event but still ahead of it. + return d.advance(job.JobStage_Dequeued, d.state.Ts.Add(time.Nanosecond), nil) } } case job.JobStage_Dequeued: @@ -62,7 +62,7 @@ func (d deployJob) Advance() (job.JobState, error) { if err := d.updateEnv(d.sha); err != nil { return d.advance(job.JobStage_Failed, now, err) } else { - d.state.Params[job.JobParam_Start] = time.Now().UnixNano() + d.state.Params[job.JobParam_Start] = float64(time.Now().UnixNano()) // For started deployments update the build commit hash in the DB. if err = d.db.UpdateBuildHash(d.component, d.sha); err != nil { // This isn't an error big enough to fail the job, just report and move on. @@ -111,7 +111,7 @@ func (d deployJob) prepareJob(deployHashes map[manager.DeployComponent]string) e // // The last two cases will only happen when redeploying manually, so we can note that in the notification. if d.sha == manager.BuildHashLatest { - shaTag, _ := d.state.Params[job.JobParam_ShaTag].(string) + shaTag, _ := d.state.Params[job.DeployJobParam_ShaTag].(string) if latestSha, err := d.repo.GetLatestCommitHash( manager.ComponentRepo(d.component), manager.EnvBranch(d.component, manager.EnvType(os.Getenv("ENV"))), @@ -132,22 +132,22 @@ func (d deployJob) prepareJob(deployHashes map[manager.DeployComponent]string) e } d.manual = true } - d.state.Params[job.JobParam_Sha] = d.sha + d.state.Params[job.DeployJobParam_Sha] = d.sha if d.manual { - d.state.Params[job.JobParam_Manual] = true + d.state.Params[job.DeployJobParam_Manual] = true } return nil } func (d deployJob) updateEnv(commitHash string) error { - if layout, found := d.state.Params[job.JobParam_Layout].(manager.Layout); found { + if layout, found := d.state.Params[job.DeployJobParam_Layout].(manager.Layout); found { return d.d.UpdateEnv(&layout, commitHash) } return fmt.Errorf("updateEnv: missing env layout") } func (d deployJob) checkEnv() (bool, error) { - if layout, found := d.state.Params[job.JobParam_Layout].(manager.Layout); !found { + if layout, found := d.state.Params[job.DeployJobParam_Layout].(manager.Layout); !found { return false, fmt.Errorf("checkEnv: missing env layout") } else if deployed, err := d.d.CheckEnv(&layout); err != nil { return false, err diff --git a/cd/manager/jobs/e2e.go b/cd/manager/jobs/e2e.go index 03eca02..894fbb5 100644 --- a/cd/manager/jobs/e2e.go +++ b/cd/manager/jobs/e2e.go @@ -30,16 +30,16 @@ func (e e2eTestJob) Advance() (job.JobState, error) { { // No preparation needed so advance the job directly to "dequeued". // - // Don't update the timestamp here so that the "dequeued" event remains at the same position on the timeline - // as the "queued" event. - return e.advance(job.JobStage_Dequeued, e.state.Ts, nil) + // Advance the timestamp by a tiny amount so that the "dequeued" event remains at the same position on the + // timeline as the "queued" event but still ahead of it. + return e.advance(job.JobStage_Dequeued, e.state.Ts.Add(time.Nanosecond), nil) } case job.JobStage_Dequeued: { if err := e.startAllTests(); err != nil { return e.advance(job.JobStage_Failed, now, err) } else { - e.state.Params[job.JobParam_Start] = time.Now().UnixNano() + e.state.Params[job.JobParam_Start] = float64(time.Now().UnixNano()) return e.advance(job.JobStage_Started, now, nil) } } diff --git a/cd/manager/jobs/smoke.go b/cd/manager/jobs/smoke.go index 9441897..aff8c5d 100644 --- a/cd/manager/jobs/smoke.go +++ b/cd/manager/jobs/smoke.go @@ -36,9 +36,9 @@ func (s smokeTestJob) Advance() (job.JobState, error) { { // No preparation needed so advance the job directly to "dequeued". // - // Don't update the timestamp here so that the "dequeued" event remains at the same position on the timeline - // as the "queued" event. - return s.advance(job.JobStage_Dequeued, s.state.Ts, nil) + // Advance the timestamp by a tiny amount so that the "dequeued" event remains at the same position on the + // timeline as the "queued" event but still ahead of it. + return s.advance(job.JobStage_Dequeued, s.state.Ts.Add(time.Nanosecond), nil) } case job.JobStage_Dequeued: { @@ -47,7 +47,7 @@ func (s smokeTestJob) Advance() (job.JobState, error) { } else { // Update the job stage and spawned task identifier s.state.Params[job.JobParam_Id] = id - s.state.Params[job.JobParam_Start] = time.Now().UnixNano() + s.state.Params[job.JobParam_Start] = float64(time.Now().UnixNano()) return s.advance(job.JobStage_Started, now, nil) } } diff --git a/cd/manager/models.go b/cd/manager/models.go index 1949f8f..b1a600c 100644 --- a/cd/manager/models.go +++ b/cd/manager/models.go @@ -204,13 +204,12 @@ type Deployment interface { // Notifs represents a notification service (e.g. Discord) type Notifs interface { NotifyJob(...job.JobState) - SendAlert(string, string) } // Manager represents the job manager, which is the central job orchestrator of this service. type Manager interface { - NewJob(job.JobState) (string, error) - CheckJob(string) string + NewJob(job.JobState) (job.JobState, error) + CheckJob(string) job.JobState ProcessJobs(chan bool) Pause() } diff --git a/cd/manager/notifs/anchor.go b/cd/manager/notifs/anchor.go new file mode 100644 index 0000000..853e689 --- /dev/null +++ b/cd/manager/notifs/anchor.go @@ -0,0 +1,73 @@ +package notifs + +import ( + "fmt" + "strings" + + "github.com/disgoorg/disgo/discord" + "github.com/disgoorg/disgo/webhook" + + "github.com/3box/pipeline-tools/cd/manager/common/job" +) + +var _ jobNotif = &anchorNotif{} + +type anchorNotif struct { + state job.JobState + alertWebhook webhook.Client + warningWebhook webhook.Client +} + +func newAnchorNotif(jobState job.JobState) (jobNotif, error) { + if a, err := parseDiscordWebhookUrl("DISCORD_ALERT_WEBHOOK"); err != nil { + return nil, err + } else if w, err := parseDiscordWebhookUrl("DISCORD_WARNING_WEBHOOK"); err != nil { + return nil, err + } else { + return &anchorNotif{jobState, a, w}, nil + } +} + +func (a anchorNotif) getChannels() []webhook.Client { + // We only care about "waiting" notifications from the CD manager for the time being. Other notifications are sent + // directly from the anchor worker. + if a.state.Stage == job.JobStage_Waiting { + webhooks := make([]webhook.Client, 0, 1) + if stalled, _ := a.state.Params[job.AnchorJobParam_Stalled].(bool); stalled { + webhooks = append(webhooks, a.alertWebhook) + } else if delayed, _ := a.state.Params[job.AnchorJobParam_Delayed].(bool); delayed { + webhooks = append(webhooks, a.warningWebhook) + } + } + return nil +} + +func (a anchorNotif) getTitle() string { + jobStageRepr := string(a.state.Stage) + // If "waiting", update the job stage representation to qualify the severity of the delay, if applicable. + if a.state.Stage == job.JobStage_Waiting { + if stalled, _ := a.state.Params[job.AnchorJobParam_Stalled].(bool); stalled { + jobStageRepr = job.AnchorJobParam_Stalled + } else if delayed, _ := a.state.Params[job.AnchorJobParam_Delayed].(bool); delayed { + jobStageRepr = job.AnchorJobParam_Delayed + } + } + return fmt.Sprintf("Anchor Worker %s", strings.ToUpper(jobStageRepr)) +} + +func (a anchorNotif) getFields() []discord.EmbedField { + return nil +} + +func (a anchorNotif) getColor() discordColor { + if a.state.Stage == job.JobStage_Waiting { + if stalled, _ := a.state.Params[job.AnchorJobParam_Stalled].(bool); stalled { + return discordColor_Alert + } else if delayed, _ := a.state.Params[job.AnchorJobParam_Delayed].(bool); delayed { + return discordColor_Warning + } else { + return discordColor_Info + } + } + return getColorForStage(a.state.Stage) +} diff --git a/cd/manager/notifs/deploy.go b/cd/manager/notifs/deploy.go new file mode 100644 index 0000000..a00f6d9 --- /dev/null +++ b/cd/manager/notifs/deploy.go @@ -0,0 +1,73 @@ +package notifs + +import ( + "fmt" + "os" + "strings" + + "golang.org/x/text/cases" + "golang.org/x/text/language" + + "github.com/disgoorg/disgo/discord" + "github.com/disgoorg/disgo/webhook" + + "github.com/3box/pipeline-tools/cd/manager" + "github.com/3box/pipeline-tools/cd/manager/common/job" +) + +var _ jobNotif = &deployNotif{} + +type deployNotif struct { + state job.JobState + deploymentsWebhook webhook.Client + communityWebhook webhook.Client + env manager.EnvType +} + +func newDeployNotif(jobState job.JobState) (jobNotif, error) { + if d, err := parseDiscordWebhookUrl("DISCORD_DEPLOYMENTS_WEBHOOK"); err != nil { + return nil, err + } else if c, err := parseDiscordWebhookUrl("DISCORD_COMMUNITY_NODES_WEBHOOK"); err != nil { + return nil, err + } else { + return &deployNotif{jobState, d, c, manager.EnvType(os.Getenv("ENV"))}, nil + } +} + +func (d deployNotif) getChannels() []webhook.Client { + webhooks := []webhook.Client{d.deploymentsWebhook} + // Don't send Dev/QA notifications to the community channel + if (d.env != manager.EnvType_Dev) && (d.env != manager.EnvType_Qa) { + webhooks = append(webhooks, d.communityWebhook) + } + return webhooks +} + +func (d deployNotif) getTitle() string { + component := d.state.Params[job.DeployJobParam_Component].(string) + qualifier := "" + // A rollback is always a force job, while a non-rollback force job is always manual, so we can optimize. + if rollback, _ := d.state.Params[job.DeployJobParam_Rollback].(bool); rollback { + qualifier = job.DeployJobParam_Rollback + } else if force, _ := d.state.Params[job.DeployJobParam_Force].(bool); force { + qualifier = job.DeployJobParam_Force + } else if manual, _ := d.state.Params[job.DeployJobParam_Manual].(bool); manual { + qualifier = job.DeployJobParam_Manual + } + return fmt.Sprintf( + "3Box Labs `%s` %s %s %s %s", + manager.EnvName(d.env), + strings.ToUpper(component), + cases.Title(language.English).String(qualifier), + "Deployment", + strings.ToUpper(string(d.state.Stage)), + ) +} + +func (d deployNotif) getFields() []discord.EmbedField { + return nil +} + +func (d deployNotif) getColor() discordColor { + return getColorForStage(d.state.Stage) +} diff --git a/cd/manager/notifs/discord.go b/cd/manager/notifs/discord.go index 9602b71..6ad0097 100644 --- a/cd/manager/notifs/discord.go +++ b/cd/manager/notifs/discord.go @@ -9,9 +9,6 @@ import ( "strings" "time" - "golang.org/x/text/cases" - "golang.org/x/text/language" - "github.com/disgoorg/disgo/discord" "github.com/disgoorg/disgo/rest" "github.com/disgoorg/disgo/webhook" @@ -21,14 +18,14 @@ import ( "github.com/3box/pipeline-tools/cd/manager/common/job" ) -type DiscordColor int +type discordColor int const ( - DiscordColor_None = iota - DiscordColor_Info = 3447003 - DiscordColor_Ok = 3581519 - DiscordColor_Warning = 16776960 - DiscordColor_Alert = 16711712 + discordColor_None = iota + discordColor_Info = 3447003 + discordColor_Ok = 3581519 + discordColor_Warning = 16776960 + discordColor_Alert = 16711712 ) const DiscordPacing = 2 * time.Second @@ -38,29 +35,23 @@ const ShaTagLength = 12 var _ manager.Notifs = &JobNotifs{} type JobNotifs struct { - db manager.Database - cache manager.Cache - alertWebhook webhook.Client - warningWebhook webhook.Client - deploymentsWebhook webhook.Client - communityWebhook webhook.Client - testWebhook webhook.Client - env manager.EnvType + db manager.Database + cache manager.Cache + testWebhook webhook.Client +} + +type jobNotif interface { + getChannels() []webhook.Client + getTitle() string + getFields() []discord.EmbedField + getColor() discordColor } func NewJobNotifs(db manager.Database, cache manager.Cache) (manager.Notifs, error) { - if a, err := parseDiscordWebhookUrl("DISCORD_ALERT_WEBHOOK"); err != nil { - return nil, err - } else if w, err := parseDiscordWebhookUrl("DISCORD_WARNING_WEBHOOK"); err != nil { - return nil, err - } else if d, err := parseDiscordWebhookUrl("DISCORD_DEPLOYMENTS_WEBHOOK"); err != nil { - return nil, err - } else if c, err := parseDiscordWebhookUrl("DISCORD_COMMUNITY_NODES_WEBHOOK"); err != nil { - return nil, err - } else if t, err := parseDiscordWebhookUrl("DISCORD_TEST_WEBHOOK"); err != nil { + if t, err := parseDiscordWebhookUrl("DISCORD_TEST_WEBHOOK"); err != nil { return nil, err } else { - return &JobNotifs{db, cache, a, w, d, c, t, manager.EnvType(os.Getenv("ENV"))}, nil + return &JobNotifs{db, cache, t}, nil } } @@ -83,39 +74,41 @@ func parseDiscordWebhookUrl(urlEnv string) (webhook.Client, error) { func (n JobNotifs) NotifyJob(jobs ...job.JobState) { for _, jobState := range jobs { - for _, channel := range n.getNotifChannels(jobState) { - if channel != nil { - n.sendNotif( - n.getNotifTitle(jobState), - n.getNotifFields(jobState), - n.getNotifColor(jobState), - channel, - ) + if jn, err := n.getJobNotif(jobState); err != nil { + log.Printf("notifyJob: error creating job notification: %v, %s", err, manager.PrintJob(jobState)) + } else { + // Send all notifications to the test webhook + channels := append(jn.getChannels(), n.testWebhook) + for _, channel := range channels { + if channel != nil { + n.sendNotif( + jn.getTitle(), + append(n.getNotifFields(jobState), jn.getFields()...), + jn.getColor(), + channel, + ) + } } } } } -func (n JobNotifs) SendAlert(title, desc string) { - if n.alertWebhook != nil { - messageEmbed := discord.Embed{ - Title: title, - Description: desc, - Type: discord.EmbedTypeRich, - Color: DiscordColor_Alert, - } - if _, err := n.alertWebhook.CreateMessage(discord.NewWebhookMessageCreateBuilder(). - SetEmbeds(messageEmbed). - SetUsername(manager.ServiceName). - Build(), - rest.WithDelay(DiscordPacing), - ); err != nil { - log.Printf("sendAlert: error sending discord notification: %v, %s", err, desc) - } +func (n JobNotifs) getJobNotif(jobState job.JobState) (jobNotif, error) { + switch jobState.Type { + case job.JobType_Deploy: + return newDeployNotif(jobState) + case job.JobType_Anchor: + return newAnchorNotif(jobState) + case job.JobType_TestE2E: + return newE2eTestNotif(jobState) + case job.JobType_TestSmoke: + return newSmokeTestNotif(jobState) + default: + return nil, fmt.Errorf("getJobNotif: unknown job type: %s", jobState.Type) } } -func (n JobNotifs) sendNotif(title string, fields []discord.EmbedField, color DiscordColor, channel webhook.Client) { +func (n JobNotifs) sendNotif(title string, fields []discord.EmbedField, color discordColor, channel webhook.Client) { messageEmbed := discord.Embed{ Title: title, Type: discord.EmbedTypeRich, @@ -132,72 +125,6 @@ func (n JobNotifs) sendNotif(title string, fields []discord.EmbedField, color Di } } -func (n JobNotifs) getNotifChannels(jobState job.JobState) []webhook.Client { - webhooks := make([]webhook.Client, 0, 1) - switch jobState.Type { - case job.JobType_Deploy: - { - webhooks = append(webhooks, n.deploymentsWebhook) - // Don't send Dev/QA notifications to the community channel. - if (n.env != manager.EnvType_Dev) && (n.env != manager.EnvType_Qa) { - webhooks = append(webhooks, n.communityWebhook) - } - } - case job.JobType_Anchor: - { - // We only care about "waiting" notifications from the CD manager for the time being. Other notifications - // are sent directly from the anchor worker. - if jobState.Stage == job.JobStage_Waiting { - if stalled, _ := jobState.Params[job.JobParam_Stalled].(bool); stalled { - webhooks = append(webhooks, n.alertWebhook) - } else if delayed, _ := jobState.Params[job.JobParam_Delayed].(bool); delayed { - webhooks = append(webhooks, n.warningWebhook) - } - } - } - } - // Send all notifications to the test webhook - webhooks = append(webhooks, n.testWebhook) - return webhooks -} - -func (n JobNotifs) getNotifTitle(jobState job.JobState) string { - notifTitlePfx := job.JobName(jobState.Type) - jobStageRepr := string(jobState.Stage) - switch jobState.Type { - case job.JobType_Deploy: - { - component := jobState.Params[job.JobParam_Component].(string) - qualifier := "" - // A rollback is always a force job, while a non-rollback force job is always manual, so we can optimize. - if rollback, _ := jobState.Params[job.JobParam_Rollback].(bool); rollback { - qualifier = job.JobParam_Rollback - } else if force, _ := jobState.Params[job.JobParam_Force].(bool); force { - qualifier = job.JobParam_Force - } else if manual, _ := jobState.Params[job.JobParam_Manual].(bool); manual { - qualifier = job.JobParam_Manual - } - notifTitlePfx = fmt.Sprintf( - "3Box Labs `%s` %s %s %s", - manager.EnvName(n.env), - strings.ToUpper(component), - cases.Title(language.English).String(qualifier), - notifTitlePfx, - ) - } - case job.JobType_Anchor: - // If "waiting", update the job stage representation to qualify the severity of the delay, if applicable. - if jobState.Stage == job.JobStage_Waiting { - if stalled, _ := jobState.Params[job.JobParam_Stalled].(bool); stalled { - jobStageRepr = job.JobParam_Stalled - } else if delayed, _ := jobState.Params[job.JobParam_Delayed].(bool); delayed { - jobStageRepr = job.JobParam_Delayed - } - } - } - return fmt.Sprintf("%s %s", notifTitlePfx, strings.ToUpper(jobStageRepr)) -} - func (n JobNotifs) getNotifFields(jobState job.JobState) []discord.EmbedField { fields := []discord.EmbedField{ { @@ -223,43 +150,15 @@ func (n JobNotifs) getNotifFields(jobState job.JobState) []discord.EmbedField { return fields } -func (n JobNotifs) getNotifColor(jobState job.JobState) DiscordColor { - switch jobState.Stage { - case job.JobStage_Dequeued: - return DiscordColor_Info - case job.JobStage_Skipped: - return DiscordColor_Warning - case job.JobStage_Started: - return DiscordColor_None - case job.JobStage_Waiting: - if stalled, _ := jobState.Params[job.JobParam_Stalled].(bool); stalled { - return DiscordColor_Alert - } else if delayed, _ := jobState.Params[job.JobParam_Delayed].(bool); delayed { - return DiscordColor_Warning - } else { - return DiscordColor_Info - } - case job.JobStage_Failed: - return DiscordColor_Alert - case job.JobStage_Canceled: - return DiscordColor_Warning - case job.JobStage_Completed: - return DiscordColor_Ok - default: - log.Printf("sendNotif: unknown job stage: %s", manager.PrintJob(jobState)) - return DiscordColor_Alert - } -} - func (n JobNotifs) getDeployHashes(jobState job.JobState) string { if commitHashes, err := n.db.GetDeployHashes(); err != nil { return "" } else { if jobState.Type == job.JobType_Deploy { - sha := jobState.Params[job.JobParam_Sha].(string) + sha := jobState.Params[job.DeployJobParam_Sha].(string) // If the specified hash is valid, overwrite the previous hash from the database. if isValidSha, _ := regexp.MatchString(manager.CommitHashRegex, sha); isValidSha { - commitHashes[manager.DeployComponent(jobState.Params[job.JobParam_Component].(string))] = sha + commitHashes[manager.DeployComponent(jobState.Params[job.DeployJobParam_Component].(string))] = sha } } // Prepare component messages with GitHub commit hashes and hyperlinks @@ -325,3 +224,25 @@ func (n JobNotifs) getActiveJobsByType(jobState job.JobState, jobType job.JobTyp Value: message, }, len(message) > 0 } + +func getColorForStage(jobStage job.JobStage) discordColor { + switch jobStage { + case job.JobStage_Dequeued: + return discordColor_Info + case job.JobStage_Skipped: + return discordColor_Warning + case job.JobStage_Started: + return discordColor_None + case job.JobStage_Waiting: + return discordColor_Info + case job.JobStage_Failed: + return discordColor_Alert + case job.JobStage_Canceled: + return discordColor_Warning + case job.JobStage_Completed: + return discordColor_Ok + default: + log.Printf("getColorForStage: unknown job stage: %s", jobStage) + return discordColor_Alert + } +} diff --git a/cd/manager/notifs/e2e.go b/cd/manager/notifs/e2e.go new file mode 100644 index 0000000..6b8ad51 --- /dev/null +++ b/cd/manager/notifs/e2e.go @@ -0,0 +1,37 @@ +package notifs + +import ( + "fmt" + "strings" + + "github.com/disgoorg/disgo/discord" + "github.com/disgoorg/disgo/webhook" + + "github.com/3box/pipeline-tools/cd/manager/common/job" +) + +var _ jobNotif = &e2eTestNotif{} + +type e2eTestNotif struct { + state job.JobState +} + +func newE2eTestNotif(jobState job.JobState) (jobNotif, error) { + return &e2eTestNotif{jobState}, nil +} + +func (e e2eTestNotif) getChannels() []webhook.Client { + return nil +} + +func (e e2eTestNotif) getTitle() string { + return fmt.Sprintf("E2E Tests %s", strings.ToUpper(string(e.state.Stage))) +} + +func (e e2eTestNotif) getFields() []discord.EmbedField { + return nil +} + +func (e e2eTestNotif) getColor() discordColor { + return getColorForStage(e.state.Stage) +} diff --git a/cd/manager/notifs/smoke.go b/cd/manager/notifs/smoke.go new file mode 100644 index 0000000..49ccbdf --- /dev/null +++ b/cd/manager/notifs/smoke.go @@ -0,0 +1,37 @@ +package notifs + +import ( + "fmt" + "strings" + + "github.com/disgoorg/disgo/discord" + "github.com/disgoorg/disgo/webhook" + + "github.com/3box/pipeline-tools/cd/manager/common/job" +) + +var _ jobNotif = &smokeTestNotif{} + +type smokeTestNotif struct { + state job.JobState +} + +func newSmokeTestNotif(jobState job.JobState) (jobNotif, error) { + return &smokeTestNotif{jobState}, nil +} + +func (s smokeTestNotif) getChannels() []webhook.Client { + return nil +} + +func (s smokeTestNotif) getTitle() string { + return fmt.Sprintf("Smoke Tests %s", strings.ToUpper(string(s.state.Stage))) +} + +func (s smokeTestNotif) getFields() []discord.EmbedField { + return nil +} + +func (s smokeTestNotif) getColor() discordColor { + return getColorForStage(s.state.Stage) +} diff --git a/cd/manager/repository/github.go b/cd/manager/repository/github.go index 7e8495e..009d755 100644 --- a/cd/manager/repository/github.go +++ b/cd/manager/repository/github.go @@ -8,9 +8,10 @@ import ( "strings" "time" - "github.com/google/go-github/github" "golang.org/x/oauth2" + "github.com/google/go-github/v56/github" + "github.com/3box/pipeline-tools/cd/manager" ) @@ -45,7 +46,7 @@ func (g Github) GetLatestCommitHash(repo manager.DeployRepo, branch, shaTag stri }); err != nil { return "", err } else { - log.Printf("getLatestCommitHash: list commits rate limit=%d, remaining=%d, resetAt=%s", resp.Limit, resp.Remaining, resp.Reset) + log.Printf("getLatestCommitHash: list commits rate limit=%d, remaining=%d, resetAt=%s", resp.Rate.Limit, resp.Rate.Remaining, resp.Rate.Reset) for _, commit := range commits { sha := *commit.SHA if checksPassed, err := g.checkRefStatus(repo, sha); err != nil { @@ -68,7 +69,7 @@ func (g Github) checkRefStatus(repo manager.DeployRepo, ref string) (bool, error defer cancel() status, resp, err := g.client.Repositories.GetCombinedStatus(ctx, manager.GitHubOrg, string(repo), ref, &github.ListOptions{PerPage: 100}) - log.Printf("checkRefStatus: status=%s, rate limit=%d, remaining=%d, resetAt=%s", status, resp.Limit, resp.Remaining, resp.Reset) + log.Printf("checkRefStatus: status=%s, rate limit=%d, remaining=%d, resetAt=%s", status, resp.Rate.Limit, resp.Rate.Remaining, resp.Rate.Reset) return status, err } // Wait a few minutes for the status to finalize if it is currently "pending" diff --git a/cd/manager/server/server.go b/cd/manager/server/server.go index 3d70471..05b6e1c 100644 --- a/cd/manager/server/server.go +++ b/cd/manager/server/server.go @@ -50,42 +50,42 @@ func timeHandler(format string) http.HandlerFunc { func jobHandler(m manager.Manager) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { status := http.StatusOK - message := "Success" + var body any decoder := json.NewDecoder(r.Body) - // Allow unknown fields so that we ignore unneeded params sent by calling services - //decoder.DisallowUnknownFields() + decoder.DisallowUnknownFields() jobState := job.JobState{} if r.Header.Get("Content-Type") != "application/json" { status = http.StatusUnsupportedMediaType - message = "content-type is not application/json" + body = "content-type is not application/json" } else if err := decoder.Decode(&jobState); err != nil { status = http.StatusBadRequest var unmarshalErr *json.UnmarshalTypeError if errors.As(err, &unmarshalErr) { - message = "wrong type for field " + unmarshalErr.Field + body = "wrong type for field: " + unmarshalErr.Field } else { - message = "bad request: " + err.Error() + body = "bad request: " + err.Error() } } else if r.Method == http.MethodPost { - if jobId, err := m.NewJob(jobState); err != nil { - status = http.StatusBadRequest - message = "could not queue job: " + err.Error() + if jobState, err = m.NewJob(jobState); err != nil { + status = http.StatusInternalServerError + body = "could not queue job: " + err.Error() } else { - message = jobId + body = jobState } } else if r.Method == http.MethodGet { - message = m.CheckJob(jobState.Job) + body = m.CheckJob(jobState.Job) + } else { + body = "unsupported method: " + r.Method + status = http.StatusMethodNotAllowed } - writeJsonResponse(w, message, status) + writeJsonResponse(w, body, status) } } -func writeJsonResponse(w http.ResponseWriter, message string, httpStatusCode int) { +func writeJsonResponse(w http.ResponseWriter, body any, httpStatusCode int) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(httpStatusCode) - resp := make(map[string]string) - resp["message"] = message - jsonResp, _ := json.Marshal(resp) + jsonResp, _ := json.Marshal(body) w.Write(jsonResp) } diff --git a/cd/manager/utils.go b/cd/manager/utils.go index d243e3a..9a6cbf8 100644 --- a/cd/manager/utils.go +++ b/cd/manager/utils.go @@ -103,7 +103,7 @@ func IsValidSha(sha string) bool { func IsV5WorkerJob(jobState job.JobState) bool { if jobState.Type == job.JobType_Anchor { - if version, found := jobState.Params[job.JobParam_Version].(string); found && (version == CasV5Version) { + if version, found := jobState.Params[job.AnchorJobParam_Version].(string); found && (version == CasV5Version) { return true } }