Skip to content

Commit

Permalink
chore: embed dutyjob in canary job struct
Browse files Browse the repository at this point in the history
  • Loading branch information
yashmehrotra committed Oct 25, 2023
1 parent a35c454 commit 37434cc
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 67 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ require (
github.com/spf13/cobra v1.7.0
github.com/spf13/pflag v1.0.5
go.mongodb.org/mongo-driver v1.12.1
go.opentelemetry.io/otel v1.19.0
golang.org/x/crypto v0.14.0
golang.org/x/net v0.17.0
golang.org/x/sync v0.4.0
Expand Down Expand Up @@ -231,7 +232,6 @@ require (
github.com/yuin/gopher-lua v1.1.0 // indirect
github.com/zclconf/go-cty v1.14.1 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/otel v1.19.0 // indirect
go.opentelemetry.io/otel/metric v1.19.0 // indirect
go.opentelemetry.io/otel/trace v1.19.0 // indirect
go.starlark.net v0.0.0-20230925163745-10651d5192ab // indirect
Expand Down
29 changes: 0 additions & 29 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -815,20 +815,6 @@ github.com/fergusstrange/embedded-postgres v1.24.0 h1:WqXbmYrBeT5JfNWQ8Qa+yHa5YJ
github.com/fergusstrange/embedded-postgres v1.24.0/go.mod h1:wL562t1V+iuFwq0UcgMi2e9rp8CROY9wxWZEfP8Y874=
github.com/flanksource/commons v1.17.1 h1:jd114sxRwe2VWcbG/PVVEAWsEkialL6eltbqFGANyuI=
github.com/flanksource/commons v1.17.1/go.mod h1:RDdQI0/QYC4GzicbDaXIvBPjWuQWKLzX8/rFBbFjG5U=
github.com/flanksource/duty v1.0.205 h1:sQq+J4TMx69NnoM4XxBcJZ8P5HM5GjY/7zcuv/IQGTo=
github.com/flanksource/duty v1.0.205/go.mod h1:V3fgZdrBgN47lloIz7MedwD/tq4ycHI8zFOApzUpFv4=
github.com/flanksource/commons v1.17.0 h1:rSahn6c4vyq3bPC5jsayET4y8TECRz6Q8NbooItZiGA=
github.com/flanksource/commons v1.17.0/go.mod h1:RDdQI0/QYC4GzicbDaXIvBPjWuQWKLzX8/rFBbFjG5U=
github.com/flanksource/duty v1.0.201 h1:c8r02bfuF47E2svK+qXCLHKaSqOCZZHKPj+v54eimqc=
github.com/flanksource/duty v1.0.201/go.mod h1:aO1uXnT1eVtiIcicriK4brqJLmeXgbrYWtNR0H5IkLE=
github.com/flanksource/commons v1.15.0 h1:p74hrKzIz0r3H8YN3CuB8ePJOjzPFO0BRLVmpXmeqvY=
github.com/flanksource/commons v1.15.0/go.mod h1:FMZFLcQr98JwBKuKLs44DvCQ2JNoHz5maRIzVufQ9Cs=
github.com/flanksource/commons v1.15.1 h1:cFvxQd5SBFe+q16ciz8Q2IeBMeQ7+atdACGanbW27hg=
github.com/flanksource/commons v1.15.1/go.mod h1:FMZFLcQr98JwBKuKLs44DvCQ2JNoHz5maRIzVufQ9Cs=
github.com/flanksource/duty v1.0.191 h1:acnvyTeQlfqmtyXxWprNFGK/vBTUlqkYwxEPLtXSPrk=
github.com/flanksource/duty v1.0.191/go.mod h1:ikyl/TcRy6Cc0R5b0wEHT7CecV7gyJvrDGq/4oIZHoc=
github.com/flanksource/duty v1.0.197 h1:KRw4EPAD2kcqNPkipnkHzlbf5wmLqg3JgtXqiPzCLhw=
github.com/flanksource/duty v1.0.197/go.mod h1:aO1uXnT1eVtiIcicriK4brqJLmeXgbrYWtNR0H5IkLE=
github.com/flanksource/gomplate/v3 v3.20.4/go.mod h1:27BNWhzzSjDed1z8YShO6W+z6G9oZXuxfNFGd/iGSdc=
github.com/flanksource/gomplate/v3 v3.20.19 h1:xl+XMYWXtlrO6FfU+VxwjNwX4/oBK3/soOtHRvUt2us=
github.com/flanksource/gomplate/v3 v3.20.19/go.mod h1:2GgHZ2vWmtDspJMBfUIryOuzJSwc8jU7Kw9fDLr0TMA=
Expand Down Expand Up @@ -1139,10 +1125,6 @@ github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/hcl/v2 v2.19.1 h1://i05Jqznmb2EXqa39Nsvyan2o5XyMowW5fnCKW5RPI=
github.com/hashicorp/hcl/v2 v2.19.1/go.mod h1:ThLC89FV4p9MPW804KVbe/cEXoQ8NZEh+JtMeeGErHE=
github.com/hashicorp/hcl/v2 v2.18.0 h1:wYnG7Lt31t2zYkcquwgKo6MWXzRUDIeIVU5naZwHLl8=
github.com/hashicorp/hcl/v2 v2.18.0/go.mod h1:ThLC89FV4p9MPW804KVbe/cEXoQ8NZEh+JtMeeGErHE=
github.com/hashicorp/hcl/v2 v2.18.1 h1:6nxnOJFku1EuSawSD81fuviYUV8DxFr3fp2dUi3ZYSo=
github.com/hashicorp/hcl/v2 v2.18.1/go.mod h1:ThLC89FV4p9MPW804KVbe/cEXoQ8NZEh+JtMeeGErHE=
github.com/henvic/httpretty v0.1.2 h1:EQo556sO0xeXAjP10eB+BZARMuvkdGqtfeS4Ntjvkiw=
github.com/henvic/httpretty v0.1.2/go.mod h1:ViEsly7wgdugYtymX54pYp6Vv2wqZmNHayJ6q8tlKCc=
github.com/hirochachacha/go-smb2 v1.1.0 h1:b6hs9qKIql9eVXAiN0M2wSFY5xnhbHAQoCwRKbaRTZI=
Expand Down Expand Up @@ -1523,8 +1505,6 @@ github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
github.com/yuin/gopher-lua v1.1.0 h1:BojcDhfyDWgU2f2TOzYK/g5p2gxMrku8oupLDqlnSqE=
github.com/yuin/gopher-lua v1.1.0/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw=
github.com/zclconf/go-cty v1.14.0 h1:/Xrd39K7DXbHzlisFP9c4pHao4yyf+/Ug9LEz+Y/yhc=
github.com/zclconf/go-cty v1.14.0/go.mod h1:VvMs5i0vgZdhYawQNq5kePSpLAoz8u1xvZgrPIxfnZE=
github.com/zclconf/go-cty v1.14.1 h1:t9fyA35fwjjUMcmL5hLER+e/rEPqrbCK1/OSE4SI9KA=
github.com/zclconf/go-cty v1.14.1/go.mod h1:VvMs5i0vgZdhYawQNq5kePSpLAoz8u1xvZgrPIxfnZE=
github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0=
Expand Down Expand Up @@ -2318,13 +2298,6 @@ gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gorm.io/driver/postgres v1.5.3 h1:qKGY5CPHOuj47K/VxbCXJfFvIUeqMSXXadqdCY+MbBU=
gorm.io/driver/postgres v1.5.3/go.mod h1:F+LtvlFhZT7UBiA81mC9W6Su3D4WUhSboc/36QZU0gk=
gorm.io/gorm v1.25.0/go.mod h1:L4uxeKpfBml98NYqVqwAdmV1a2nBtAec/cf3fpucW/k=
gorm.io/driver/postgres v1.5.2 h1:ytTDxxEv+MplXOfFe3Lzm7SjG09fcdb3Z/c056DTBx0=
gorm.io/driver/postgres v1.5.2/go.mod h1:fmpX0m2I1PKuR7mKZiEluwrP3hbs+ps7JIGMUBpCgl8=
gorm.io/driver/postgres v1.5.3 h1:qKGY5CPHOuj47K/VxbCXJfFvIUeqMSXXadqdCY+MbBU=
gorm.io/driver/postgres v1.5.3/go.mod h1:F+LtvlFhZT7UBiA81mC9W6Su3D4WUhSboc/36QZU0gk=
gorm.io/gorm v1.25.0/go.mod h1:L4uxeKpfBml98NYqVqwAdmV1a2nBtAec/cf3fpucW/k=
gorm.io/gorm v1.25.4 h1:iyNd8fNAe8W9dvtlgeRI5zSVZPsq3OpcTu37cYcpCmw=
gorm.io/gorm v1.25.4/go.mod h1:L4uxeKpfBml98NYqVqwAdmV1a2nBtAec/cf3fpucW/k=
gorm.io/gorm v1.25.5 h1:zR9lOiiYf09VNh5Q1gphfyia1JpiClIWG9hQaxB/mls=
gorm.io/gorm v1.25.5/go.mod h1:hbnx/Oo0ChWMn1BIhpy1oYozzpM15i4YPuHDmfYtwg8=
gorm.io/plugin/prometheus v0.0.0-20230504115745-1aec2356381b h1:uHPZdwwf4+AVvAEgZ/LQR1UTub8LJ2nh0wQDW3Dt4jE=
Expand Down Expand Up @@ -2364,8 +2337,6 @@ k8s.io/klog/v2 v2.100.1 h1:7WCHKK6K8fNhTqfBhISHQ97KrnJNFZMcQvKp7gP/tmg=
k8s.io/klog/v2 v2.100.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0=
k8s.io/kube-openapi v0.0.0-20220328201542-3ee0da9b0b42/go.mod h1:Z/45zLw8lUo4wdiUkI+v/ImEGAvu3WatcZl3lPMR4Rk=
k8s.io/kube-openapi v0.0.0-20221012153701-172d655c2280/go.mod h1:+Axhij7bCpeqhklhUTe3xmOn6bWxolyZEeyaFpjGtl4=
k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9 h1:LyMgNKD2P8Wn1iAwQU5OhxCKlKJy0sHc+PcDwFB24dQ=
k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9/go.mod h1:wZK2AVp1uHCp4VamDVgBP2COHZjqD1T68Rf0CM3YjSM=
k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 h1:aVUu9fTY98ivBPKR9Y5w/AuzbMm96cd3YHRTU83I780=
k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00/go.mod h1:AsvuZPBlUDVuCdzJ87iajxtXuR9oktsTctW/R9wwouA=
k8s.io/utils v0.0.0-20210802155522-efc7438f0176/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
Expand Down
66 changes: 29 additions & 37 deletions pkg/jobs/canary/canary_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,11 @@ func StartScanCanaryConfigs(dataFile string, configFiles []string) {
}

type CanaryJob struct {
*kommons.Client
dutyjob.JobRuntime
Kubernetes kubernetes.Interface
Canary v1.Canary
DBCanary pkg.Canary
LogPass bool
LogFail bool
dutyjob.Job
Canary v1.Canary
DBCanary pkg.Canary
LogPass bool
LogFail bool
}

func (job CanaryJob) GetNamespacedName() types.NamespacedName {
Expand All @@ -68,25 +66,12 @@ func (job CanaryJob) GetNamespacedName() types.NamespacedName {
var minimumTimeBetweenCanaryRuns = 10 * time.Second
var canaryLastRuntimes = sync.Map{}

func SyncCanary(ctx dutyjob.JobRuntime, args ...any) error {
if len(args) != 2 {
return fmt.Errorf("wrong arg count for SyncCanary: %d", len(args))
}
dbCanary, ok := args[0].(pkg.Canary)
if !ok {
return fmt.Errorf("wrong arg type for dbCanary: %T", args[0])
}

canary, ok := args[1].(v1.Canary)
if !ok {
return fmt.Errorf("wrong arg type for canary: %T", args[1])
}

if runner.IsCanaryIgnored(&canary.ObjectMeta) {
func (j CanaryJob) Run(ctx dutyjob.JobRuntime) error {
if runner.IsCanaryIgnored(&j.Canary.ObjectMeta) {
return nil
}

canaryID := dbCanary.ID.String()
canaryID := j.DBCanary.ID.String()
val, _ := concurrentJobLocks.LoadOrStore(canaryID, &sync.Mutex{})
lock, ok := val.(*sync.Mutex)
if !ok {
Expand All @@ -110,7 +95,7 @@ func SyncCanary(ctx dutyjob.JobRuntime, args ...any) error {

// Skip run if job ran too recently
if lastRunDelta < minimumTimeBetweenCanaryRuns {
logger.Infof("Skipping Canary[%s]:%s since it last ran %.2f seconds ago", canaryID, canary.GetNamespacedName(), lastRunDelta.Seconds())
logger.Infof("Skipping Canary[%s]:%s since it last ran %.2f seconds ago", canaryID, j.Canary.GetNamespacedName(), lastRunDelta.Seconds())
return nil
}

Expand All @@ -120,16 +105,16 @@ func SyncCanary(ctx dutyjob.JobRuntime, args ...any) error {
// Transformed checks have a delete strategy
// On deletion they can either be marked healthy, unhealthy or left as is
checkIDDeleteStrategyMap := make(map[string]string)
canaryCtx := canarycontext.New(ctx.Kommons(), ctx.Kubernetes(), ctx.DB(), ctx.Pool(), canary)
canaryCtx := canarycontext.New(ctx.Kommons(), ctx.Kubernetes(), ctx.DB(), ctx.Pool(), j.Canary)
results, err := checks.RunChecks(canaryCtx)
if err != nil {
logger.Errorf("error running checks for canary %s: %v", canaryID, err)
return nil
}

// TODO: Use ctx with object here
logPass := canary.IsTrace() || canary.IsDebug() || LogPass
logFail := canary.IsTrace() || canary.IsDebug() || LogFail
logPass := j.Canary.IsTrace() || j.Canary.IsDebug() || LogPass
logFail := j.Canary.IsTrace() || j.Canary.IsDebug() || LogFail
for _, result := range results {
if logPass && result.Pass || logFail && !result.Pass {
logger.Infof(result.String())
Expand All @@ -140,7 +125,7 @@ func SyncCanary(ctx dutyjob.JobRuntime, args ...any) error {
checkIDDeleteStrategyMap[checkID] = result.Check.GetTransformDeleteStrategy()
}
}
updateCanaryStatusAndEvent(canary, results)
updateCanaryStatusAndEvent(j.Canary, results)

checkDeleteStrategyGroup := make(map[string][]string)
checksToRemove := utils.SetDifference(existingTransformedChecks, transformedChecksCreated)
Expand Down Expand Up @@ -171,10 +156,6 @@ func SyncCanary(ctx dutyjob.JobRuntime, args ...any) error {
return nil
}

func (job *CanaryJob) NewContext() *canarycontext.Context {
return canarycontext.New(job.Client, job.Kubernetes, db.Gorm, db.Pool, job.Canary)
}

func updateCanaryStatusAndEvent(canary v1.Canary, results []*pkg.CheckResult) {
if CanaryStatusChannel == nil {
return
Expand All @@ -198,9 +179,10 @@ func updateCanaryStatusAndEvent(canary v1.Canary, results []*pkg.CheckResult) {
// Set uptime and latency
uptime, latency := metrics.Record(canary, result)
checkKey := canary.GetKey(result.Check)
checkStatus[checkKey] = &v1.CheckStatus{}
checkStatus[checkKey].Uptime1H = uptime.String()
checkStatus[checkKey].Latency1H = latency.String()
checkStatus[checkKey] = &v1.CheckStatus{
Uptime1H: uptime.String(),
Latency1H: latency.String(),
}

// Increment aggregate uptime
uptimeAgg.Passed += uptime.Passed
Expand All @@ -227,6 +209,7 @@ func updateCanaryStatusAndEvent(canary v1.Canary, results []*pkg.CheckResult) {
lastTransitionedTime = &metav1.Time{Time: time.Now()}
}

// TODO Why is this here ?
push.Queue(pkg.FromV1(canary, result.Check), pkg.FromResult(*result))

// Update status message
Expand Down Expand Up @@ -341,7 +324,16 @@ func SyncCanaryJob(ctx context.Context, dbCanary pkg.Canary) error {
}

updateTime, exists := canaryUpdateTimeCache.Load(dbCanary.ID.String())
newJob := dutyjob.NewJob(ctx, "SyncCanaryJob", canary.Spec.GetSchedule(), SyncCanary, dbCanary, canary).SetID(dbCanary.ID.String())
newJob := CanaryJob{
Job: dutyjob.Job{
Context: ctx,
Name: "SyncCanaryJob",
Schedule: canary.Spec.GetSchedule(),
ID: dbCanary.ID.String(),
},
Canary: *canary,
DBCanary: dbCanary,
}
entry := findCronEntry(dbCanary.ID.String())
if !exists || dbCanary.UpdatedAt.After(updateTime.(time.Time)) || entry == nil {
// Remove entry if it exists
Expand All @@ -368,7 +360,7 @@ func SyncCanaryJob(ctx context.Context, dbCanary pkg.Canary) error {
return nil
}

func SyncCanaryJobs(ctx dutyjob.JobRuntime, _ ...any) error {
func SyncCanaryJobs(ctx dutyjob.JobRuntime) error {
ctx.Debugf("Syncing canary jobs")

canaries, err := db.GetAllCanariesForSync(ctx.Context, runner.WatchNamespace)
Expand Down

0 comments on commit 37434cc

Please sign in to comment.