Skip to content

Commit

Permalink
fix: move information request to checkup
Browse files Browse the repository at this point in the history
  • Loading branch information
artaasadi committed Nov 16, 2024
1 parent 2b37cb0 commit b4fdd0d
Show file tree
Hide file tree
Showing 15 changed files with 165 additions and 73 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ require (
github.com/lib/pq v1.10.9
github.com/nats-io/nats.go v1.36.0
github.com/open-policy-agent/opa v0.69.0
github.com/opengovern/og-util v1.1.5
github.com/opengovern/og-util v1.1.6
github.com/opengovern/plugin-aws v0.7.3
github.com/opengovern/plugin-gcp v0.0.0-20241014134959-2c0f222fc07b
github.com/opengovern/plugin-kubernetes-internal v0.18.12
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -999,6 +999,8 @@ github.com/opencontainers/runc v1.2.0 h1:qke7ZVCmJcKrJVY2iHJVC+0kql9uYdkusOPsQOO
github.com/opencontainers/runc v1.2.0/go.mod h1:/PXzF0h531HTMsYQnmxXkBD7YaGShm/2zcRB79dksUc=
github.com/opengovern/og-util v1.1.5 h1:D53Z669MsaKJJFHpIYnJe0iK0HX2jnYUTFTFkKd9lWg=
github.com/opengovern/og-util v1.1.5/go.mod h1:dyn8rhmxq59o1jnbgGfmcUvW7iB/eN6OxoTUUx6jEHA=
github.com/opengovern/og-util v1.1.6 h1:oZn3QvYWt8ZuyvbHeNOcAJvcyZf+pbqRPX5/sbnvRVA=
github.com/opengovern/og-util v1.1.6/go.mod h1:dyn8rhmxq59o1jnbgGfmcUvW7iB/eN6OxoTUUx6jEHA=
github.com/opengovern/plugin-aws v0.7.3 h1:76hZOjulNlgn4uaq5lq1/pmGmgJqvX1ZQbgqcQn03gI=
github.com/opengovern/plugin-aws v0.7.3/go.mod h1:zfTMswfCyXZ0gD6SDCsmKg55LseXzeFzOH4jXn2QJVo=
github.com/opengovern/plugin-gcp v0.0.0-20241014134959-2c0f222fc07b h1:4xP98kDpOXUu6RcFJyZN63OeA2I26MLS+dEB9JWYQpY=
Expand Down
63 changes: 0 additions & 63 deletions pkg/analytics/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,15 @@ import (
"encoding/base64"
"encoding/json"
"fmt"
"math"
"net/http"
"reflect"
"regexp"
"strings"
"time"

authApi "github.com/opengovern/og-util/pkg/api"
shared_entities "github.com/opengovern/og-util/pkg/api/shared-entities"
esSinkClient "github.com/opengovern/og-util/pkg/es/ingest/client"
"github.com/opengovern/og-util/pkg/httpclient"
"github.com/opengovern/og-util/pkg/jq"
"github.com/opengovern/opengovernance/pkg/utils"
integrationApi "github.com/opengovern/opengovernance/services/integration/api/models"
integrationClient "github.com/opengovern/opengovernance/services/integration/client"
inventoryApi "github.com/opengovern/opengovernance/services/inventory/api"
Expand Down Expand Up @@ -105,68 +101,9 @@ func (j *Job) Do(
fail(err)
}

if config.DoTelemetry {
// send telemetry
j.SendTelemetry(ctx, logger, config, integrationClient, inventoryClient)
}

return result
}

func (j *Job) SendTelemetry(ctx context.Context, logger *zap.Logger, workerConfig config.WorkerConfig, integrationClient integrationClient.IntegrationServiceClient, inventoryClient inventoryClient.InventoryServiceClient) {
now := time.Now()

httpCtx := httpclient.Context{Ctx: ctx, UserRole: authApi.AdminRole}

req := shared_entities.CspmUsageRequest{
GatherTimestamp: now,
Hostname: workerConfig.TelemetryHostname,
IntegrationTypeCount: make(map[string]int),
ApproximateSpend: 0,
}

integrations, err := integrationClient.ListIntegrations(&httpCtx, nil)
if err != nil {
logger.Error("failed to list sources", zap.Error(err))
return
}
for _, integration := range integrations.Integrations {
if _, ok := req.IntegrationTypeCount[integration.IntegrationType.String()]; !ok {
req.IntegrationTypeCount[integration.IntegrationType.String()] = 0
}
req.IntegrationTypeCount[integration.IntegrationType.String()] += 1
}

connData, err := inventoryClient.ListIntegrationsData(&httpCtx, nil, nil,
utils.GetPointer(now.AddDate(0, -1, 0)), &now, nil, true, false)
if err != nil {
logger.Error("failed to list connections data", zap.Error(err))
return
}
totalSpend := float64(0)
for _, conn := range connData {
if conn.TotalCost != nil {
totalSpend += *conn.TotalCost
}
}

req.ApproximateSpend = int(math.Floor(totalSpend/5000000)) * 5000000

url := fmt.Sprintf("%s/api/v1/information/usage", workerConfig.TelemetryBaseURL)
reqBytes, err := json.Marshal(req)
if err != nil {
logger.Error("failed to marshal telemetry request", zap.Error(err))
return
}
var resp any
if statusCode, err := httpclient.DoRequest(httpCtx.Ctx, http.MethodPost, url, httpCtx.ToHeaders(), reqBytes, &resp); err != nil {
logger.Error("failed to send telemetry", zap.Error(err), zap.Int("status_code", statusCode), zap.String("url", url), zap.Any("req", req), zap.Any("resp", resp))
return
}

logger.Info("sent telemetry", zap.String("url", url))
}

func (j *Job) Run(ctx context.Context, jq *jq.JobQueue, dbc db.Database, encodedResourceCollectionFilters map[string]string, steampipeDB *steampipe.Database, schedulerClient describeClient.SchedulerServiceClient, integrationClient integrationClient.IntegrationServiceClient, sinkClient esSinkClient.EsSinkServiceClient, inventoryClient inventoryClient.InventoryServiceClient, logger *zap.Logger, config config.WorkerConfig) error {
startTime := time.Now()
metrics, err := dbc.ListMetrics([]db.AnalyticMetricStatus{db.AnalyticMetricStatusActive, db.AnalyticMetricStatusInvisible})
Expand Down
9 changes: 9 additions & 0 deletions pkg/checkup/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package checkup

import (
"errors"
config2 "github.com/opengovern/og-util/pkg/config"
"github.com/opengovern/opengovernance/pkg/checkup/config"
"os"

"github.com/spf13/cobra"
Expand All @@ -11,6 +13,8 @@ import (
var (
PrometheusPushAddress = os.Getenv("PROMETHEUS_PUSH_ADDRESS")
IntegrationBaseUrl = os.Getenv("INTEGRATION_BASE_URL")
AuthBaseUrl = os.Getenv("AUTH_BASE_URL")
MetadataBaseUrl = os.Getenv("METADATA_BASE_URL")
NATSAddress = os.Getenv("NATS_URL")
)

Expand All @@ -32,13 +36,18 @@ func WorkerCommand() *cobra.Command {
}

cmd.SilenceUsage = true
var cnf config.WorkerConfig
config2.ReadFromEnv(&cnf, nil)

w, err := NewWorker(
id,
NATSAddress,
logger,
PrometheusPushAddress,
IntegrationBaseUrl,
AuthBaseUrl,
MetadataBaseUrl,
cnf,
cmd.Context(),
)
if err != nil {
Expand Down
8 changes: 8 additions & 0 deletions pkg/checkup/config/type.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package config

type WorkerConfig struct {
DoTelemetry bool `yaml:"do_telemetry"`
TelemetryWorkspaceID string `yaml:"telemetry_workspace_id"`
TelemetryHostname string `yaml:"telemetry_hostname"`
TelemetryBaseURL string `yaml:"telemetry_base_url"`
}
67 changes: 66 additions & 1 deletion pkg/checkup/job.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
package checkup

import (
"encoding/json"
"fmt"
authAPI "github.com/opengovern/og-util/pkg/api"
shared_entities "github.com/opengovern/og-util/pkg/api/shared-entities"
"github.com/opengovern/og-util/pkg/httpclient"
"github.com/opengovern/opengovernance/pkg/checkup/config"
authClient "github.com/opengovern/opengovernance/services/auth/client"
metadataClient "github.com/opengovern/opengovernance/services/metadata/client"
"golang.org/x/net/context"
"net/http"
"strconv"
"time"

Expand Down Expand Up @@ -41,7 +48,8 @@ type JobResult struct {
Error string
}

func (j Job) Do(integrationClient client.IntegrationServiceClient, logger *zap.Logger) (r JobResult) {
func (j Job) Do(integrationClient client.IntegrationServiceClient, authClient authClient.AuthServiceClient,
metadataClient metadataClient.MetadataServiceClient, logger *zap.Logger, config config.WorkerConfig) (r JobResult) {
startTime := time.Now().Unix()
defer func() {
if err := recover(); err != nil {
Expand Down Expand Up @@ -107,9 +115,66 @@ func (j Job) Do(integrationClient client.IntegrationServiceClient, logger *zap.L
DoCheckupJobsCount.WithLabelValues(strconv.Itoa(int(j.JobID)), "successful").Inc()
}

if config.DoTelemetry {
j.SendTelemetry(context.Background(), logger, config, integrationClient, authClient, metadataClient)
}

return JobResult{
JobID: j.JobID,
Status: status,
Error: errMsg,
}
}

func (j *Job) SendTelemetry(ctx context.Context, logger *zap.Logger, workerConfig config.WorkerConfig,
integrationClient client.IntegrationServiceClient, authClient authClient.AuthServiceClient, metadataClient metadataClient.MetadataServiceClient) {
now := time.Now()

httpCtx := httpclient.Context{Ctx: ctx, UserRole: authAPI.AdminRole}

req := shared_entities.CspmUsageRequest{
GatherTimestamp: now,
Hostname: workerConfig.TelemetryHostname,
IntegrationTypeCount: make(map[string]int),
}

integrations, err := integrationClient.ListIntegrations(&httpCtx, nil)
if err != nil {
logger.Error("failed to list sources", zap.Error(err))
return
}
for _, integration := range integrations.Integrations {
if _, ok := req.IntegrationTypeCount[integration.IntegrationType.String()]; !ok {
req.IntegrationTypeCount[integration.IntegrationType.String()] = 0
}
req.IntegrationTypeCount[integration.IntegrationType.String()] += 1
}

users, err := authClient.ListUsers(&httpCtx)
if err != nil {
logger.Error("failed to list users", zap.Error(err))
return
}
req.NumberOfUsers = int64(len(users))

about, err := metadataClient.GetAbout(&httpCtx)
if err != nil {
logger.Error("failed to get about", zap.Error(err))
return
}
req.InstallId = about.InstallID

url := fmt.Sprintf("%s/api/v1/information/usage", workerConfig.TelemetryBaseURL)
reqBytes, err := json.Marshal(req)
if err != nil {
logger.Error("failed to marshal telemetry request", zap.Error(err))
return
}
var resp any
if statusCode, err := httpclient.DoRequest(httpCtx.Ctx, http.MethodPost, url, httpCtx.ToHeaders(), reqBytes, &resp); err != nil {
logger.Error("failed to send telemetry", zap.Error(err), zap.Int("status_code", statusCode), zap.String("url", url), zap.Any("req", req), zap.Any("resp", resp))
return
}

logger.Info("sent telemetry", zap.String("url", url))
}
18 changes: 15 additions & 3 deletions pkg/checkup/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import (
"encoding/json"
"fmt"
"github.com/opengovern/og-util/pkg/jq"
"github.com/opengovern/opengovernance/pkg/checkup/config"
authClient "github.com/opengovern/opengovernance/services/auth/client"
metadataClient "github.com/opengovern/opengovernance/services/metadata/client"

"github.com/nats-io/nats.go/jetstream"
"github.com/opengovern/opengovernance/services/integration/client"
Expand All @@ -16,16 +19,22 @@ type Worker struct {
id string
jq *jq.JobQueue
logger *zap.Logger
config config.WorkerConfig
pusher *push.Pusher
integrationClient client.IntegrationServiceClient
authClient authClient.AuthServiceClient
metadataClient metadataClient.MetadataServiceClient
}

func NewWorker(
id string,
natsURL string,
logger *zap.Logger,
prometheusPushAddress string,
onboardBaseURL string,
integrationBaseURL string,
authBaseURL string,
metadataBaseURL string,
config config.WorkerConfig,
ctx context.Context,
) (w *Worker, err error) {
if id == "" {
Expand Down Expand Up @@ -56,7 +65,10 @@ func NewWorker(
w.pusher.Collector(DoCheckupJobsCount).
Collector(DoCheckupJobsDuration)

w.integrationClient = client.NewIntegrationServiceClient(onboardBaseURL)
w.integrationClient = client.NewIntegrationServiceClient(integrationBaseURL)
w.authClient = authClient.NewAuthClient(authBaseURL)
w.metadataClient = metadataClient.NewMetadataServiceClient(metadataBaseURL)
w.config = config
return w, nil
}

Expand All @@ -83,7 +95,7 @@ func (w *Worker) Run(ctx context.Context) error {

w.logger.Info("Processing job", zap.Int("jobID", int(job.JobID)))

result := job.Do(w.integrationClient, w.logger)
result := job.Do(w.integrationClient, w.authClient, w.metadataClient, w.logger, w.config)

bytes, err := json.Marshal(result)
if err != nil {
Expand Down
34 changes: 34 additions & 0 deletions services/auth/client/auth.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package client

import (
"fmt"
"github.com/labstack/echo/v4"
"github.com/opengovern/og-util/pkg/httpclient"
"github.com/opengovern/opengovernance/services/auth/api"
"net/http"
)

type AuthServiceClient interface {
ListUsers(ctx *httpclient.Context) ([]api.GetUsersResponse, error)
}

type authClient struct {
baseURL string
}

func NewAuthClient(baseURL string) AuthServiceClient {
return &authClient{baseURL: baseURL}
}

func (s *authClient) ListUsers(ctx *httpclient.Context) ([]api.GetUsersResponse, error) {
url := fmt.Sprintf("%s/api/v1/users", s.baseURL)

var users []api.GetUsersResponse
if statusCode, err := httpclient.DoRequest(ctx.Ctx, http.MethodGet, url, ctx.ToHeaders(), nil, &users); err != nil {
if 400 <= statusCode && statusCode < 500 {
return nil, echo.NewHTTPError(statusCode, err.Error())
}
return nil, err
}
return users, nil
}
5 changes: 3 additions & 2 deletions services/information/db/model/cspm_usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ import (
type CspmUsage struct {
gorm.Model

InstallId string `json:"install_id" gorm:"index:install_id_hostname"`
GatherTimestamp time.Time `json:"gather_timestamp" gorm:"index:,sort:desc"`

Hostname string `json:"hostname" gorm:"index:ws_id_hostname"`
Hostname string `json:"hostname" gorm:"index:install_id_hostname"`
NumberOfUsers int64 `json:"number_of_users"`
IntegrationTypeCount map[string]int `json:"integration_type_count"`
ApproximateSpend int `json:"approximate_spend"`
}
3 changes: 2 additions & 1 deletion services/information/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@ func NewInformationService(cfg config.InformationConfig, logger *zap.Logger, csp
func (s *InformationService) RecordUsage(ctx context.Context, req shared_entities.CspmUsageRequest) error {

m := model.CspmUsage{
InstallId: req.InstallId,
GatherTimestamp: req.GatherTimestamp,
Hostname: req.Hostname,
NumberOfUsers: req.NumberOfUsers,
IntegrationTypeCount: req.IntegrationTypeCount,
ApproximateSpend: req.ApproximateSpend,
}

if err := s.csmpUsageRepo.Create(ctx, &m); err != nil {
Expand Down
1 change: 1 addition & 0 deletions services/metadata/api/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type DexConnectorInfo struct {
}

type About struct {
InstallID string `json:"install_id"`
DexConnectors []DexConnectorInfo `json:"dex_connectors"`
AppVersion string `json:"app_version"`
WorkspaceCreationTime time.Time `json:"workspace_creation_time"`
Expand Down
15 changes: 15 additions & 0 deletions services/metadata/client/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type MetadataServiceClient interface {
SetQueryParameter(ctx *httpclient.Context, request api.SetQueryParameterRequest) error
VaultConfigured(ctx *httpclient.Context) (*string, error)
GetViewsCheckpoint(ctx *httpclient.Context) (*api.GetViewsCheckpointResponse, error)
GetAbout(ctx *httpclient.Context) (*api.About, error)
}

type metadataClient struct {
Expand Down Expand Up @@ -160,3 +161,17 @@ func (s *metadataClient) GetViewsCheckpoint(ctx *httpclient.Context) (*api.GetVi
}
return &resp, nil
}

func (s *metadataClient) GetAbout(ctx *httpclient.Context) (*api.About, error) {
url := fmt.Sprintf("%s/api/v3/about", s.baseURL)

var about api.About
if statusCode, err := httpclient.DoRequest(ctx.Ctx, http.MethodPost, url, ctx.ToHeaders(), nil, &about); err != nil {
if 400 <= statusCode && statusCode < 500 {
return nil, echo.NewHTTPError(statusCode, err.Error())
}
return nil, err
}

return &about, nil
}
Loading

0 comments on commit b4fdd0d

Please sign in to comment.