From 8eb6286832f4e4cfd21e3cc5f7fb6b0ce036c82c Mon Sep 17 00:00:00 2001 From: Mohamed Labouardy Date: Sun, 15 Oct 2023 19:33:23 +0200 Subject: [PATCH 1/2] feat: track number of fetch dependencies --- providers/aws/aws.go | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/providers/aws/aws.go b/providers/aws/aws.go index ecf1e862e..03487a214 100644 --- a/providers/aws/aws.go +++ b/providers/aws/aws.go @@ -6,6 +6,7 @@ import ( log "github.com/sirupsen/logrus" + "github.com/tailwarden/komiser/models" "github.com/tailwarden/komiser/providers" "github.com/tailwarden/komiser/providers/aws/apigateway" "github.com/tailwarden/komiser/providers/aws/cloudfront" @@ -113,15 +114,16 @@ func FetchResources(ctx context.Context, client providers.ProviderClient, region log.Warnf("[%s][AWS] %s", client.Name, err) } else { for _, resource := range resources { - _, err = db.NewInsert().Model(&resource).On("CONFLICT (resource_id) DO UPDATE").Set("cost = EXCLUDED.cost, relations=EXCLUDED.relations").Exec(context.Background()) + _, err = db.NewInsert().Model(&resource).On("CONFLICT (resource_id) DO UPDATE").Set("cost = EXCLUDED.cost, relations=EXCLUDED.relations").Exec(context.Background()) if err != nil { log.WithError(err).Errorf("db trigger failed") } } if telemetry { analytics.TrackEvent("discovered_resources", map[string]interface{}{ - "provider": "AWS", - "resources": len(resources), + "provider": "AWS", + "resources": len(resources), + "dependencies": calculateDependencies(resources), }) } } @@ -129,6 +131,14 @@ func FetchResources(ctx context.Context, client providers.ProviderClient, region } } +func calculateDependencies(resources []models.Resource) int { + total := 0 + for _, resource := range resources { + total += len(resource.Relations) + } + return total +} + func getRegions() []string { return []string{"us-east-1", "us-east-2", "us-west-1", "us-west-2", "ca-central-1", "eu-north-1", "eu-west-1", "eu-west-2", "eu-west-3", "eu-central-1", 
"ap-northeast-1", "ap-northeast-2", "ap-northeast-3", "ap-southeast-1", "ap-southeast-2", "ap-south-1", "sa-east-1"} } From ef3c74452c9e8a05c006b0451093c4ca59309069 Mon Sep 17 00:00:00 2001 From: Mohamed Labouardy Date: Thu, 19 Oct 2023 21:00:23 +0200 Subject: [PATCH 2/2] feat: support concurrency for aws --- internal/internal.go | 54 ++++++++++++++++++++++++--------------- internal/internal_test.go | 37 +++++++++++++++++++++++++++ providers/aws/aws.go | 36 ++++++++++++++------------ providers/providers.go | 37 +++++++++++++++++++++++++++ 4 files changed, 127 insertions(+), 37 deletions(-) create mode 100644 internal/internal_test.go diff --git a/internal/internal.go b/internal/internal.go index 1765a1171..29d8ea202 100644 --- a/internal/internal.go +++ b/internal/internal.go @@ -11,6 +11,7 @@ import ( "runtime" "strconv" "strings" + "sync" "time" "github.com/getsentry/sentry-go" @@ -54,7 +55,7 @@ var Arch = runtime.GOARCH var db *bun.DB var analytics utils.Analytics -func Exec(address string, port int, configPath string, telemetry bool, a utils.Analytics, regions []string, cmd *cobra.Command) error { +func Exec(address string, port int, configPath string, telemetry bool, a utils.Analytics, regions []string, _ *cobra.Command) error { analytics = a ctx := context.Background() @@ -79,10 +80,8 @@ func Exec(address string, port int, configPath string, telemetry bool, a utils.A _, err = cron.Every(1).Hours().Do(func() { log.Info("Fetching resources workflow has started") - err = fetchResources(ctx, clients, regions, telemetry) - if err != nil { - log.Fatal(err) - } + + fetchResources(ctx, clients, regions, telemetry) }) if err != nil { @@ -209,7 +208,7 @@ func setupDBConnection(c *models.Config) error { return nil } -func triggerFetchingWorfklow(ctx context.Context, client providers.ProviderClient, provider string, telemetry bool, regions []string) { +func triggerFetchingWorfklow(ctx context.Context, client providers.ProviderClient, provider string, telemetry bool, 
regions []string, wp *providers.WorkerPool) { localHub := sentry.CurrentHub().Clone() defer func() { @@ -233,7 +232,7 @@ func triggerFetchingWorfklow(ctx context.Context, client providers.ProviderClien switch provider { case "AWS": - aws.FetchResources(ctx, client, regions, db, telemetry, analytics) + aws.FetchResources(ctx, client, regions, db, telemetry, analytics, wp) case "DigitalOcean": do.FetchResources(ctx, client, db, telemetry, analytics) case "OCI": @@ -257,33 +256,48 @@ func triggerFetchingWorfklow(ctx context.Context, client providers.ProviderClien } } -func fetchResources(ctx context.Context, clients []providers.ProviderClient, regions []string, telemetry bool) error { +func fetchResources(ctx context.Context, clients []providers.ProviderClient, regions []string, telemetry bool) { + numWorkers := 64 + wp := providers.NewWorkerPool(numWorkers) + wp.Start() + + var wwg sync.WaitGroup + workflowTrigger := func(client providers.ProviderClient, provider string) { + wwg.Add(1) + go func() { + defer wwg.Done() + triggerFetchingWorfklow(ctx, client, provider, telemetry, regions, wp) + }() + } + for _, client := range clients { if client.AWSClient != nil { - go triggerFetchingWorfklow(ctx, client, "AWS", telemetry, regions) + workflowTrigger(client, "AWS") } else if client.DigitalOceanClient != nil { - go triggerFetchingWorfklow(ctx, client, "DigitalOcean", telemetry, regions) + workflowTrigger(client, "DigitalOcean") } else if client.OciClient != nil { - go triggerFetchingWorfklow(ctx, client, "OCI", telemetry, regions) + workflowTrigger(client, "OCI") } else if client.CivoClient != nil { - go triggerFetchingWorfklow(ctx, client, "Civo", telemetry, regions) + workflowTrigger(client, "Civo") } else if client.K8sClient != nil { - go triggerFetchingWorfklow(ctx, client, "Kubernetes", telemetry, regions) + workflowTrigger(client, "Kubernetes") } else if client.LinodeClient != nil { - go triggerFetchingWorfklow(ctx, client, "Linode", telemetry, regions) + 
workflowTrigger(client, "Linode") } else if client.TencentClient != nil { - go triggerFetchingWorfklow(ctx, client, "Tencent", telemetry, regions) + workflowTrigger(client, "Tencent") } else if client.AzureClient != nil { - go triggerFetchingWorfklow(ctx, client, "Azure", telemetry, regions) + workflowTrigger(client, "Azure") } else if client.ScalewayClient != nil { - go triggerFetchingWorfklow(ctx, client, "Scaleway", telemetry, regions) + workflowTrigger(client, "Scaleway") } else if client.MongoDBAtlasClient != nil { - go triggerFetchingWorfklow(ctx, client, "MongoDBAtlas", telemetry, regions) + workflowTrigger(client, "MongoDBAtlas") } else if client.GCPClient != nil { - go triggerFetchingWorfklow(ctx, client, "GCP", telemetry, regions) + workflowTrigger(client, "GCP") } } - return nil + + wwg.Wait() + wp.Wait() } func checkUpgrade() { diff --git a/internal/internal_test.go b/internal/internal_test.go new file mode 100644 index 000000000..f8bfc9ee9 --- /dev/null +++ b/internal/internal_test.go @@ -0,0 +1,37 @@ +package internal + +import ( + "context" + "io" + "testing" + + log "github.com/sirupsen/logrus" + + "github.com/tailwarden/komiser/internal/config" + "github.com/tailwarden/komiser/utils" +) + +// BenchmarkFetchResources benchmarks the fetchResources function. 
+func BenchmarkFetchResources(b *testing.B) { + // Setup + ctx := context.TODO() + log.SetOutput(io.Discard) + analytics.Init() + cfg, clients, accounts, err := config.Load("/workspaces/komiser/config.toml", false, analytics, db) + if err != nil { + b.Fatalf("Error during config setup: %v", err) + } + err = setupDBConnection(cfg) + if err != nil { + b.Fatalf("Error during DB setup: %v", err) + } + err = utils.SetupSchema(db, cfg, accounts) + if err != nil { + b.Fatalf("Error during schema setup: %v", err) + } + + // The benchmark function will run b.N times + for i := 0; i < b.N; i++ { + fetchResources(ctx, clients, []string{}, false) + } +} diff --git a/providers/aws/aws.go b/providers/aws/aws.go index 03487a214..8e2f7df02 100644 --- a/providers/aws/aws.go +++ b/providers/aws/aws.go @@ -99,7 +99,7 @@ func listOfSupportedServices() []providers.FetchDataFunction { } } -func FetchResources(ctx context.Context, client providers.ProviderClient, regions []string, db *bun.DB, telemetry bool, analytics utils.Analytics) { +func FetchResources(ctx context.Context, client providers.ProviderClient, regions []string, db *bun.DB, telemetry bool, analytics utils.Analytics, wp *providers.WorkerPool) { listOfSupportedRegions := getRegions() if len(regions) > 0 { log.Infof("Komiser will fetch resources from the following regions: %s", strings.Join(regions, ",")) @@ -109,24 +109,26 @@ func FetchResources(ctx context.Context, client providers.ProviderClient, region for _, region := range listOfSupportedRegions { client.AWSClient.Region = region for _, fetchResources := range listOfSupportedServices() { - resources, err := fetchResources(ctx, client) - if err != nil { - log.Warnf("[%s][AWS] %s", client.Name, err) - } else { - for _, resource := range resources { - _, err = db.NewInsert().Model(&resource).On("CONFLICT (resource_id) DO UPDATE").Set("cost = EXCLUDED.cost, relations=EXCLUDED.relations").Exec(context.Background()) - if err != nil { - log.WithError(err).Errorf("db trigger 
failed") + wp.SubmitTask(func() { + resources, err := fetchResources(ctx, client) + if err != nil { + log.Warnf("[%s][AWS] %s", client.Name, err) + } else { + for _, resource := range resources { + _, err = db.NewInsert().Model(&resource).On("CONFLICT (resource_id) DO UPDATE").Set("cost = EXCLUDED.cost, relations=EXCLUDED.relations").Exec(context.Background()) + if err != nil { + log.WithError(err).Errorf("db trigger failed") + } + } + if telemetry { + analytics.TrackEvent("discovered_resources", map[string]interface{}{ + "provider": "AWS", + "resources": len(resources), + "dependencies": calculateDependencies(resources), + }) } } - if telemetry { - analytics.TrackEvent("discovered_resources", map[string]interface{}{ - "provider": "AWS", - "resources": len(resources), - "dependencies": calculateDependencies(resources), - }) - } - } + }) } } } diff --git a/providers/providers.go b/providers/providers.go index 138fa34da..ef0198714 100644 --- a/providers/providers.go +++ b/providers/providers.go @@ -2,6 +2,7 @@ package providers import ( "context" + "sync" "github.com/Azure/azure-sdk-for-go/sdk/azidentity" "github.com/aws/aws-sdk-go-v2/aws" @@ -47,3 +48,39 @@ type K8sClient struct { Client *kubernetes.Clientset OpencostBaseUrl string } + +type WorkerPool struct { + numWorkers int + tasks chan func() + wg sync.WaitGroup +} + +func NewWorkerPool(numWorkers int) *WorkerPool { + return &WorkerPool{ + numWorkers: numWorkers, + tasks: make(chan func()), + } +} + +func (wp *WorkerPool) Start() { + for i := 0; i < wp.numWorkers; i++ { + go wp.worker() + } +} + +func (wp *WorkerPool) SubmitTask(task func()) { + wp.wg.Add(1) + wp.tasks <- task +} + +func (wp *WorkerPool) Wait() { + wp.wg.Wait() + close(wp.tasks) +} + +func (wp *WorkerPool) worker() { + for task := range wp.tasks { + task() + wp.wg.Done() + } +}