Skip to content

Commit

Permalink
Merge pull request #1967 from opengovern/fix-web-ui
Browse files Browse the repository at this point in the history
fix: add more logs for compliance summarizer
  • Loading branch information
artaasadi authored Nov 13, 2024
2 parents 3ad1834 + 619cac0 commit a71d28f
Show file tree
Hide file tree
Showing 9 changed files with 94 additions and 222 deletions.
54 changes: 15 additions & 39 deletions pkg/compliance/summarizer/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,13 @@ import (
"encoding/json"
"fmt"
"github.com/opengovern/og-util/pkg/api"
es2 "github.com/opengovern/og-util/pkg/es"
"github.com/opengovern/og-util/pkg/httpclient"
"github.com/opengovern/og-util/pkg/opengovernance-es-sdk"
"strings"

es2 "github.com/opengovern/og-util/pkg/es"
"github.com/opengovern/opengovernance/pkg/compliance/es"
types2 "github.com/opengovern/opengovernance/pkg/compliance/summarizer/types"
es3 "github.com/opengovern/opengovernance/pkg/describe/es"
inventoryApi "github.com/opengovern/opengovernance/pkg/inventory/api"
"github.com/opengovern/opengovernance/pkg/types"
integrationApi "github.com/opengovern/opengovernance/services/integration/api/models"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -65,30 +61,6 @@ func (w *Worker) RunJob(ctx context.Context, j types2.Job) error {
},
ResourcesFindings: make(map[string]types.ResourceFinding),
ResourcesFindingsIsDone: make(map[string]bool),

ResourceCollectionCache: map[string]inventoryApi.ResourceCollection{},
IntegrationCache: map[string]integrationApi.Integration{},
}

resourceCollections, err := w.inventoryClient.ListResourceCollections(&httpclient.Context{Ctx: ctx, UserRole: api.AdminRole})
if err != nil {
w.logger.Error("failed to list resource collections", zap.Error(err))
return err
}
for _, rc := range resourceCollections {
rc := rc
jd.ResourceCollectionCache[rc.ID] = rc
}

integrations, err := w.integrationClient.ListIntegrations(&httpclient.Context{Ctx: ctx, UserRole: api.AdminRole}, nil)
if err != nil {
w.logger.Error("failed to list integrations", zap.Error(err))
return err
}
for _, c := range integrations.Integrations {
c := c
// use provider id instead of opengovernance id because we need that to check resource collections
jd.IntegrationCache[strings.ToLower(c.ProviderID)] = c
}

