diff --git a/analytics/analytics.go b/analytics/analytics.go index 79c1036e1..3017b290a 100644 --- a/analytics/analytics.go +++ b/analytics/analytics.go @@ -53,14 +53,14 @@ type AnalyticsRecord struct { Month time.Month `json:"month" sql:"-"` Year int `json:"year" sql:"-"` Hour int `json:"hour" sql:"-"` - ResponseCode int `json:"response_code" gorm:"column:responsecode;index"` - APIKey string `json:"api_key" gorm:"column:apikey;index"` - TimeStamp time.Time `json:"timestamp" gorm:"column:timestamp;index"` + ResponseCode int `json:"response_code" gorm:"column:responsecode"` + APIKey string `json:"api_key" gorm:"column:apikey"` + TimeStamp time.Time `json:"timestamp" gorm:"column:timestamp"` APIVersion string `json:"api_version" gorm:"column:apiversion"` APIName string `json:"api_name" sql:"-"` - APIID string `json:"api_id" gorm:"column:apiid;index"` - OrgID string `json:"org_id" gorm:"column:orgid;index"` - OauthID string `json:"oauth_id" gorm:"column:oauthid;index"` + APIID string `json:"api_id" gorm:"column:apiid"` + OrgID string `json:"org_id" gorm:"column:orgid"` + OauthID string `json:"oauth_id" gorm:"column:oauthid"` RequestTime int64 `json:"request_time" gorm:"column:requesttime"` RawRequest string `json:"raw_request" gorm:"column:rawrequest"` RawResponse string `json:"raw_response" gorm:"column:rawresponse"` diff --git a/pumps/sql.go b/pumps/sql.go index db8025dcc..13956370d 100644 --- a/pumps/sql.go +++ b/pumps/sql.go @@ -5,6 +5,9 @@ import ( "encoding/hex" "errors" "fmt" + "sync" + + "github.com/sirupsen/logrus" "github.com/TykTechnologies/tyk-pump/analytics" "github.com/mitchellh/mapstructure" @@ -45,6 +48,9 @@ type SQLPump struct { db *gorm.DB dbType string dialect gorm.Dialector + + // this channel is used to signal that the background index creation has finished - this is used for testing + backgroundIndexCreated chan bool } // @PumpConf SQL @@ -107,6 +113,18 @@ var ( SQLPrefix = "SQL-pump" SQLDefaultENV = PUMPS_ENV_PREFIX + "_SQL" + PUMPS_ENV_META_PREFIX SQLDefaultQueryBatchSize = 1000 + + indexes = []struct { + baseName string + column string + }{ + {"idx_responsecode", "responsecode"}, + {"idx_apikey", "apikey"}, + {"idx_timestamp", "timestamp"}, + {"idx_apiid", "apiid"}, + {"idx_orgid", "orgid"}, + {"idx_oauthid", "oauthid"}, + } ) func (c *SQLPump) New() Pump { @@ -231,8 +249,8 @@ func (c *SQLPump) WriteData(ctx context.Context, data []interface{}) error { table := analytics.SQLTable + "_" + recDate c.db = c.db.Table(table) - if !c.db.Migrator().HasTable(table) { - c.db.AutoMigrate(&analytics.AnalyticsRecord{}) + if errTable := c.ensureTable(table); errTable != nil { + return errTable } } else { i = dataLen // write all records at once for non-sharded case, stop for loop after 1 iteration @@ -354,3 +372,96 @@ func (c *SQLPump) WriteUptimeData(data []interface{}) { c.log.Debug("Purged ", len(data), " records...") } + +func (c *SQLPump) buildIndexName(indexBaseName, tableName string) string { + return fmt.Sprintf("%s_%s", tableName, indexBaseName) +} + +func (c *SQLPump) createIndex(indexBaseName, tableName, column string) error { + indexName := c.buildIndexName(indexBaseName, tableName) + option := "" + if c.dbType == "postgres" { + option = "CONCURRENTLY" + } + + columnExist := c.db.Migrator().HasColumn(&analytics.AnalyticsRecord{}, column) + if !columnExist { + return errors.New("cannot create index for non existent column " + column) + } + + query := fmt.Sprintf("CREATE INDEX %s IF NOT EXISTS %s ON %s (%s)", option, indexName, tableName, column) + err := c.db.Exec(query).Error + if err != nil { + c.log.WithFields(logrus.Fields{ + "index": indexName, + "table": tableName, + }).WithError(err).Error("Error creating index") + return err + } + + c.log.Infof("Index %s created for table %s", indexName, tableName) + c.log.WithFields(logrus.Fields{ + "index": indexName, + "table": tableName, + }).Info("Index created") + return nil +} + +// ensureIndex check that all indexes for the analytics SQL table are in place +func (c *SQLPump) ensureIndex(tableName string, background bool) error { + if !c.db.Migrator().HasTable(tableName) { + return errors.New("cannot create indexes as table doesn't exist: " + tableName) + } + + // waitgroup to facilitate testing and track when all indexes are created + var wg sync.WaitGroup + if background { + wg.Add(len(indexes)) + } + + for _, idx := range indexes { + indexName := tableName + idx.baseName + + if c.db.Migrator().HasIndex(tableName, indexName) { + c.log.WithFields(logrus.Fields{ + "index": indexName, + "table": tableName, + }).Info("Index already exists") + continue + } + + if background { + go func(baseName, cols string) { + defer wg.Done() + if err := c.createIndex(baseName, tableName, cols); err != nil { + c.log.Error(err) + } + }(idx.baseName, idx.column) + } else { + if err := c.createIndex(idx.baseName, tableName, idx.column); err != nil { + return err + } + } + } + + if background { + wg.Wait() + c.backgroundIndexCreated <- true + } + return nil +} + +// ensureTable creates the table if it doesn't exist +func (c *SQLPump) ensureTable(tableName string) error { + if !c.db.Migrator().HasTable(tableName) { + c.db = c.db.Table(tableName) + if err := c.db.Migrator().CreateTable(&analytics.AnalyticsRecord{}); err != nil { + c.log.Error("error creating table", err) + return err + } + if err := c.ensureIndex(tableName, false); err != nil { + return err + } + } + return nil +} diff --git a/pumps/sql_aggregate.go b/pumps/sql_aggregate.go index f1e33bb7c..72cab64b6 100644 --- a/pumps/sql_aggregate.go +++ b/pumps/sql_aggregate.go @@ -160,15 +160,15 @@ func (c *SQLAggregatePump) ensureIndex(tableName string, background bool) error c.log.Info("omit_index_creation set to true, omitting index creation..") return nil } - - if !c.db.Migrator().HasIndex(tableName, newAggregatedIndexName) { + indexName := fmt.Sprintf("%s_%s", tableName, newAggregatedIndexName) + if !c.db.Migrator().HasIndex(tableName, indexName) { createIndexFn := func(c *SQLAggregatePump) error { option := "" if c.dbType == "postgres" { option = "CONCURRENTLY" } - err := c.db.Table(tableName).Exec(fmt.Sprintf("CREATE INDEX %s IF NOT EXISTS %s ON %s (dimension, timestamp, org_id, dimension_value)", option, newAggregatedIndexName, tableName)).Error + err := c.db.Table(tableName).Exec(fmt.Sprintf("CREATE INDEX %s IF NOT EXISTS %s ON %s (dimension, timestamp, org_id, dimension_value)", option, indexName, tableName)).Error if err != nil { c.log.Errorf("error creating index for table %s : %s", tableName, err.Error()) return err @@ -178,7 +178,7 @@ func (c *SQLAggregatePump) ensureIndex(tableName string, background bool) error c.backgroundIndexCreated <- true } - c.log.Info("Index ", newAggregatedIndexName, " for table ", tableName, " created successfully") + c.log.Info("Index ", indexName, " for table ", tableName, " created successfully") return nil } @@ -198,7 +198,6 @@ func (c *SQLAggregatePump) ensureIndex(tableName string, background bool) error c.log.Info("Creating index for table ", tableName, "...") return createIndexFn(c) } - c.log.Info(newAggregatedIndexName, " already exists.") return nil } diff --git a/pumps/sql_aggregate_test.go b/pumps/sql_aggregate_test.go index 48dc1b2aa..e99c34e4b 100644 --- a/pumps/sql_aggregate_test.go +++ b/pumps/sql_aggregate_test.go @@ -3,6 +3,7 @@ package pumps import ( "context" "errors" + "fmt" "net/http" "testing" "time" @@ -31,7 +32,8 @@ func TestSQLAggregateInit(t *testing.T) { assert.Equal(t, "sqlite", pmp.db.Dialector.Name()) assert.Equal(t, true, pmp.db.Migrator().HasTable(analytics.AggregateSQLTable)) - assert.Equal(t, true, pmp.db.Migrator().HasIndex(analytics.AggregateSQLTable, newAggregatedIndexName)) + indexName := fmt.Sprintf("%s_%s", analytics.AggregateSQLTable, newAggregatedIndexName) + assert.Equal(t, true, pmp.db.Migrator().HasIndex(analytics.AggregateSQLTable, indexName)) // Checking with invalid type cfg["type"] = "invalid" @@ -337,7 +339,7 @@ func TestDecodeRequestAndDecodeResponseSQLAggregate(t *testing.T) { assert.False(t, newPump.GetDecodedResponse()) } -func TestEnsureIndex(t *testing.T) { +func TestEnsureIndexSQLAggregate(t *testing.T) { //nolint:govet tcs := []struct { testName string @@ -419,6 +421,44 @@ func TestEnsureIndex(t *testing.T) { expectedErr: nil, shouldHaveIndex: true, }, + { + testName: "index created correctly, background on sharded pump", + pmpSetupFn: func(tableName string) *SQLAggregatePump { + pmp := &SQLAggregatePump{} + cfg := &SQLAggregatePumpConf{} + cfg.Type = "sqlite" + cfg.TableSharding = true + cfg.ConnectionString = "" + pmp.SQLConf = cfg + + pmp.log = log.WithField("prefix", "sql-aggregate-pump") + dialect, errDialect := Dialect(&pmp.SQLConf.SQLConf) + if errDialect != nil { + return nil + } + db, err := gorm.Open(dialect, &gorm.Config{ + AutoEmbedd: true, + UseJSONTags: true, + Logger: logger.Default.LogMode(logger.Info), + }) + if err != nil { + return nil + } + pmp.db = db + + pmp.backgroundIndexCreated = make(chan bool, 1) + + if err := pmp.ensureTable(tableName); err != nil { + return nil + } + + return pmp + }, + givenTableName: "shard1", + givenRunInBackground: true, + expectedErr: nil, + shouldHaveIndex: true, + }, { testName: "index created on non existing table, not background", pmpSetupFn: func(tableName string) *SQLAggregatePump { @@ -499,7 +539,8 @@ func TestEnsureIndex(t *testing.T) { // wait for the background index creation to finish <-pmp.backgroundIndexCreated } else { - hasIndex := pmp.db.Table(tc.givenTableName).Migrator().HasIndex(tc.givenTableName, newAggregatedIndexName) + indexName := fmt.Sprintf("%s_%s", tc.givenTableName, newAggregatedIndexName) + hasIndex := pmp.db.Table(tc.givenTableName).Migrator().HasIndex(tc.givenTableName, indexName) assert.Equal(t, tc.shouldHaveIndex, hasIndex) } } else { diff --git a/pumps/sql_test.go b/pumps/sql_test.go index 33e07e80d..7108dba69 100644 --- a/pumps/sql_test.go +++ b/pumps/sql_test.go @@ -383,3 +383,113 @@ func TestDecodeRequestAndDecodeResponseSQL(t *testing.T) { assert.False(t, newPump.GetDecodedRequest()) assert.False(t, newPump.GetDecodedResponse()) } + +func setupSQLPump(t *testing.T, tableName string, useBackground bool) *SQLPump { + t.Helper() + pmp := &SQLPump{} + pmp.log = log.WithField("prefix", "sql-pump") + cfg := map[string]interface{}{ + "type": "sqlite", + "connection_string": "", + } + + assert.NoError(t, pmp.Init(cfg)) + if useBackground { + pmp.backgroundIndexCreated = make(chan bool, 1) + } + assert.NoError(t, pmp.ensureTable(tableName)) + + return pmp +} + +func TestEnsureIndexSQL(t *testing.T) { + //nolint:govet + tcs := []struct { + testName string + givenTableName string + expectedErr error + pmpSetupFn func(t *testing.T, tableName string) *SQLPump + givenRunInBackground bool + shouldHaveIndex bool + }{ + { + testName: "index created correctly, not background", + pmpSetupFn: func(t *testing.T, tableName string) *SQLPump { + return setupSQLPump(t, tableName, false) + }, + givenTableName: "analytics_no_background", + givenRunInBackground: false, + expectedErr: nil, + shouldHaveIndex: true, + }, + { + testName: "index created correctly, background", + pmpSetupFn: func(t *testing.T, tableName string) *SQLPump { + return setupSQLPump(t, tableName, true) + }, + givenTableName: "analytics_background", + givenRunInBackground: true, + expectedErr: nil, + shouldHaveIndex: true, + }, + } + + for _, tc := range tcs { + t.Run(tc.testName, func(t *testing.T) { + pmp := tc.pmpSetupFn(t, tc.givenTableName) + defer func() { + err := pmp.db.Migrator().DropTable(tc.givenTableName) + if err != nil { + t.Errorf("Failed to drop table: %v", err) + } + }() + assert.NotNil(t, pmp) + + actualErr := pmp.ensureIndex(tc.givenTableName, tc.givenRunInBackground) + isErrExpected := tc.expectedErr != nil + didErr := actualErr != nil + assert.Equal(t, isErrExpected, didErr) + + if isErrExpected { + assert.Equal(t, tc.expectedErr.Error(), actualErr.Error()) + } + + if actualErr == nil { + if tc.givenRunInBackground { + // wait for the background index creation to finish + <-pmp.backgroundIndexCreated + } + + indexToUse := indexes[0] + indexName := pmp.buildIndexName(indexToUse.baseName, tc.givenTableName) + hasIndex := pmp.db.Table(tc.givenTableName).Migrator().HasIndex(tc.givenTableName, indexName) + assert.Equal(t, tc.shouldHaveIndex, hasIndex) + } + }) + } +} + +func TestBuildIndexName(t *testing.T) { + tests := []struct { + indexBaseName string + tableName string + expected string + }{ + {"idx_responsecode", "users", "users_idx_responsecode"}, + {"idx_apikey", "transactions", "transactions_idx_apikey"}, + {"idx_timestamp", "logs", "logs_idx_timestamp"}, + {"idx_apiid", "api_calls", "api_calls_idx_apiid"}, + {"idx_orgid", "organizations", "organizations_idx_orgid"}, + } + + c := &SQLPump{} // Create an instance of SQLPump. + + for _, tt := range tests { + t.Run(tt.indexBaseName+"_"+tt.tableName, func(t *testing.T) { + result := c.buildIndexName(tt.indexBaseName, tt.tableName) + if result != tt.expected { + t.Errorf("buildIndexName(%s, %s) = %s; want %s", tt.indexBaseName, tt.tableName, result, tt.expected) + } + }) + } +}