feat(cd): add github workflow job type
smrz2001 committed Oct 31, 2023
1 parent fbf0037 commit 9f0ecc4
Showing 7 changed files with 384 additions and 8 deletions.
12 changes: 12 additions & 0 deletions cd/manager/common/job/models.go
@@ -13,6 +13,7 @@ const (
JobType_Anchor JobType = "anchor"
JobType_TestE2E JobType = "test_e2e"
JobType_TestSmoke JobType = "test_smoke"
JobType_Workflow JobType = "workflow"
)

type JobStage string
@@ -52,6 +53,17 @@ const (
AnchorJobParam_Overrides string = "overrides"
)

const (
WorkflowJobParam_Name string = "name"
WorkflowJobParam_Org string = "org"
WorkflowJobParam_Repo string = "repo"
WorkflowJobParam_Ref string = "ref"
WorkflowJobParam_Workflow string = "workflow"
WorkflowJobParam_Inputs string = "inputs"
WorkflowJobParam_Environment string = "environment"
WorkflowJobParam_Url string = "url"
)

// JobState represents the state of a job in the database
type JobState struct {
Job string `dynamodbav:"job"` // Job ID, same for all stages of an individual Job
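For reference, a minimal sketch (not part of this commit; the literal values below are placeholders) of how a caller could queue a workflow job using the new parameter keys; the post-deploy hook in jobManager.go further down follows the same shape:

	workflowJob := job.JobState{
		Ts:   time.Now(),
		Type: job.JobType_Workflow,
		Params: map[string]interface{}{
			job.WorkflowJobParam_Org:      "example-org",  // placeholder
			job.WorkflowJobParam_Repo:     "example-repo", // placeholder
			job.WorkflowJobParam_Ref:      "main",
			job.WorkflowJobParam_Workflow: "ci.yml",       // workflow file name
			job.WorkflowJobParam_Inputs: map[string]interface{}{
				job.WorkflowJobParam_Environment: "dev",
			},
		},
	}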
60 changes: 52 additions & 8 deletions cd/manager/jobmanager/jobManager.go
@@ -142,9 +142,10 @@ func (m *JobManager) processJobs() {
if len(dequeuedJobs) > 0 {
// Try to start multiple jobs and collapse similar ones:
// - one deploy at a time
// - any number of anchor workers (compatible with smoke/E2E tests)
// - one smoke test at a time (compatible with anchor workers, E2E tests)
// - one E2E test at a time (compatible with anchor workers, smoke tests)
// - any number of anchor workers (compatible with non-deploy jobs)
// - one smoke test at a time (compatible with non-deploy jobs)
// - one E2E test at a time (compatible with non-deploy jobs)
// - one workflow at a time (compatible with non-deploy jobs)
//
// Loop over compatible dequeued jobs until we find an incompatible one and need to wait for existing jobs to
// complete.
@@ -177,6 +178,7 @@ func (m *JobManager) processJobs() {
processAnchorJobs = len(e2eTestJobs) > 0
} else {
m.processTestJobs(dequeuedJobs)
m.processWorkflowJobs(dequeuedJobs)
}
}
// If no deploy jobs were launched, process pending anchor jobs. We don't want to hold on to anchor jobs queued
@@ -412,6 +414,39 @@ func (m *JobManager) processTestJobs(dequeuedJobs []job.JobState) bool {
return false
}

func (m *JobManager) processWorkflowJobs(dequeuedJobs []job.JobState) bool {
// Check if there are any deploy jobs in progress
activeDeploys := m.cache.JobsByMatcher(func(js job.JobState) bool {
return job.IsActiveJob(js) && (js.Type == job.JobType_Deploy)
})
if len(activeDeploys) == 0 {
dequeuedWorkflow := dequeuedJobs[0]
// Collapse similar, back-to-back workflows into a single run and kick it off.
for i := 1; i < len(dequeuedJobs); i++ {
dequeuedJob := dequeuedJobs[i]
// Break out of the loop as soon as we find a deploy job. We don't want to collapse workflow jobs across
// deploy jobs.
if dequeuedJob.Type == job.JobType_Deploy {
break
} else if dequeuedJob.Type == job.JobType_Workflow {
// Skip the current workflow job, and replace it with a newer one.
if err := m.updateJobStage(dequeuedWorkflow, job.JobStage_Skipped, nil); err != nil {
// Return `true` from here so that no state is changed and the loop can restart cleanly. Any jobs
// already skipped won't be picked up again, which is ok.
return true
}
dequeuedWorkflow = dequeuedJob
}
}
log.Printf("processWorkflowJobs: starting workflow job: %s", manager.PrintJob(dequeuedWorkflow))
m.advanceJob(dequeuedWorkflow)
return true
} else {
log.Printf("processWorkflowJobs: deployment in progress")
}
return false
}
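As a standalone illustration (not code from this repository) of the collapsing rule above: back-to-back workflow jobs dequeued before the first deploy job are collapsed so that only the newest one runs, while anything after the deploy waits for a later pass.

	package main

	import (
		"fmt"
		"strings"
	)

	func main() {
		// Dequeued order: two workflows back-to-back, a deploy, then another workflow.
		dequeued := []string{"workflow:W1", "workflow:W2", "deploy:D1", "workflow:W3"}
		started := dequeued[0]
		var skipped []string
		for _, j := range dequeued[1:] {
			if strings.HasPrefix(j, "deploy") {
				break // never collapse workflow jobs across a deploy job
			} else if strings.HasPrefix(j, "workflow") {
				skipped = append(skipped, started) // the older candidate is skipped...
				started = j                        // ...and replaced by the newer workflow
			}
		}
		fmt.Println("started:", started, "skipped:", skipped) // started: workflow:W2 skipped: [workflow:W1]
	}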

func (m *JobManager) advanceJob(jobState job.JobState) {
m.waitGroup.Add(1)
go func() {
@@ -451,18 +486,25 @@ func (m *JobManager) postProcessJob(jobState job.JobState) {
case job.JobType_Deploy:
{
switch jobState.Stage {
// For completed deployments, also add a smoke test job 5 minutes in the future to allow the deployment to
// stabilize.
// For completed deployments, also add a test workflow job 5 minutes in the future to allow the deployment
// to stabilize.
case job.JobStage_Completed:
{
if _, err := m.NewJob(job.JobState{
Ts: time.Now().Add(manager.DefaultWaitTime),
Type: job.JobType_TestSmoke,
Type: job.JobType_Workflow,
Params: map[string]interface{}{
job.JobParam_Source: manager.ServiceName,
job.JobParam_Source: manager.ServiceName,
job.WorkflowJobParam_Org: manager.Tests_Org,
job.WorkflowJobParam_Repo: manager.Tests_Repo,
job.WorkflowJobParam_Ref: manager.Tests_Ref,
job.WorkflowJobParam_Workflow: manager.Tests_Workflow,
job.WorkflowJobParam_Inputs: map[string]interface{}{
job.WorkflowJobParam_Environment: m.env,
},
},
}); err != nil {
log.Printf("postProcessJob: failed to queue smoke tests after deploy: %v, %s", err, manager.PrintJob(jobState))
log.Printf("postProcessJob: failed to queue test workflow after deploy: %v, %s", err, manager.PrintJob(jobState))
}
}
// For failed deployments, rollback to the previously deployed commit hash.
@@ -503,6 +545,8 @@ func (m *JobManager) prepareJobSm(jobState job.JobState) (manager.JobSm, error)
jobSm = jobs.E2eTestJob(jobState, m.db, m.notifs, m.d)
case job.JobType_TestSmoke:
jobSm = jobs.SmokeTestJob(jobState, m.db, m.notifs, m.d)
case job.JobType_Workflow:
jobSm, err = jobs.GitHubWorkflowJob(jobState, m.db, m.notifs)
default:
err = fmt.Errorf("prepareJobSm: unknown job type: %s", manager.PrintJob(jobState))
}
236 changes: 236 additions & 0 deletions cd/manager/jobs/workflow.go
@@ -0,0 +1,236 @@
package jobs

import (
"context"
"fmt"
"log"
"net/http"
"os"
"time"

"golang.org/x/oauth2"

"github.com/google/go-github/v56/github"

"github.com/3box/pipeline-tools/cd/manager"
"github.com/3box/pipeline-tools/cd/manager/common/job"
)

// Allow up to 4 hours for a workflow to run
const workflowFailureTime = 4 * time.Hour

// GitHub constants
const (
gitHub_WorkflowEventType = "workflow_dispatch"
gitHub_WorkflowTimeFormat = "2006-01-02T15:04:05.000Z" // ISO8601
gitHub_WorkflowJobId = "job_id"
)
const (
gitHub_WorkflowStatus_Success = "success"
gitHub_WorkflowStatus_Failure = "failure"
)

var _ manager.JobSm = &githubWorkflowJob{}

type githubWorkflowJob struct {
baseJob
client *github.Client
org string
repo string
ref string
workflow string
inputs map[string]interface{}
env string
}

func GitHubWorkflowJob(jobState job.JobState, db manager.Database, notifs manager.Notifs) (manager.JobSm, error) {
if org, found := jobState.Params[job.WorkflowJobParam_Org].(string); !found {
return nil, fmt.Errorf("githubWorkflowJob: missing org")
} else if repo, found := jobState.Params[job.WorkflowJobParam_Repo].(string); !found {
return nil, fmt.Errorf("githubWorkflowJob: missing repo")
} else if ref, found := jobState.Params[job.WorkflowJobParam_Ref].(string); !found {
return nil, fmt.Errorf("githubWorkflowJob: missing ref")
} else if workflow, found := jobState.Params[job.WorkflowJobParam_Workflow].(string); !found {
return nil, fmt.Errorf("githubWorkflowJob: missing workflow")
} else {
inputs, _ := jobState.Params[job.WorkflowJobParam_Inputs].(map[string]interface{})
if len(inputs) == 0 {
inputs = make(map[string]interface{}, 1)
}
// Add the job ID to the inputs, so we can track the right workflow corresponding to this job.
inputs[gitHub_WorkflowJobId] = jobState.Job
// Set the environment - it's ok to override it even if it was already set.
env := os.Getenv("ENV")
jobState.Params[job.WorkflowJobParam_Environment] = env

var httpClient *http.Client = nil
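// If a personal access token is available, use an authenticated client (which gets much higher API rate limits); otherwise fall back to unauthenticated requests.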
if accessToken, found := os.LookupEnv("GITHUB_ACCESS_TOKEN"); found {
ts := oauth2.StaticTokenSource(
&oauth2.Token{AccessToken: accessToken},
)
httpClient = oauth2.NewClient(context.Background(), ts)
}

return &githubWorkflowJob{
baseJob{jobState, db, notifs},
github.NewClient(httpClient),
org,
repo,
ref,
workflow,
inputs,
env,
}, nil
}
}

func (w githubWorkflowJob) Advance() (job.JobState, error) {
now := time.Now()
switch w.state.Stage {
case job.JobStage_Queued:
{
// No preparation needed so advance the job directly to "dequeued".
//
// Advance the timestamp by a tiny amount so that the "dequeued" event remains at the same position on the
// timeline as the "queued" event but still ahead of it.
return w.advance(job.JobStage_Dequeued, w.state.Ts.Add(time.Nanosecond), nil)
}
case job.JobStage_Dequeued:
{
if err := w.startWorkflow(); err != nil {
return w.advance(job.JobStage_Failed, now, err)
} else {
w.state.Params[job.JobParam_Start] = float64(time.Now().UnixNano())
return w.advance(job.JobStage_Started, now, nil)
}
}
case job.JobStage_Started:
{
if workflowRun, err := w.findMatchingWorkflowRun(); err != nil {
return w.advance(job.JobStage_Failed, now, err)
} else if workflowRun != nil {
// Record workflow details and advance the job
w.state.Params[job.JobParam_Id] = float64(workflowRun.GetID())
w.state.Params[job.WorkflowJobParam_Url] = workflowRun.GetHTMLURL()
return w.advance(job.JobStage_Waiting, now, nil)
} else if job.IsTimedOut(w.state, manager.DefaultWaitTime) { // Workflow did not start in time
return w.advance(job.JobStage_Failed, now, manager.Error_StartupTimeout)
} else {
// Return so we come back again to check
return w.state, nil
}
}
case job.JobStage_Waiting:
{
if status, err := w.checkWorkflowStatus(); err != nil {
return w.advance(job.JobStage_Failed, now, err)
} else if status == gitHub_WorkflowStatus_Success {
return w.advance(job.JobStage_Completed, now, nil)
} else if status == gitHub_WorkflowStatus_Failure {
return w.advance(job.JobStage_Failed, now, nil)
} else if job.IsTimedOut(w.state, workflowFailureTime) { // Workflow did not finish in time
return w.advance(job.JobStage_Failed, now, manager.Error_CompletionTimeout)
} else {
// Return so we come back again to check
return w.state, nil
}
}
default:
{
return w.advance(job.JobStage_Failed, now, fmt.Errorf("githubWorkflowJob: unexpected state: %s", manager.PrintJob(w.state)))
}
}
}

func (w githubWorkflowJob) startWorkflow() error {
ctx, cancel := context.WithTimeout(context.Background(), manager.DefaultHttpWaitTime)
defer cancel()

resp, err := w.client.Actions.CreateWorkflowDispatchEventByFileName(
ctx, w.org, w.repo, w.workflow, github.CreateWorkflowDispatchEventRequest{
Ref: w.ref,
Inputs: w.inputs,
})
if err != nil {
return err
}
log.Printf("startWorkflow: rate limit=%d, remaining=%d, resetAt=%s", resp.Rate.Limit, resp.Rate.Remaining, resp.Rate.Reset)
return nil
}

func (w githubWorkflowJob) findMatchingWorkflowRun() (*github.WorkflowRun, error) {
if workflowRuns, count, err := w.getWorkflowRuns(); err != nil {
return nil, err
} else if count > 0 {
for _, workflowRun := range workflowRuns {
if workflowJobs, count, err := w.getWorkflowJobs(workflowRun); err != nil {
return nil, err
} else if count > 0 {
for _, workflowJob := range workflowJobs {
for _, jobStep := range workflowJob.Steps {
// If we found a job step with our job ID, then we know this is the workflow we're looking for
// and need to monitor.
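// (This presumes the dispatched workflow surfaces the job_id input as a step name; if it doesn't, no run will ever match and the job will time out in the Started stage.)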
if jobStep.GetName() == w.state.Job {
return workflowRun, nil
}
}
}
}
}
}
return nil, nil
}

func (w githubWorkflowJob) getWorkflowRuns() ([]*github.WorkflowRun, int, error) {
ctx, cancel := context.WithTimeout(context.Background(), manager.DefaultHttpWaitTime)
defer cancel()

// Limit the search to runs after the start of the job (minus 1 minute, so we avoid any races).
searchTime := time.Unix(0, int64(w.state.Params[job.JobParam_Start].(float64))).Add(-time.Minute)
if workflows, resp, err := w.client.Actions.ListWorkflowRunsByFileName(
ctx, w.org, w.repo, w.workflow, &github.ListWorkflowRunsOptions{
Branch: w.ref,
Event: gitHub_WorkflowEventType,
Created: ">" + searchTime.Format(gitHub_WorkflowTimeFormat),
ExcludePullRequests: true,
}); err != nil {
return nil, 0, err
} else {
log.Printf("getWorkflowRuns: runs=%d, rate limit=%d, remaining=%d, resetAt=%s", *workflows.TotalCount, resp.Rate.Limit, resp.Rate.Remaining, resp.Rate.Reset)
return workflows.WorkflowRuns, *workflows.TotalCount, nil
}
}

func (w githubWorkflowJob) getWorkflowJobs(workflowRun *github.WorkflowRun) ([]*github.WorkflowJob, int, error) {
ctx, cancel := context.WithTimeout(context.Background(), manager.DefaultHttpWaitTime)
defer cancel()

if jobs, resp, err := w.client.Actions.ListWorkflowJobs(ctx, w.org, w.repo, workflowRun.GetID(), nil); err != nil {
return nil, 0, err
} else {
log.Printf("getWorkflowJobs: jobs=%d, rate limit=%d, remaining=%d, resetAt=%s", *jobs.TotalCount, resp.Rate.Limit, resp.Rate.Remaining, resp.Rate.Reset)
return jobs.Jobs, *jobs.TotalCount, nil
}
}

func (w githubWorkflowJob) checkWorkflowStatus() (string, error) {
// The workflow run ID should have been filled in by this point
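// Note: GetConclusion returns an empty string while the run is still in progress, so the Waiting stage above keeps polling until it sees success/failure or times out.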
workflowRunId, _ := w.state.Params[job.JobParam_Id].(float64)
if workflowRun, err := w.getWorkflowRun(int64(workflowRunId)); err != nil {
return "", err
} else {
return workflowRun.GetConclusion(), nil
}
}

func (w githubWorkflowJob) getWorkflowRun(workflowRunId int64) (*github.WorkflowRun, error) {
ctx, cancel := context.WithTimeout(context.Background(), manager.DefaultHttpWaitTime)
defer cancel()

if workflowRun, resp, err := w.client.Actions.GetWorkflowRunByID(ctx, w.org, w.repo, workflowRunId); err != nil {
return nil, err
} else {
log.Printf("getWorkflowRun: rate limit=%d, remaining=%d, resetAt=%s", resp.Rate.Limit, resp.Rate.Remaining, resp.Rate.Reset)
return workflowRun, nil
}
}
8 changes: 8 additions & 0 deletions cd/manager/models.go
@@ -120,6 +120,14 @@ const DefaultCasMaxAnchorWorkers = 1
const DefaultCasMinAnchorWorkers = 0
const DefaultJobStateTtl = 2 * 7 * 24 * time.Hour // Two weeks

// Tests
const (
Tests_Org = "3box"
Tests_Repo = "ceramic-tests"
Tests_Ref = "main"
Tests_Workflow = "durable.yml"
)

// For CASv5 workers
const CasV5Version = "5"

2 changes: 2 additions & 0 deletions cd/manager/notifs/discord.go
@@ -103,6 +103,8 @@ func (n JobNotifs) getJobNotif(jobState job.JobState) (jobNotif, error) {
return newE2eTestNotif(jobState)
case job.JobType_TestSmoke:
return newSmokeTestNotif(jobState)
case job.JobType_Workflow:
return newWorkflowNotif(jobState)
default:
return nil, fmt.Errorf("getJobNotif: unknown job type: %s", jobState.Type)
}