for page := 1; paginator.HasNext(); page++ {
Expand Down Expand Up @@ -116,24 +88,28 @@ func (w *Worker) RunJob(ctx context.Context, j types2.Job) error {
for _, f := range page {
var resource *es2.LookupResource
potentialResources := lookupResourcesMap[f.PlatformResourceID]
for _, r := range potentialResources {
r := r
w.logger.Info("potential resources", zap.Any("potentialResources", potentialResources),
zap.String("f.ResourceType", f.ResourceType), zap.String("r.ResourceType", r.ResourceType))
if strings.ToLower(r.ResourceType) == strings.ToLower(f.ResourceType) {
resource = &r
break
}
}

//for _, r := range potentialResources {
// r := r
// w.logger.Info("potential resources", zap.Any("potentialResources", potentialResources),
// zap.String("f.ResourceType", f.ResourceType), zap.String("r.ResourceType", r.ResourceType))
// if strings.ToLower(r.ResourceType) == strings.ToLower(f.ResourceType) {
// resource = &r
// break
// }
//}
resource = &potentialResources[0]
w.logger.Info("Before adding resource finding", zap.String("platform_resource_id", f.PlatformResourceID),
zap.Any("resource", resource))
jd.AddComplianceResult(w.logger, j, f, resource)
}

var docs []es2.Doc
for resourceIdType, isReady := range jd.ResourcesFindingsIsDone {
if !isReady {
w.logger.Info("resource NOT DONE", zap.String("platform_resource_id", resourceIdType))
continue
}
w.logger.Info("resource DONE", zap.String("platform_resource_id", resourceIdType))
resourceFinding := jd.SummarizeResourceFinding(w.logger, jd.ResourcesFindings[resourceIdType])
keys, idx := resourceFinding.KeysAndIndex()
resourceFinding.EsID = es2.HashOf(keys...)
Expand Down
120 changes: 13 additions & 107 deletions pkg/compliance/summarizer/types/job_docs.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,8 @@
package types

import (
"fmt"
"strings"

"github.com/opengovern/og-util/pkg/es"
inventoryApi "github.com/opengovern/opengovernance/pkg/inventory/api"
"github.com/opengovern/opengovernance/pkg/types"
integrationApi "github.com/opengovern/opengovernance/services/integration/api/models"
"go.uber.org/zap"
)

Expand All @@ -18,9 +13,6 @@ type JobDocs struct {
// these are used to track if the resource finding is done so we can remove it from the map and send it to queue to save memory
ResourcesFindingsIsDone map[string]bool `json:"-"`
LastResourceIdType string `json:"-"`
// caches, these are not marshalled and only used
ResourceCollectionCache map[string]inventoryApi.ResourceCollection `json:"-"`
IntegrationCache map[string]integrationApi.Integration `json:"-"`
}

func (jd *JobDocs) AddComplianceResult(logger *zap.Logger, job Job,
Expand Down Expand Up @@ -53,12 +45,15 @@ func (jd *JobDocs) AddComplianceResult(logger *zap.Logger, job Job,
}

if jd.LastResourceIdType == "" {
jd.LastResourceIdType = fmt.Sprintf("%s-%s", resource.ResourceType, resource.PlatformID)
} else if jd.LastResourceIdType != fmt.Sprintf("%s-%s", resource.ResourceType, resource.PlatformID) {
jd.LastResourceIdType = resource.PlatformID
} else if jd.LastResourceIdType != resource.PlatformID {
jd.ResourcesFindingsIsDone[jd.LastResourceIdType] = true
jd.LastResourceIdType = fmt.Sprintf("%s-%s", resource.ResourceType, resource.PlatformID)
jd.LastResourceIdType = resource.PlatformID
}
resourceFinding, ok := jd.ResourcesFindings[fmt.Sprintf("%s-%s", resource.ResourceType, resource.PlatformID)]

logger.Info("creating the resource finding", zap.String("platform_resource_id", resource.PlatformID),
zap.Any("resource", resource))
resourceFinding, ok := jd.ResourcesFindings[resource.PlatformID]
if !ok {
resourceFinding = types.ResourceFinding{
PlatformResourceID: resource.PlatformID,
Expand All @@ -71,7 +66,7 @@ func (jd *JobDocs) AddComplianceResult(logger *zap.Logger, job Job,
JobId: job.ID,
EvaluatedAt: job.CreatedAt.UnixMilli(),
}
jd.ResourcesFindingsIsDone[fmt.Sprintf("%s-%s", resource.ResourceType, resource.PlatformID)] = false
jd.ResourcesFindingsIsDone[resource.PlatformID] = false
} else {
resourceFinding.JobId = job.ID
resourceFinding.EvaluatedAt = job.CreatedAt.UnixMilli()
Expand All @@ -84,101 +79,12 @@ func (jd *JobDocs) AddComplianceResult(logger *zap.Logger, job Job,
}
resourceFinding.ComplianceResults = append(resourceFinding.ComplianceResults, complianceResult)

for rcId, rc := range jd.ResourceCollectionCache {
// check if resource is in this resource collection
isIn := false
for _, filter := range rc.Filters {
found := false

for _, integrationType := range filter.Connectors {
if strings.ToLower(integrationType) == strings.ToLower(complianceResult.IntegrationType.String()) {
found = true
break
}
}
if !found && len(filter.Connectors) > 0 {
continue
}

found = false
for _, resourceType := range filter.ResourceTypes {
if strings.ToLower(resourceType) == strings.ToLower(complianceResult.ResourceType) {
found = true
break
}
}
if !found && len(filter.ResourceTypes) > 0 {
continue
}

found = false
for _, accountId := range filter.AccountIDs {
if integration, ok := jd.IntegrationCache[strings.ToLower(accountId)]; ok {
if strings.ToLower(integration.IntegrationID) == strings.ToLower(complianceResult.IntegrationID) {
found = true
break
}
}
}
if !found && len(filter.AccountIDs) > 0 {
continue
}

found = false
for k, v := range filter.Tags {
k := strings.ToLower(k)
v := strings.ToLower(v)

isMatch := false
for _, resourceTag := range resource.Tags {
if strings.ToLower(resourceTag.Key) == k {
if strings.ToLower(resourceTag.Value) == v {
isMatch = true
break
}
}
}
if !isMatch {
found = false
break
}
found = true
}

if !found && len(filter.Tags) > 0 {
continue
}

isIn = true
break
}
if !isIn {
continue
}

resourceFinding.ResourceCollectionMap[rcId] = true
if job.BenchmarkID == complianceResult.BenchmarkID {
benchmarkSummaryRc, ok := jd.BenchmarkSummary.ResourceCollections[rcId]
if !ok {
benchmarkSummaryRc = BenchmarkSummaryResult{
BenchmarkResult: ResultGroup{
Result: Result{
QueryResult: map[types.ComplianceStatus]int{},
SeverityResult: map[types.ComplianceResultSeverity]int{},
SecurityScore: 0,
},
ResourceTypes: map[string]Result{},
Controls: map[string]ControlResult{},
},
Integrations: map[string]ResultGroup{},
}
}
benchmarkSummaryRc.addComplianceResult(complianceResult)
jd.BenchmarkSummary.ResourceCollections[rcId] = benchmarkSummaryRc
}
if _, ok := jd.ResourcesFindingsIsDone[resource.PlatformID]; !ok {
jd.ResourcesFindingsIsDone[resource.PlatformID] = false
}

jd.ResourcesFindings[fmt.Sprintf("%s-%s", resource.ResourceType, resource.PlatformID)] = resourceFinding
logger.Info("adding the resource finding", zap.String("platform_resource_id", resource.PlatformID),
zap.Any("resource", resource))
jd.ResourcesFindings[resource.PlatformID] = resourceFinding
}

func (jd *JobDocs) SummarizeResourceFinding(logger *zap.Logger, resourceFinding types.ResourceFinding) types.ResourceFinding {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,19 @@ func GetBenchmarkSummary(ctx context.Context, d *plugin.QueryData, _ *plugin.Hyd
if d.Quals["time_at"] != nil {
timeAt = utils.GetPointer(d.EqualsQuals["time_at"].GetTimestampValue().AsTime())
}
var connectionIds []string
if d.EqualsQuals["connection_id"] != nil {
q := d.EqualsQuals["connection_id"]
var integrationIds []string
if d.EqualsQuals["integration_id"] != nil {
q := d.EqualsQuals["integration_id"]
if q.GetListValue() != nil {
for _, v := range q.GetListValue().Values {
connectionIds = append(connectionIds, v.GetStringValue())
integrationIds = append(integrationIds, v.GetStringValue())
}
} else {
connectionIds = []string{d.EqualsQuals["connection_id"].GetStringValue()}
integrationIds = []string{d.EqualsQuals["integration_id"].GetStringValue()}
}
}

res, err := complianceClient.GetBenchmarkSummary(&httpclient.Context{UserRole: api.AdminRole}, benchmarkId, connectionIds, timeAt)
res, err := complianceClient.GetBenchmarkSummary(&httpclient.Context{UserRole: api.AdminRole}, benchmarkId, integrationIds, timeAt)
if err != nil {
plugin.Logger(ctx).Error("GetBenchmarkSummary compliance client call failed", "error", err)
return nil, err
Expand Down Expand Up @@ -73,19 +73,19 @@ func ListBenchmarkControls(ctx context.Context, d *plugin.QueryData, _ *plugin.H
if d.Quals["time_at"] != nil {
timeAt = utils.GetPointer(d.EqualsQuals["time_at"].GetTimestampValue().AsTime())
}
var connectionIds []string
if d.EqualsQuals["connection_id"] != nil {
q := d.EqualsQuals["connection_id"]
var integrationIds []string
if d.EqualsQuals["integration_id"] != nil {
q := d.EqualsQuals["integration_id"]
if q.GetListValue() != nil {
for _, v := range q.GetListValue().Values {
connectionIds = append(connectionIds, v.GetStringValue())
integrationIds = append(integrationIds, v.GetStringValue())
}
} else {
connectionIds = []string{d.EqualsQuals["connection_id"].GetStringValue()}
integrationIds = []string{d.EqualsQuals["integration_id"].GetStringValue()}
}
}

apiRes, err := complianceClient.GetBenchmarkControls(&httpclient.Context{UserRole: api.AdminRole}, benchmarkId, connectionIds, timeAt)
apiRes, err := complianceClient.GetBenchmarkControls(&httpclient.Context{UserRole: api.AdminRole}, benchmarkId, integrationIds, timeAt)
if err != nil {
plugin.Logger(ctx).Error("GetBenchmarkSummary compliance client call failed", "error", err)
return nil, err
Expand Down
48 changes: 24 additions & 24 deletions pkg/steampipe-plugin-opengovernance/opengovernance-client/cost.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,18 +133,18 @@ func ListCostSummary(ctx context.Context, d *plugin.QueryData, _ *plugin.Hydrate
plugin.Logger(ctx).Warn("ListCostSummary: Page", v)
for _, connRes := range v.Integrations {
row := OpenGovernanceCostTableRow{
ConnectionID: connRes.IntegrationID,
ConnectionName: connRes.IntegrationName,
Connector: connRes.IntegrationType.String(),
Date: v.Date,
DateEpoch: v.DateEpoch,
Month: v.Month,
Year: v.Year,
MetricID: v.MetricID,
MetricName: v.MetricName,
CostValue: connRes.CostValue,
PeriodStart: time.UnixMilli(v.PeriodStart),
PeriodEnd: time.UnixMilli(v.PeriodEnd),
IntegrationID: connRes.IntegrationID,
IntegrationName: connRes.IntegrationName,
IntegrationType: connRes.IntegrationType.String(),
Date: v.Date,
DateEpoch: v.DateEpoch,
Month: v.Month,
Year: v.Year,
MetricID: v.MetricID,
MetricName: v.MetricName,
CostValue: connRes.CostValue,
PeriodStart: time.UnixMilli(v.PeriodStart),
PeriodEnd: time.UnixMilli(v.PeriodEnd),
}
d.StreamListItem(ctx, row)
}
Expand All @@ -156,16 +156,16 @@ func ListCostSummary(ctx context.Context, d *plugin.QueryData, _ *plugin.Hydrate
}

type OpenGovernanceCostTableRow struct {
ConnectionID string `json:"connection_id"`
ConnectionName string `json:"connection_name"`
Connector string `json:"connector"`
Date string `json:"date"`
DateEpoch int64 `json:"date_epoch"`
Month string `json:"month"`
Year string `json:"year"`
MetricID string `json:"metric_id"`
MetricName string `json:"metric_name"`
CostValue float64 `json:"cost_value"`
PeriodStart time.Time `json:"period_start"`
PeriodEnd time.Time `json:"period_end"`
IntegrationID string `json:"integration_id"`
IntegrationName string `json:"integration_name"`
IntegrationType string `json:"integration_type"`
Date string `json:"date"`
DateEpoch int64 `json:"date_epoch"`
Month string `json:"month"`
Year string `json:"year"`
MetricID string `json:"metric_id"`
MetricName string `json:"metric_name"`
CostValue float64 `json:"cost_value"`
PeriodStart time.Time `json:"period_start"`
PeriodEnd time.Time `json:"period_end"`
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ var listFindingFilters = map[string]string{
"id": "ID",
"benchmark_id": "benchmarkID",
"policy_id": "policyID",
"connection_id": "connectionID",
"integration_id": "integrationID",
"described_at": "describedAt",
"evaluated_at": "evaluatedAt",
"state_active": "stateActive",
Expand Down
Loading

0 comments on commit a71d28f

Please sign in to comment.