Skip to content

Commit

Permalink
Merge pull request #1370 from flanksource/job-context
Browse files Browse the repository at this point in the history
feat: new job type for sync canary jobs
  • Loading branch information
moshloop authored Oct 30, 2023
2 parents d627846 + 374743a commit cec05dc
Show file tree
Hide file tree
Showing 15 changed files with 232 additions and 93 deletions.
3 changes: 3 additions & 0 deletions api/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/flanksource/commons/logger"
ctemplate "github.com/flanksource/commons/template"
"github.com/flanksource/duty"
dutyCtx "github.com/flanksource/duty/context"
"github.com/flanksource/duty/models"
"github.com/flanksource/duty/types"
"github.com/flanksource/kommons"
Expand All @@ -19,6 +20,8 @@ import (
"k8s.io/client-go/kubernetes"
)

var DefaultContext dutyCtx.Context

type KubernetesContext struct {
gocontext.Context
Kommons *kommons.Client
Expand Down
5 changes: 5 additions & 0 deletions api/v1/canary_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/flanksource/canary-checker/api/external"
"github.com/flanksource/commons/logger"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
)

type ResultMode string
Expand Down Expand Up @@ -234,6 +235,10 @@ func (c Canary) GetDescription(check external.Check) string {
return check.GetEndpoint()
}

func (c Canary) GetNamespacedName() types.NamespacedName {
return types.NamespacedName{Name: c.Name, Namespace: c.Namespace}
}

func (c *Canary) SetRunnerName(name string) {
c.Status.runnerName = name
}
Expand Down
16 changes: 16 additions & 0 deletions cmd/operator.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package cmd

