Skip to content

Commit

Permalink
chore(cd): more cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
smrz2001 committed Oct 30, 2023
1 parent 1fc0bf9 commit 98aa6be
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 37 deletions.
3 changes: 2 additions & 1 deletion cd/manager/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ require (
github.com/aws/aws-sdk-go-v2/service/ssm v1.27.12
github.com/disgoorg/disgo v0.13.16
github.com/disgoorg/snowflake/v2 v2.0.0
github.com/google/go-github v17.0.0+incompatible
github.com/google/go-github/v56 v56.0.0
github.com/google/uuid v1.3.0
github.com/joho/godotenv v1.4.0
github.com/mitchellh/mapstructure v1.5.0
golang.org/x/exp v0.0.0-20220325121720-054d8573a5d8
golang.org/x/oauth2 v0.1.0
golang.org/x/text v0.6.0
)
Expand Down
8 changes: 5 additions & 3 deletions cd/manager/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,10 @@ github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg=
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-github v17.0.0+incompatible h1:N0LgJ1j65A7kfXrZnUDaYCs/Sf4rEjNlfyDHW9dolSY=
github.com/google/go-github v17.0.0+incompatible/go.mod h1:zLgOLi98H3fifZn+44m+umXrS52loVEgC2AApnigrVQ=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-github/v56 v56.0.0 h1:TysL7dMa/r7wsQi44BjqlwaHvwlFlqkK8CtBWCX3gb4=
github.com/google/go-github/v56 v56.0.0/go.mod h1:D8cdcX98YWJvi7TLo7zM4/h8ZTx6u6fwGEkCdisopo0=
github.com/google/go-querystring v1.1.0 h1:AnCroh3fv4ZBgVIf1Iwtovgjaw/GiKJo8M8yD/fhyJ8=
github.com/google/go-querystring v1.1.0/go.mod h1:Kcdr2DB4koayq7X8pmAG4sNG59So17icRSOU623lUBU=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
Expand All @@ -90,6 +90,8 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/testify v1.7.2 h1:4jaiDzPyXQvSd7D0EjG45355tLlV3VOECpq10pLC+8s=
github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/exp v0.0.0-20220325121720-054d8573a5d8 h1:Xt4/LzbTwfocTk9ZLEu4onjeFucl88iW+v4j4PWbQuE=
golang.org/x/exp v0.0.0-20220325121720-054d8573a5d8/go.mod h1:lgLbSvA5ygNOMpwM/9anMpWVlVJ7Z+cHWq/eFuinpGE=
golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks=
golang.org/x/net v0.5.0 h1:GyT4nK/YDHSqa1c4753ouYCDajOYKTja9Xb/OHtgvSw=
golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws=
Expand Down
48 changes: 18 additions & 30 deletions cd/manager/jobmanager/jobManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"sync"
"time"

"golang.org/x/exp/maps"

"github.com/google/uuid"

