From 427853781f8d622f34a1fb61206f3eecba9bfb10 Mon Sep 17 00:00:00 2001 From: Mohsin Zaidi <2236875+smrz2001@users.noreply.github.com> Date: Mon, 30 Oct 2023 16:26:44 -0400 Subject: [PATCH] feat(cd): add github workflow job type --- cd/manager/common/job/models.go | 12 ++ cd/manager/jobmanager/jobManager.go | 61 +++++-- cd/manager/jobs/workflow.go | 236 ++++++++++++++++++++++++++++ cd/manager/models.go | 9 ++ cd/manager/notifs/discord.go | 5 + cd/manager/notifs/workflow.go | 65 ++++++++ cd/manager/utils.go | 2 + ci/scripts/schedule_job.sh | 9 ++ 8 files changed, 390 insertions(+), 9 deletions(-) create mode 100644 cd/manager/jobs/workflow.go create mode 100644 cd/manager/notifs/workflow.go create mode 100755 ci/scripts/schedule_job.sh diff --git a/cd/manager/common/job/models.go b/cd/manager/common/job/models.go index ff98a2e..f52860e 100644 --- a/cd/manager/common/job/models.go +++ b/cd/manager/common/job/models.go @@ -13,6 +13,7 @@ const ( JobType_Anchor JobType = "anchor" JobType_TestE2E JobType = "test_e2e" JobType_TestSmoke JobType = "test_smoke" + JobType_Workflow JobType = "workflow" ) type JobStage string @@ -52,6 +53,17 @@ const ( AnchorJobParam_Overrides string = "overrides" ) +const ( + WorkflowJobParam_Name string = "name" + WorkflowJobParam_Org string = "org" + WorkflowJobParam_Repo string = "repo" + WorkflowJobParam_Ref string = "ref" + WorkflowJobParam_Workflow string = "workflow" + WorkflowJobParam_Inputs string = "inputs" + WorkflowJobParam_Environment string = "environment" + WorkflowJobParam_Url string = "url" +) + // JobState represents the state of a job in the database type JobState struct { Job string `dynamodbav:"job"` // Job ID, same for all stages of an individual Job diff --git a/cd/manager/jobmanager/jobManager.go b/cd/manager/jobmanager/jobManager.go index 2a8355d..11a8d8d 100644 --- a/cd/manager/jobmanager/jobManager.go +++ b/cd/manager/jobmanager/jobManager.go @@ -142,9 +142,10 @@ func (m *JobManager) processJobs() { if len(dequeuedJobs) > 0 { // Try to start multiple jobs and collapse similar ones: // - one deploy at a time - // - any number of anchor workers (compatible with with smoke/E2E tests) - // - one smoke test at a time (compatible with anchor workers, E2E tests) - // - one E2E test at a time (compatible with anchor workers, smoke tests) + // - any number of anchor workers (compatible with non-deploy jobs) + // - one smoke test at a time (compatible with non-deploy jobs) + // - one E2E test at a time (compatible with non-deploy jobs) + // - one workflow at a time (compatible with non-deploy jobs) // // Loop over compatible dequeued jobs until we find an incompatible one and need to wait for existing jobs to // complete. @@ -177,6 +178,7 @@ func (m *JobManager) processJobs() { processAnchorJobs = len(e2eTestJobs) > 0 } else { m.processTestJobs(dequeuedJobs) + m.processWorkflowJobs(dequeuedJobs) } } // If no deploy jobs were launched, process pending anchor jobs. We don't want to hold on to anchor jobs queued @@ -194,7 +196,6 @@ func (m *JobManager) processJobs() { func (m *JobManager) advanceJobs(jobs []job.JobState) { if len(jobs) > 0 { - log.Printf("advanceJobs: advancing %d jobs...", len(jobs)) for _, jobState := range jobs { m.advanceJob(jobState) } @@ -412,6 +413,39 @@ func (m *JobManager) processTestJobs(dequeuedJobs []job.JobState) bool { return false } +func (m *JobManager) processWorkflowJobs(dequeuedJobs []job.JobState) bool { + // Check if there are any deploy jobs in progress + activeDeploys := m.cache.JobsByMatcher(func(js job.JobState) bool { + return job.IsActiveJob(js) && (js.Type == job.JobType_Deploy) + }) + if len(activeDeploys) == 0 { + dequeuedWorkflow := dequeuedJobs[0] + // Collapse similar, back-to-back workflows into a single run and kick it off. + for i := 1; i < len(dequeuedJobs); i++ { + dequeuedJob := dequeuedJobs[i] + // Break out of the loop as soon as we find a deploy job. We don't want to collapse workflow jobs across + // deploy jobs. + if dequeuedJob.Type == job.JobType_Deploy { + break + } else if dequeuedJob.Type == job.JobType_Workflow { + // Skip the current workflow job, and replace it with a newer one. + if err := m.updateJobStage(dequeuedWorkflow, job.JobStage_Skipped, nil); err != nil { + // Return `true` from here so that no state is changed and the loop can restart cleanly. Any jobs + // already skipped won't be picked up again, which is ok. + return true + } + dequeuedWorkflow = dequeuedJob + } + } + log.Printf("processWorkflowJobs: starting workflow job: %s", manager.PrintJob(dequeuedWorkflow)) + m.advanceJob(dequeuedWorkflow) + return true + } else { + log.Printf("processWorkflowJobs: deployment in progress") + } + return false +} + func (m *JobManager) advanceJob(jobState job.JobState) { m.waitGroup.Add(1) go func() { @@ -451,18 +485,25 @@ func (m *JobManager) postProcessJob(jobState job.JobState) { case job.JobType_Deploy: { switch jobState.Stage { - // For completed deployments, also add a smoke test job 5 minutes in the future to allow the deployment to - // stabilize. + // For completed deployments, also add a test workflow job 5 minutes in the future to allow the deployment + // to stabilize. case job.JobStage_Completed: { if _, err := m.NewJob(job.JobState{ Ts: time.Now().Add(manager.DefaultWaitTime), - Type: job.JobType_TestSmoke, + Type: job.JobType_Workflow, Params: map[string]interface{}{ - job.JobParam_Source: manager.ServiceName, + job.JobParam_Source: manager.ServiceName, + job.WorkflowJobParam_Org: manager.Tests_Org, + job.WorkflowJobParam_Repo: manager.Tests_Repo, + job.WorkflowJobParam_Ref: manager.Tests_Ref, + job.WorkflowJobParam_Workflow: manager.Tests_Workflow, + job.WorkflowJobParam_Inputs: map[string]interface{}{ + job.WorkflowJobParam_Environment: m.env, + }, }, }); err != nil { - log.Printf("postProcessJob: failed to queue smoke tests after deploy: %v, %s", err, manager.PrintJob(jobState)) + log.Printf("postProcessJob: failed to queue test workflow after deploy: %v, %s", err, manager.PrintJob(jobState)) } } // For failed deployments, rollback to the previously deployed commit hash. @@ -503,6 +544,8 @@ func (m *JobManager) prepareJobSm(jobState job.JobState) (manager.JobSm, error) jobSm = jobs.E2eTestJob(jobState, m.db, m.notifs, m.d) case job.JobType_TestSmoke: jobSm = jobs.SmokeTestJob(jobState, m.db, m.notifs, m.d) + case job.JobType_Workflow: + jobSm, err = jobs.GitHubWorkflowJob(jobState, m.db, m.notifs) default: err = fmt.Errorf("prepareJobSm: unknown job type: %s", manager.PrintJob(jobState)) } diff --git a/cd/manager/jobs/workflow.go b/cd/manager/jobs/workflow.go new file mode 100644 index 0000000..39525f9 --- /dev/null +++ b/cd/manager/jobs/workflow.go @@ -0,0 +1,236 @@ +package jobs + +import ( + "context" + "fmt" + "log" + "net/http" + "os" + "time" + + "golang.org/x/oauth2" + + "github.com/google/go-github/v56/github" + + "github.com/3box/pipeline-tools/cd/manager" + "github.com/3box/pipeline-tools/cd/manager/common/job" +) + +// Allow up to 4 hours for a workflow to run +const workflowFailureTime = 4 * time.Hour + +// GitHub constants +const ( + gitHub_WorkflowEventType = "workflow_dispatch" + gitHub_WorkflowTimeFormat = "2006-01-02T15:04:05.000Z" // ISO8601 + gitHub_WorkflowJobId = "job_id" +) +const ( + gitHub_WorkflowStatus_Success = "success" + gitHub_WorkflowStatus_Failure = "failure" +) + +var _ manager.JobSm = &githubWorkflowJob{} + +type githubWorkflowJob struct { + baseJob + client *github.Client + org string + repo string + ref string + workflow string + inputs map[string]interface{} + env string +} + +func GitHubWorkflowJob(jobState job.JobState, db manager.Database, notifs manager.Notifs) (manager.JobSm, error) { + if org, found := jobState.Params[job.WorkflowJobParam_Org].(string); !found { + return nil, fmt.Errorf("githubWorkflowJob: missing org") + } else if repo, found := jobState.Params[job.WorkflowJobParam_Repo].(string); !found { + return nil, fmt.Errorf("githubWorkflowJob: missing repo") + } else if ref, found := jobState.Params[job.WorkflowJobParam_Ref].(string); !found { + return nil, fmt.Errorf("githubWorkflowJob: missing ref") + } else if workflow, found := jobState.Params[job.WorkflowJobParam_Workflow].(string); !found { + return nil, fmt.Errorf("githubWorkflowJob: missing workflow") + } else { + inputs, _ := jobState.Params[job.WorkflowJobParam_Inputs].(map[string]interface{}) + if len(inputs) == 0 { + inputs = make(map[string]interface{}, 1) + } + // Add the job ID to the inputs, so we can track the right workflow corresponding to this job. + inputs[gitHub_WorkflowJobId] = jobState.Job + // Set the environment - it's ok to override it even if it was already set. + env := os.Getenv("ENV") + jobState.Params[job.WorkflowJobParam_Environment] = env + + var httpClient *http.Client = nil + if accessToken, found := os.LookupEnv("GITHUB_ACCESS_TOKEN"); found { + ts := oauth2.StaticTokenSource( + &oauth2.Token{AccessToken: accessToken}, + ) + httpClient = oauth2.NewClient(context.Background(), ts) + } + + return &githubWorkflowJob{ + baseJob{jobState, db, notifs}, + github.NewClient(httpClient), + org, + repo, + ref, + workflow, + inputs, + env, + }, nil + } +} + +func (w githubWorkflowJob) Advance() (job.JobState, error) { + now := time.Now() + switch w.state.Stage { + case job.JobStage_Queued: + { + // No preparation needed so advance the job directly to "dequeued". + // + // Advance the timestamp by a tiny amount so that the "dequeued" event remains at the same position on the + // timeline as the "queued" event but still ahead of it. + return w.advance(job.JobStage_Dequeued, w.state.Ts.Add(time.Nanosecond), nil) + } + case job.JobStage_Dequeued: + { + if err := w.startWorkflow(); err != nil { + return w.advance(job.JobStage_Failed, now, err) + } else { + w.state.Params[job.JobParam_Start] = float64(time.Now().UnixNano()) + return w.advance(job.JobStage_Started, now, nil) + } + } + case job.JobStage_Started: + { + if workflowRun, err := w.findMatchingWorkflowRun(); err != nil { + return w.advance(job.JobStage_Failed, now, err) + } else if workflowRun != nil { + // Record workflow details and advance the job + w.state.Params[job.JobParam_Id] = float64(workflowRun.GetID()) + w.state.Params[job.WorkflowJobParam_Url] = workflowRun.GetHTMLURL() + return w.advance(job.JobStage_Waiting, now, nil) + } else if job.IsTimedOut(w.state, manager.DefaultWaitTime) { // Workflow did not start in time + return w.advance(job.JobStage_Failed, now, manager.Error_StartupTimeout) + } else { + // Return so we come back again to check + return w.state, nil + } + } + case job.JobStage_Waiting: + { + if status, err := w.checkWorkflowStatus(); err != nil { + return w.advance(job.JobStage_Failed, now, err) + } else if status == gitHub_WorkflowStatus_Success { + return w.advance(job.JobStage_Completed, now, nil) + } else if status == gitHub_WorkflowStatus_Failure { + return w.advance(job.JobStage_Failed, now, nil) + } else if job.IsTimedOut(w.state, workflowFailureTime) { // Workflow did not finish in time + return w.advance(job.JobStage_Failed, now, manager.Error_CompletionTimeout) + } else { + // Return so we come back again to check + return w.state, nil + } + } + default: + { + return w.advance(job.JobStage_Failed, now, fmt.Errorf("githubWorkflowJob: unexpected state: %w", manager.PrintJob(w.state))) + } + } +} + +func (w githubWorkflowJob) startWorkflow() error { + ctx, cancel := context.WithTimeout(context.Background(), manager.DefaultHttpWaitTime) + defer cancel() + + resp, err := w.client.Actions.CreateWorkflowDispatchEventByFileName( + ctx, w.org, w.repo, w.workflow, github.CreateWorkflowDispatchEventRequest{ + Ref: w.ref, + Inputs: w.inputs, + }) + if err != nil { + return err + } + log.Printf("startWorkflow: rate limit=%d, remaining=%d, resetAt=%s", resp.Rate.Limit, resp.Rate.Remaining, resp.Rate.Reset) + return nil +} + +func (w githubWorkflowJob) findMatchingWorkflowRun() (*github.WorkflowRun, error) { + if workflowRuns, count, err := w.getWorkflowRuns(); err != nil { + return nil, err + } else if count > 0 { + for _, workflowRun := range workflowRuns { + if workflowJobs, count, err := w.getWorkflowJobs(workflowRun); err != nil { + return nil, err + } else if count > 0 { + for _, workflowJob := range workflowJobs { + for _, jobStep := range workflowJob.Steps { + // If we found a job step with our job ID, then we know this is the workflow we're looking for + // and need to monitor. + if jobStep.GetName() == w.state.Job { + return workflowRun, nil + } + } + } + } + } + } + return nil, nil +} + +func (w githubWorkflowJob) getWorkflowRuns() ([]*github.WorkflowRun, int, error) { + ctx, cancel := context.WithTimeout(context.Background(), manager.DefaultHttpWaitTime) + defer cancel() + + // Limit the search to runs after the start of the job (minus 1 minute, so we avoid any races). + searchTime := time.Unix(0, int64(w.state.Params[job.JobParam_Start].(float64))).Add(-time.Minute) + if workflows, resp, err := w.client.Actions.ListWorkflowRunsByFileName( + ctx, w.org, w.repo, w.workflow, &github.ListWorkflowRunsOptions{ + Branch: w.ref, + Event: gitHub_WorkflowEventType, + Created: ">" + searchTime.Format(gitHub_WorkflowTimeFormat), + ExcludePullRequests: true, + }); err != nil { + return nil, 0, err + } else { + log.Printf("getWorkflowRuns: runs=%d, rate limit=%d, remaining=%d, resetAt=%s", *workflows.TotalCount, resp.Rate.Limit, resp.Rate.Remaining, resp.Rate.Reset) + return workflows.WorkflowRuns, *workflows.TotalCount, nil + } +} + +func (w githubWorkflowJob) getWorkflowJobs(workflowRun *github.WorkflowRun) ([]*github.WorkflowJob, int, error) { + ctx, cancel := context.WithTimeout(context.Background(), manager.DefaultHttpWaitTime) + defer cancel() + + if jobs, resp, err := w.client.Actions.ListWorkflowJobs(ctx, w.org, w.repo, workflowRun.GetID(), nil); err != nil { + return nil, 0, err + } else { + log.Printf("getWorkflowJobs: jobs=%d, rate limit=%d, remaining=%d, resetAt=%s", *jobs.TotalCount, resp.Rate.Limit, resp.Rate.Remaining, resp.Rate.Reset) + return jobs.Jobs, *jobs.TotalCount, nil + } +} + +func (w githubWorkflowJob) checkWorkflowStatus() (string, error) { + // The workflow run ID should have been filled in by this point + workflowRunId, _ := w.state.Params[job.JobParam_Id].(float64) + if workflowRun, err := w.getWorkflowRun(int64(workflowRunId)); err != nil { + return "", err + } else { + return workflowRun.GetConclusion(), nil + } +} + +func (w githubWorkflowJob) getWorkflowRun(workflowRunId int64) (*github.WorkflowRun, error) { + ctx, cancel := context.WithTimeout(context.Background(), manager.DefaultHttpWaitTime) + defer cancel() + + if workflowRun, resp, err := w.client.Actions.GetWorkflowRunByID(ctx, w.org, w.repo, workflowRunId); err != nil { + return nil, err + } else { + log.Printf("getWorkflowRun: rate limit=%d, remaining=%d, resetAt=%s", resp.Rate.Limit, resp.Rate.Remaining, resp.Rate.Reset) + return workflowRun, nil + } +} diff --git a/cd/manager/models.go b/cd/manager/models.go index 96c1304..e8f6587 100644 --- a/cd/manager/models.go +++ b/cd/manager/models.go @@ -98,6 +98,7 @@ const ( NotifField_Anchor string = "Anchor Worker(s)" NotifField_TestE2E string = "E2E Tests" NotifField_TestSmoke string = "Smoke Tests" + NotifField_Workflow string = "Workflow(s)" ) // Repository @@ -120,6 +121,14 @@ const DefaultCasMaxAnchorWorkers = 1 const DefaultCasMinAnchorWorkers = 0 const DefaultJobStateTtl = 2 * 7 * 24 * time.Hour // Two weeks +// Tests +const ( + Tests_Org = "3box" + Tests_Repo = "ceramic-tests" + Tests_Ref = "main" + Tests_Workflow = "durable.yml" +) + // For CASv5 workers const CasV5Version = "5" diff --git a/cd/manager/notifs/discord.go b/cd/manager/notifs/discord.go index 6ad0097..82b8b0c 100644 --- a/cd/manager/notifs/discord.go +++ b/cd/manager/notifs/discord.go @@ -103,6 +103,8 @@ func (n JobNotifs) getJobNotif(jobState job.JobState) (jobNotif, error) { return newE2eTestNotif(jobState) case job.JobType_TestSmoke: return newSmokeTestNotif(jobState) + case job.JobType_Workflow: + return newWorkflowNotif(jobState) default: return nil, fmt.Errorf("getJobNotif: unknown job type: %s", jobState.Type) } @@ -205,6 +207,9 @@ func (n JobNotifs) getActiveJobs(jobState job.JobState) []discord.EmbedField { if field, found := n.getActiveJobsByType(jobState, job.JobType_TestSmoke); found { fields = append(fields, field) } + if field, found := n.getActiveJobsByType(jobState, job.JobType_Workflow); found { + fields = append(fields, field) + } return fields } diff --git a/cd/manager/notifs/workflow.go b/cd/manager/notifs/workflow.go new file mode 100644 index 0000000..9360139 --- /dev/null +++ b/cd/manager/notifs/workflow.go @@ -0,0 +1,65 @@ +package notifs + +import ( + "fmt" + "strconv" + "strings" + + "github.com/disgoorg/disgo/discord" + "github.com/disgoorg/disgo/webhook" + + "github.com/3box/pipeline-tools/cd/manager/common/job" +) + +var _ jobNotif = &workflowNotif{} + +const defaultWorkflowJobName = "Workflow" +const workflowLogsFieldName = "Logs" + +type workflowNotif struct { + state job.JobState + workflowWebhook webhook.Client +} + +func newWorkflowNotif(jobState job.JobState) (jobNotif, error) { + if w, err := parseDiscordWebhookUrl("DISCORD_WORKFLOW_WEBHOOK"); err != nil { + return nil, err + } else { + return &workflowNotif{jobState, w}, nil + } +} + +func (w workflowNotif) getChannels() []webhook.Client { + // Skip "started" notifications so that the channel doesn't get too noisy + if w.state.Stage != job.JobStage_Started { + return []webhook.Client{w.workflowWebhook} + } + return nil +} + +func (w workflowNotif) getTitle() string { + jobName := defaultWorkflowJobName + if workflowName, found := w.state.Params[job.WorkflowJobParam_Name].(string); found { + jobName = workflowName + } + return fmt.Sprintf("%s %s", jobName, strings.ToUpper(string(w.state.Stage))) +} + +func (w workflowNotif) getFields() []discord.EmbedField { + if workflowUrl, found := w.state.Params[job.WorkflowJobParam_Url].(string); found { + repo, _ := w.state.Params[job.WorkflowJobParam_Repo].(string) + // The workflow run ID should have been filled in by this point + workflowRunId, _ := w.state.Params[job.JobParam_Id].(float64) + return []discord.EmbedField{ + { + Name: workflowLogsFieldName, + Value: fmt.Sprintf("[%s (%s)](%s)", repo, strconv.Itoa(int(workflowRunId)), workflowUrl), + }, + } + } + return nil +} + +func (w workflowNotif) getColor() discordColor { + return getColorForStage(w.state.Stage) +} diff --git a/cd/manager/utils.go b/cd/manager/utils.go index 9a6cbf8..217b7ab 100644 --- a/cd/manager/utils.go +++ b/cd/manager/utils.go @@ -87,6 +87,8 @@ func NotifField(jt job.JobType) string { return NotifField_TestE2E case job.JobType_TestSmoke: return NotifField_TestSmoke + case job.JobType_Workflow: + return NotifField_Workflow default: return "" } diff --git a/ci/scripts/schedule_job.sh b/ci/scripts/schedule_job.sh new file mode 100755 index 0000000..d82f651 --- /dev/null +++ b/ci/scripts/schedule_job.sh @@ -0,0 +1,9 @@ +#!/bin/bash + +docker run --rm -i \ + -e "AWS_REGION=$AWS_REGION" \ + -e "AWS_ACCESS_KEY_ID=$AWS_ACCESS_KEY_ID" \ + -e "AWS_SECRET_ACCESS_KEY=$AWS_SECRET_ACCESS_KEY" \ + -v ~/.aws:/root/.aws \ + -v "$PWD":/aws \ + amazon/aws-cli dynamodb put-item --table-name "ceramic-$1-ops" --item "$2"