diff --git a/analytics/aggregate.go b/analytics/aggregate.go index bea95bd56..5577dda3b 100644 --- a/analytics/aggregate.go +++ b/analytics/aggregate.go @@ -147,6 +147,20 @@ type SQLAnalyticsRecordAggregate struct { Code `json:"code" gorm:"embedded"` } +type GraphSQLAnalyticsRecordAggregate struct { + ID string `gorm:"primaryKey"` + + OrgID string `json:"org_id"` + Dimension string `json:"dimension"` + DimensionValue string `json:"dimension_value"` + APIID string `json:"api_id"` + + Counter `json:"counter" gorm:"embedded"` + Code `json:"code" gorm:"embedded"` + + TimeStamp int64 `json:"timestamp"` +} + type Code struct { Code1x int `json:"1x" gorm:"1x"` Code200 int `json:"200" gorm:"200"` @@ -611,6 +625,7 @@ func replaceUnsupportedChars(path string) string { return result } +// AggregateGraphData collects the graph records into a map of GraphRecordAggregate to apiID func AggregateGraphData(data []interface{}, dbIdentifier string, aggregationTime int) map[string]GraphRecordAggregate { aggregateMap := make(map[string]GraphRecordAggregate) @@ -619,14 +634,13 @@ func AggregateGraphData(data []interface{}, dbIdentifier string, aggregationTime if !ok { continue } - if !record.IsGraphRecord() { continue } graphRec := record.ToGraphRecord() - aggregate, found := aggregateMap[record.OrgID] + aggregate, found := aggregateMap[record.APIID] if !found { aggregate = NewGraphRecordAggregate() @@ -676,7 +690,7 @@ func AggregateGraphData(data []interface{}, dbIdentifier string, aggregationTime aggregate.RootFields[field].Identifier = field aggregate.RootFields[field].HumanIdentifier = field } - aggregateMap[record.OrgID] = aggregate + aggregateMap[record.APIID] = aggregate } return aggregateMap } diff --git a/analytics/aggregate_test.go b/analytics/aggregate_test.go index cd99c8d1f..77ec7624e 100644 --- a/analytics/aggregate_test.go +++ b/analytics/aggregate_test.go @@ -90,6 +90,8 @@ func TestTrimTag(t *testing.T) { } func TestAggregateGraphData(t *testing.T) { + query := `{"query":"query{\n characters(filter: {\n \n }){\n info{\n count\n }\n }\n}"}` + rawResponse := `{"data":{"characters":{"info":{"count":758}}}}` sampleRecord := AnalyticsRecord{ TimeStamp: time.Date(2022, 1, 1, 0, 0, 0, 0, time.UTC), Method: "POST", @@ -109,6 +111,8 @@ func TestAggregateGraphData(t *testing.T) { APIKey: "test-key", TrackPath: true, OauthID: "test-id", + RawRequest: base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf(requestTemplate, len(query), query))), + RawResponse: base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf(responseTemplate, len(rawResponse), rawResponse))), } compareFields := func(r *require.Assertions, expected, actual map[string]*Counter) { @@ -133,16 +137,12 @@ func TestAggregateGraphData(t *testing.T) { records := make([]interface{}, 3) for i := range records { record := sampleRecord - query := `{"query":"query{\n characters(filter: {\n \n }){\n info{\n count\n }\n }\n}"}` - response := `{"data":{"characters":{"info":{"count":758}}}}` - record.RawRequest = base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf(requestTemplate, len(query), query))) - record.RawResponse = base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf(responseTemplate, len(response), response))) records[i] = record } return records }, expectedAggregate: map[string]GraphRecordAggregate{ - "test-org": { + "test-api": { Types: map[string]*Counter{ "Characters": {Hits: 3, ErrorTotal: 0, Success: 3}, "Info": {Hits: 3, ErrorTotal: 0, Success: 3}, @@ -163,10 +163,6 @@ func TestAggregateGraphData(t *testing.T) { records := make([]interface{}, 3) for i := range records { record := sampleRecord - query := `{"query":"query{\n characters(filter: {\n \n }){\n info{\n count\n }\n }\n}"}` - response := `{"data":{"characters":{"info":{"count":758}}}}` - record.RawRequest = base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf(requestTemplate, len(query), query))) - record.RawResponse = base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf(responseTemplate, len(response), response))) if i == 1 { record.Tags = []string{} } @@ -175,7 +171,7 @@ func TestAggregateGraphData(t *testing.T) { return records }, expectedAggregate: map[string]GraphRecordAggregate{ - "test-org": { + "test-api": { Types: map[string]*Counter{ "Characters": {Hits: 2, ErrorTotal: 0, Success: 2}, "Info": {Hits: 2, ErrorTotal: 0, Success: 2}, @@ -196,19 +192,16 @@ func TestAggregateGraphData(t *testing.T) { records := make([]interface{}, 3) for i := range records { record := sampleRecord - query := `{"query":"query{\n characters(filter: {\n \n }){\n info{\n count\n }\n }\n}"}` - response := `{"data":{"characters":{"info":{"count":758}}}}` if i == 1 { - response = graphErrorResponse + response := graphErrorResponse + record.RawResponse = base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf(responseTemplate, len(response), response))) } - record.RawRequest = base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf(requestTemplate, len(query), query))) - record.RawResponse = base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf(responseTemplate, len(response), response))) records[i] = record } return records }, expectedAggregate: map[string]GraphRecordAggregate{ - "test-org": { + "test-api": { Types: map[string]*Counter{ "Characters": {Hits: 3, ErrorTotal: 1, Success: 2}, "Info": {Hits: 3, ErrorTotal: 1, Success: 2}, @@ -229,10 +222,6 @@ func TestAggregateGraphData(t *testing.T) { records := make([]interface{}, 5) for i := range records { record := sampleRecord - query := `{"query":"query{\n characters(filter: {\n \n }){\n info{\n count\n }\n }\n}"}` - response := `{"data":{"characters":{"info":{"count":758}}}}` - record.RawRequest = base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf(requestTemplate, len(query), query))) - record.RawResponse = base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf(responseTemplate, len(response), response))) if i == 2 || i == 4 { record.ResponseCode = 500 } @@ -241,7 +230,7 @@ func TestAggregateGraphData(t *testing.T) { return records }, expectedAggregate: map[string]GraphRecordAggregate{ - "test-org": { + "test-api": { Types: map[string]*Counter{ "Characters": {Hits: 5, ErrorTotal: 2, Success: 3}, "Info": {Hits: 5, ErrorTotal: 2, Success: 3}, @@ -325,7 +314,7 @@ func TestAggregateGraphData_Dimension(t *testing.T) { r := require.New(t) aggregated := AggregateGraphData(records, "", 1) r.Len(aggregated, 1) - aggre := aggregated["test-org"] + aggre := aggregated["test-api"] dimensions := aggre.Dimensions() fmt.Println(dimensions) for d, values := range responsesCheck { @@ -337,7 +326,7 @@ func TestAggregateGraphData_Dimension(t *testing.T) { } } if !found { - t.Errorf("item missing from dimensions: NameL %s, Value: %s, Hits:3", d, v) + t.Errorf("item missing from dimensions: Name: %s, Value: %s, Hits:3", d, v) } } } diff --git a/pumps/graph_sql_aggregate.go b/pumps/graph_sql_aggregate.go index cd9ef78f7..5df39ad76 100644 --- a/pumps/graph_sql_aggregate.go +++ b/pumps/graph_sql_aggregate.go @@ -68,7 +68,7 @@ func (s *GraphSQLAggregatePump) Init(conf interface{}) error { } s.db = db if !s.SQLConf.TableSharding { - if err := s.db.Table(analytics.AggregateGraphSQLTable).AutoMigrate(&analytics.SQLAnalyticsRecordAggregate{}); err != nil { + if err := s.db.Table(analytics.AggregateGraphSQLTable).AutoMigrate(&analytics.GraphSQLAnalyticsRecordAggregate{}); err != nil { s.log.WithError(err).Warn("error migrating table") } } @@ -115,7 +115,7 @@ func (s *GraphSQLAggregatePump) WriteData(ctx context.Context, data []interface{ table = analytics.AggregateGraphSQLTable + "_" + recDate s.db = s.db.Table(table) if !s.db.Migrator().HasTable(table) { - if err := s.db.AutoMigrate(&analytics.SQLAnalyticsRecordAggregate{}); err != nil { + if err := s.db.AutoMigrate(&analytics.GraphSQLAnalyticsRecordAggregate{}); err != nil { s.log.WithError(err).Warn("error running auto migration") } } @@ -132,11 +132,11 @@ func (s *GraphSQLAggregatePump) WriteData(ctx context.Context, data []interface{ aggregationTime = 60 } - analyticsPerOrg := analytics.AggregateGraphData(data[startIndex:endIndex], "", aggregationTime) + analyticsPerAPI := analytics.AggregateGraphData(data[startIndex:endIndex], "", aggregationTime) - for orgID := range analyticsPerOrg { - ag := analyticsPerOrg[orgID] - err := s.DoAggregatedWriting(ctx, table, orgID, &ag) + for apiID := range analyticsPerAPI { + ag := analyticsPerAPI[apiID] + err := s.DoAggregatedWriting(ctx, table, ag.OrgID, apiID, &ag) if err != nil { s.log.WithError(err).Error("error writing record") return err @@ -150,14 +150,15 @@ func (s *GraphSQLAggregatePump) WriteData(ctx context.Context, data []interface{ return nil } -func (s *GraphSQLAggregatePump) DoAggregatedWriting(ctx context.Context, table, orgID string, ag *analytics.GraphRecordAggregate) error { - recs := []analytics.SQLAnalyticsRecordAggregate{} +func (s *GraphSQLAggregatePump) DoAggregatedWriting(ctx context.Context, table, orgID, apiID string, ag *analytics.GraphRecordAggregate) error { + var recs []analytics.GraphSQLAnalyticsRecordAggregate dimensions := ag.Dimensions() for _, d := range dimensions { - rec := analytics.SQLAnalyticsRecordAggregate{ - ID: hex.EncodeToString([]byte(fmt.Sprintf("%v", ag.TimeStamp.Unix()) + orgID + d.Name + d.Value)), + rec := analytics.GraphSQLAnalyticsRecordAggregate{ + ID: hex.EncodeToString([]byte(fmt.Sprintf("%v", ag.TimeStamp.Unix()) + apiID + d.Name + d.Value)), OrgID: orgID, + APIID: apiID, TimeStamp: ag.TimeStamp.Unix(), Counter: *d.Counter, Dimension: d.Name,