"github.com/3box/pipeline-tools/cd/manager"
Expand Down Expand Up @@ -127,27 +129,11 @@ func (m *JobManager) processJobs() {
}
}
// Find all jobs in progress and advance their state before looking for new jobs
activeJobs := m.cache.JobsByMatcher(job.IsActiveJob)
if len(activeJobs) > 0 {
log.Printf("processJobs: checking %d jobs in progress: %s", len(activeJobs), manager.PrintJob(activeJobs...))
for _, activeJob := range activeJobs {
m.advanceJob(activeJob)
}
}
// Wait for any running job advancement goroutines to finish before kicking off more jobs
m.waitGroup.Wait()
m.advanceJobs(m.cache.JobsByMatcher(job.IsActiveJob))
// Don't start any new jobs if the job manager is paused. Existing jobs will continue to be advanced.
if !m.paused {
// Advance each freshly discovered "queued" job to the "dequeued" stage
queuedJobs := m.db.QueuedJobs()
if len(queuedJobs) > 0 {
log.Printf("processJobs: found queued %d jobs...", len(queuedJobs))
for _, jobState := range queuedJobs {
m.advanceJob(jobState)
}
// Wait for any running job advancement goroutines to finish before processing jobs
m.waitGroup.Wait()
}
m.advanceJobs(m.db.QueuedJobs())
// Always attempt to check if we have anchor jobs, even if none were dequeued. This is because we might have a
// configured minimum number of jobs to run.
processAnchorJobs := true
Expand Down Expand Up @@ -206,6 +192,17 @@ func (m *JobManager) processJobs() {
m.waitGroup.Wait()
}

func (m *JobManager) advanceJobs(jobs []job.JobState) {
if len(jobs) > 0 {
log.Printf("advanceJobs: advancing %d jobs...", len(jobs))
for _, jobState := range jobs {
m.advanceJob(jobState)
}
// Wait for any running job advancement goroutines to finish
m.waitGroup.Wait()
}
}

func (m *JobManager) checkJobInterval(jobType job.JobType, jobStage job.JobStage, intervalEnv string, processFn func(time.Time) error) error {
if interval, found := os.LookupEnv(intervalEnv); found {
if parsedInterval, err := time.ParseDuration(interval); err != nil {
Expand Down Expand Up @@ -274,10 +271,7 @@ func (m *JobManager) processForceDeployJobs(dequeuedJobs []job.JobState) bool {
}
}
// Now advance all force deploy jobs, order doesn't matter.
for _, deployJob := range forceDeploys {
log.Printf("processForceDeployJobs: starting force deploy job: %s", manager.PrintJob(deployJob))
m.advanceJob(deployJob)
}
m.advanceJobs(maps.Values(forceDeploys))
return true
}
return false
Expand Down Expand Up @@ -357,10 +351,7 @@ func (m *JobManager) processVxAnchorJobs(dequeuedJobs []job.JobState, processV5J
}
}
// Now advance all anchor jobs, order doesn't matter.
for _, anchorJob := range dequeuedAnchors {
log.Printf("processVxAnchorJobs: starting anchor job: %s", manager.PrintJob(anchorJob))
m.advanceJob(anchorJob)
}
m.advanceJobs(dequeuedAnchors)
// If not enough anchor jobs were running to satisfy the configured minimum number of workers, add jobs to the queue
// to make up the difference. These jobs should get picked up in a subsequent job manager iteration, properly
// coordinated with other jobs in the queue. It's ok if we ultimately end up with more jobs queued than the
Expand Down Expand Up @@ -413,10 +404,7 @@ func (m *JobManager) processTestJobs(dequeuedJobs []job.JobState) bool {
dequeuedTests[dequeuedJob.Type] = dequeuedJob
}
}
for _, testJob := range dequeuedTests {
log.Printf("processTestJobs: starting test job: %s", manager.PrintJob(testJob))
m.advanceJob(testJob)
}
m.advanceJobs(maps.Values(dequeuedTests))
return len(dequeuedTests) > 0
} else {
log.Printf("processTestJobs: deployment in progress")
Expand Down
7 changes: 4 additions & 3 deletions cd/manager/repository/github.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ import (
"strings"
"time"

"github.com/google/go-github/github"
"golang.org/x/oauth2"

"github.com/google/go-github/v56/github"

"github.com/3box/pipeline-tools/cd/manager"
)

Expand Down Expand Up @@ -45,7 +46,7 @@ func (g Github) GetLatestCommitHash(repo manager.DeployRepo, branch, shaTag stri
}); err != nil {
return "", err
} else {
log.Printf("getLatestCommitHash: list commits rate limit=%d, remaining=%d, resetAt=%s", resp.Limit, resp.Remaining, resp.Reset)
log.Printf("getLatestCommitHash: list commits rate limit=%d, remaining=%d, resetAt=%s", resp.Rate.Limit, resp.Rate.Remaining, resp.Rate.Reset)
for _, commit := range commits {
sha := *commit.SHA
if checksPassed, err := g.checkRefStatus(repo, sha); err != nil {
Expand All @@ -68,7 +69,7 @@ func (g Github) checkRefStatus(repo manager.DeployRepo, ref string) (bool, error
defer cancel()

status, resp, err := g.client.Repositories.GetCombinedStatus(ctx, manager.GitHubOrg, string(repo), ref, &github.ListOptions{PerPage: 100})
log.Printf("checkRefStatus: status=%s, rate limit=%d, remaining=%d, resetAt=%s", status, resp.Limit, resp.Remaining, resp.Reset)
log.Printf("checkRefStatus: status=%s, rate limit=%d, remaining=%d, resetAt=%s", status, resp.Rate.Limit, resp.Rate.Remaining, resp.Rate.Reset)
return status, err
}
// Wait a few minutes for the status to finalize if it is currently "pending"
Expand Down

0 comments on commit 98aa6be

Please sign in to comment.