import (
gocontext "context"
"os"
"time"

apicontext "github.com/flanksource/canary-checker/api/context"
"github.com/flanksource/canary-checker/pkg/cache"
"github.com/flanksource/canary-checker/pkg/db"
"github.com/flanksource/canary-checker/pkg/jobs"
Expand All @@ -16,9 +18,12 @@ import (
"github.com/flanksource/canary-checker/pkg"
"github.com/flanksource/canary-checker/pkg/controllers"
"github.com/flanksource/canary-checker/pkg/labels"
commonsCtx "github.com/flanksource/commons/context"
"github.com/flanksource/commons/logger"
"github.com/flanksource/duty/context"
"github.com/go-logr/zapr"
"github.com/spf13/cobra"
"go.opentelemetry.io/otel"
"k8s.io/apimachinery/pkg/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -73,6 +78,17 @@ func run(cmd *cobra.Command, args []string) {
if err := db.Init(); err != nil {
logger.Fatalf("error connecting with postgres: %v", err)
}
kommonsClient, k8s, err := pkg.NewKommonsClient()
if err != nil {
logger.Warnf("failed to get kommons client, checks that read kubernetes configs will fail: %v", err)
}

apicontext.DefaultContext = context.NewContext(gocontext.Background(), commonsCtx.WithTracer(otel.GetTracerProvider().Tracer("canary-checker"))).
WithDB(db.Gorm, db.Pool).
WithKubernetes(k8s).
WithKommons(kommonsClient).
WithNamespace(runner.WatchNamespace)

cache.PostgresCache = cache.NewPostgresCache(db.Pool)
if operatorExecutor {
logger.Infof("Starting executors")
Expand Down
27 changes: 20 additions & 7 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/flanksource/canary-checker/pkg/db"
"github.com/flanksource/canary-checker/pkg/jobs/canary"
"github.com/flanksource/canary-checker/pkg/runner"
"github.com/flanksource/canary-checker/pkg/telemetry"
"github.com/flanksource/commons/logger"
gomplate "github.com/flanksource/gomplate/v3"
"github.com/spf13/cobra"
Expand All @@ -31,16 +32,25 @@ var Root = &cobra.Command{
if canary.UpstreamConf.Valid() {
logger.Infof("Pushing checks to %s with name=%s user=%s", canary.UpstreamConf.Host, canary.UpstreamConf.AgentName, canary.UpstreamConf.Username)
}

if otelcollectorURL != "" {
telemetry.InitTracer(otelServiceName, otelcollectorURL, true)
}
},
}

var httpPort = 8080
var publicEndpoint = "http://localhost:8080"
var prometheusURL string
var pushServers, pullServers []string
var sharedLibrary []string
var exposeEnv bool
var logPass, logFail bool
var (
httpPort = 8080
publicEndpoint = "http://localhost:8080"
prometheusURL string
pushServers, pullServers []string
sharedLibrary []string
exposeEnv bool
logPass, logFail bool

otelcollectorURL string
otelServiceName string
)

func ServerFlags(flags *pflag.FlagSet) {
flags.IntVar(&httpPort, "httpPort", httpPort, "Port to expose a health dashboard ")
Expand Down Expand Up @@ -76,6 +86,9 @@ func ServerFlags(flags *pflag.FlagSet) {
flags.StringVar(&canary.UpstreamConf.Password, "upstream-password", os.Getenv("UPSTREAM_PASSWORD"), "upstream password")
flags.StringVar(&canary.UpstreamConf.AgentName, "agent-name", os.Getenv("UPSTREAM_NAME"), "name of this agent")
flags.BoolVar(&canary.UpstreamConf.InsecureSkipVerify, "upstream-insecure-skip-verify", os.Getenv("UPSTREAM_INSECURE_SKIP_VERIFY") == "true", "Skip TLS verification on the upstream servers certificate")

flags.StringVar(&otelcollectorURL, "otel-collector-url", "", "OpenTelemetry gRPC Collector URL in host:port format")
flags.StringVar(&otelServiceName, "otel-service-name", "canary-checker", "OpenTelemetry service name for the resource")
}

func readFromEnv(v string) string {
Expand Down
8 changes: 7 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ require (
github.com/spf13/cobra v1.7.0
github.com/spf13/pflag v1.0.5
go.mongodb.org/mongo-driver v1.12.1
go.opentelemetry.io/otel v1.19.0
golang.org/x/crypto v0.14.0
golang.org/x/net v0.17.0
golang.org/x/sync v0.4.0
Expand Down Expand Up @@ -111,6 +112,7 @@ require (
github.com/aws/smithy-go v1.14.2 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bgentry/go-netrc v0.0.0-20140422174119-9fd32a8b3d3d // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
Expand Down Expand Up @@ -160,6 +162,7 @@ require (
github.com/gosimple/slug v1.13.1 // indirect
github.com/gosimple/unidecode v1.0.1 // indirect
github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 // indirect
github.com/hairyhenderson/toml v0.4.2-0.20210923231440-40456b8e66cf // indirect
github.com/hairyhenderson/yaml v0.0.0-20220618171115-2d35fca545ce // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
Expand Down Expand Up @@ -231,9 +234,12 @@ require (
github.com/yuin/gopher-lua v1.1.0 // indirect
github.com/zclconf/go-cty v1.14.1 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/otel v1.19.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.19.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.19.0 // indirect
go.opentelemetry.io/otel/metric v1.19.0 // indirect
go.opentelemetry.io/otel/sdk v1.19.0 // indirect
go.opentelemetry.io/otel/trace v1.19.0 // indirect
go.opentelemetry.io/proto/otlp v1.0.0 // indirect
go.starlark.net v0.0.0-20230925163745-10651d5192ab // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.26.0 // indirect
Expand Down
12 changes: 12 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -737,6 +737,8 @@ github.com/boombuler/barcode v1.0.0/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl
github.com/boombuler/barcode v1.0.1/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8=
github.com/c2h5oh/datasize v0.0.0-20220606134207-859f65c6625b h1:6+ZFm0flnudZzdSE0JxlhR2hKnGPcNB35BjQf4RYQDY=
github.com/c2h5oh/datasize v0.0.0-20220606134207-859f65c6625b/go.mod h1:S/7n9copUssQ56c7aAgHqftWO4LTf4xY6CGWt8Bc+3M=
github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/census-instrumentation/opencensus-proto v0.3.0/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/census-instrumentation/opencensus-proto v0.4.1/go.mod h1:4T9NM4+4Vw91VeyqjLS6ao50K5bOcLKN6Q42XnYaRYw=
Expand Down Expand Up @@ -1108,9 +1110,12 @@ github.com/gosimple/unidecode v1.0.1 h1:hZzFTMMqSswvf0LBJZCZgThIZrpDHFXux9KeGmn6
github.com/gosimple/unidecode v1.0.1/go.mod h1:CP0Cr1Y1kogOtx0bJblKzsVWrqYaqfNOnHzpgWw4Awc=
github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79 h1:+ngKgrYPPJrOjhax5N+uePQ0Fh1Z7PheYoUI/0nzkPA=
github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
github.com/grpc-ecosystem/grpc-gateway v1.16.0 h1:gmcG1KaJ57LophUzW0Hy8NmPhnMZb4M0+kPpLofRdBo=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0/go.mod h1:hgWBS7lorOAVIJEQMi4ZsPv9hVvWI6+ch50m39Pf2Ks=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.3/go.mod h1:o//XUCC/F+yRGJoPO/VU0GSB0f8Nhgmxx0VIRUvaC0w=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 h1:YBftPWNWd4WwGqtY2yeZL2ef8rHAxPBD8KFhJpmcqms=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0/go.mod h1:YN5jB8ie0yfIUg6VvR9Kz84aCaG7AsGZnLjhHbUqwPg=
github.com/hairyhenderson/toml v0.4.2-0.20210923231440-40456b8e66cf h1:I1sbT4ZbIt9i+hB1zfKw2mE8C12TuGxPiW7YmtLbPa4=
github.com/hairyhenderson/toml v0.4.2-0.20210923231440-40456b8e66cf/go.mod h1:jDHmWDKZY6MIIYltYYfW4Rs7hQ50oS4qf/6spSiZAxY=
github.com/hairyhenderson/yaml v0.0.0-20220618171115-2d35fca545ce h1:cVkYhlWAxwuS2/Yp6qPtcl0fGpcWxuZNonywHZ6/I+s=
Expand Down Expand Up @@ -1527,14 +1532,21 @@ go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0=
go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo=
go.opentelemetry.io/otel v1.19.0 h1:MuS/TNf4/j4IXsZuJegVzI1cwut7Qc00344rgH7p8bs=
go.opentelemetry.io/otel v1.19.0/go.mod h1:i0QyjOq3UPoTzff0PJB2N66fb4S0+rSbSB15/oyH9fY=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.19.0 h1:Mne5On7VWdx7omSrSSZvM4Kw7cS7NQkOOmLcgscI51U=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.19.0/go.mod h1:IPtUMKL4O3tH5y+iXVyAXqpAwMuzC1IrxVS81rummfE=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.19.0 h1:3d+S281UTjM+AbF31XSOYn1qXn3BgIdWl8HNEpx08Jk=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.19.0/go.mod h1:0+KuTDyKL4gjKCF75pHOX4wuzYDUZYfAQdSu43o+Z2I=
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.19.0 h1:Nw7Dv4lwvGrI68+wULbcq7su9K2cebeCUrDjVrUJHxM=
go.opentelemetry.io/otel/metric v1.19.0 h1:aTzpGtV0ar9wlV4Sna9sdJyII5jTVJEvKETPiOKwvpE=
go.opentelemetry.io/otel/metric v1.19.0/go.mod h1:L5rUsV9kM1IxCj1MmSdS+JQAcVm319EUrDVLrt7jqt8=
go.opentelemetry.io/otel/sdk v1.19.0 h1:6USY6zH+L8uMH8L3t1enZPR3WFEmSTADlqldyHtJi3o=
go.opentelemetry.io/otel/sdk v1.19.0/go.mod h1:NedEbbS4w3C6zElbLdPJKOpJQOrGUJ+GfzpjUvI0v1A=
go.opentelemetry.io/otel/trace v1.19.0 h1:DFVQmlVbfVeOuBRrwdtaehRrWiL1JoVs9CPIQ1Dzxpg=
go.opentelemetry.io/otel/trace v1.19.0/go.mod h1:mfaSyvGyEJEI0nyV2I4qhNQnbBOUUmYZpYojqMnX2vo=
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
go.opentelemetry.io/proto/otlp v0.15.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U=
go.opentelemetry.io/proto/otlp v1.0.0 h1:T0TX0tmXU8a3CbNXzEKGeU5mIVOdf0oykP+u2lIVU/I=
go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v80hjKIs5JXpM=
go.starlark.net v0.0.0-20230925163745-10651d5192ab h1:7QkXlIVjYdSsKKSGnM0jQdw/2w9W5qcFDGTc00zKqgI=
go.starlark.net v0.0.0-20230925163745-10651d5192ab/go.mod h1:LcLNIzVOMp4oV+uusnpk+VU+SzXaJakUuBjoCSWH5dM=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
Expand Down
5 changes: 3 additions & 2 deletions pkg/controllers/canary_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/flanksource/canary-checker/api/context"
v1 "github.com/flanksource/canary-checker/api/v1"
"github.com/flanksource/canary-checker/pkg"
canaryJobs "github.com/flanksource/canary-checker/pkg/jobs/canary"
Expand Down Expand Up @@ -105,7 +106,7 @@ func (r *CanaryReconciler) Reconcile(ctx gocontext.Context, req ctrl.Request) (c

// Sync jobs if canary is created or updated
if canary.Generation == 1 {
if err := canaryJobs.SyncCanaryJob(*dbCanary); err != nil {
if err := canaryJobs.SyncCanaryJob(context.DefaultContext, *dbCanary); err != nil {
logger.Error(err, "failed to sync canary job")
return ctrl.Result{Requeue: true, RequeueAfter: 2 * time.Minute}, err
}
Expand Down Expand Up @@ -143,7 +144,7 @@ func (r *CanaryReconciler) persistAndCacheCanary(canary *v1.Canary) (*pkg.Canary
}
r.CanaryCache.Set(dbCanary.ID.String(), dbCanary, cache.DefaultExpiration)

if err := canaryJobs.SyncCanaryJob(*dbCanary); err != nil {
if err := canaryJobs.SyncCanaryJob(context.DefaultContext, *dbCanary); err != nil {
return nil, err
}
return dbCanary, nil
Expand Down
20 changes: 10 additions & 10 deletions pkg/db/canary.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package db

import (
"context"
"encoding/json"
"errors"
"fmt"
Expand All @@ -15,6 +14,7 @@ import (
"github.com/flanksource/canary-checker/pkg/utils"
"github.com/flanksource/commons/logger"
"github.com/flanksource/duty"
"github.com/flanksource/duty/context"
"github.com/flanksource/duty/models"
dutyTypes "github.com/flanksource/duty/types"
"github.com/google/uuid"
Expand All @@ -23,7 +23,7 @@ import (
"gorm.io/gorm/clause"
)

func GetAllCanariesForSync(namespace string) ([]pkg.Canary, error) {
func GetAllCanariesForSync(ctx context.Context, namespace string) ([]pkg.Canary, error) {
query := `
SELECT json_agg(
jsonb_set_lax(to_jsonb(canaries),'{checks}', (
Expand All @@ -49,7 +49,7 @@ func GetAllCanariesForSync(namespace string) ([]pkg.Canary, error) {
args["namespace"] = namespace
}

rows, err := Pool.Query(context.Background(), query, args)
rows, err := ctx.Pool().Query(ctx, query, args)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -125,17 +125,17 @@ func PersistCheck(check pkg.Check, canaryID uuid.UUID) (uuid.UUID, error) {
return check.ID, nil
}

func GetTransformedCheckIDs(canaryID string) ([]string, error) {
func GetTransformedCheckIDs(ctx context.Context, canaryID string) ([]string, error) {
var ids []string
err := Gorm.Table("checks").
err := ctx.DB().Table("checks").
Select("id").
Where("canary_id = ? AND transformed = true AND deleted_at IS NULL", canaryID).
Find(&ids).
Error
return ids, err
}

func AddCheckStatuses(ids []string, status models.CheckHealthStatus) error {
func AddCheckStatuses(ctx context.Context, ids []string, status models.CheckHealthStatus) error {
if len(ids) == 0 {
return nil
}
Expand All @@ -158,20 +158,20 @@ func AddCheckStatuses(ids []string, status models.CheckHealthStatus) error {
})
}
}
return Gorm.Table("check_statuses").
return ctx.DB().Table("check_statuses").
Create(objs).
Error
}

func RemoveTransformedChecks(ids []string) error {
func RemoveTransformedChecks(ctx context.Context, ids []string) error {
if len(ids) == 0 {
return nil
}
updates := map[string]any{
"deleted_at": gorm.Expr("NOW()"),
}

return Gorm.Table("checks").
return ctx.DB().Table("checks").
Where("id in (?)", ids).
Where("transformed = true").
Updates(updates).
Expand Down Expand Up @@ -280,7 +280,7 @@ func FindCheck(canary pkg.Canary, name string) (*pkg.Check, error) {

func FindDeletedChecksSince(ctx context.Context, since time.Time) ([]string, error) {
var ids []string
err := Gorm.Model(&models.Check{}).Where("deleted_at > ?", since).Pluck("id", &ids).Error
err := ctx.DB().Model(&models.Check{}).Where("deleted_at > ?", since).Pluck("id", &ids).Error
return ids, err
}

Expand Down
Loading

0 comments on commit cec05dc

Please sign in to comment.