Skip to content

Commit

Permalink
TT-13421 create indexes on sharded sql pumps (#861)
Browse files Browse the repository at this point in the history
* started to add indexes to shards

* create indexes when SQL pump is sharded

* added test for ensure index in SQL pump

* update sql aggregate to handle multiple indexes on sharded analytics

* update SQL aggregate test

* isolate index build logic

* some refactor

* fixing test for indexes in SQL pump

* gofmt sql_test.go

* check drop table err

* gofumpt

* ensuring table for SQL

* gofumpt

* refactoring ensureIndex in sql pump

* improve logging

* refactor logic while looping over indexes

* gofumpt

* only use the waitgroup for background

---------

Co-authored-by: sredny buitrago <[email protected]>
  • Loading branch information
sredxny and sredny buitrago authored Dec 5, 2024
1 parent fbcb614 commit 544ccb3
Show file tree
Hide file tree
Showing 5 changed files with 277 additions and 16 deletions.
12 changes: 6 additions & 6 deletions analytics/analytics.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,14 @@ type AnalyticsRecord struct {
Month time.Month `json:"month" sql:"-"`
Year int `json:"year" sql:"-"`
Hour int `json:"hour" sql:"-"`
ResponseCode int `json:"response_code" gorm:"column:responsecode;index"`
APIKey string `json:"api_key" gorm:"column:apikey;index"`
TimeStamp time.Time `json:"timestamp" gorm:"column:timestamp;index"`
ResponseCode int `json:"response_code" gorm:"column:responsecode"`
APIKey string `json:"api_key" gorm:"column:apikey"`
TimeStamp time.Time `json:"timestamp" gorm:"column:timestamp"`
APIVersion string `json:"api_version" gorm:"column:apiversion"`
APIName string `json:"api_name" sql:"-"`
APIID string `json:"api_id" gorm:"column:apiid;index"`
OrgID string `json:"org_id" gorm:"column:orgid;index"`
OauthID string `json:"oauth_id" gorm:"column:oauthid;index"`
APIID string `json:"api_id" gorm:"column:apiid"`
OrgID string `json:"org_id" gorm:"column:orgid"`
OauthID string `json:"oauth_id" gorm:"column:oauthid"`
RequestTime int64 `json:"request_time" gorm:"column:requesttime"`
RawRequest string `json:"raw_request" gorm:"column:rawrequest"`
RawResponse string `json:"raw_response" gorm:"column:rawresponse"`
Expand Down
115 changes: 113 additions & 2 deletions pumps/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import (
"encoding/hex"
"errors"
"fmt"
"sync"

"github.com/sirupsen/logrus"

"github.com/TykTechnologies/tyk-pump/analytics"
"github.com/mitchellh/mapstructure"
Expand Down Expand Up @@ -45,6 +48,9 @@ type SQLPump struct {
db *gorm.DB
dbType string
dialect gorm.Dialector

// this channel is used to signal that the background index creation has finished - this is used for testing
backgroundIndexCreated chan bool
}

// @PumpConf SQL
Expand Down Expand Up @@ -107,6 +113,18 @@ var (
SQLPrefix = "SQL-pump"
SQLDefaultENV = PUMPS_ENV_PREFIX + "_SQL" + PUMPS_ENV_META_PREFIX
SQLDefaultQueryBatchSize = 1000

indexes = []struct {
baseName string
column string
}{
{"idx_responsecode", "responsecode"},
{"idx_apikey", "apikey"},
{"idx_timestamp", "timestamp"},
{"idx_apiid", "apiid"},
{"idx_orgid", "orgid"},
{"idx_oauthid", "oauthid"},
}
)

func (c *SQLPump) New() Pump {
Expand Down Expand Up @@ -231,8 +249,8 @@ func (c *SQLPump) WriteData(ctx context.Context, data []interface{}) error {

table := analytics.SQLTable + "_" + recDate
c.db = c.db.Table(table)
if !c.db.Migrator().HasTable(table) {
c.db.AutoMigrate(&analytics.AnalyticsRecord{})
if errTable := c.ensureTable(table); errTable != nil {
return errTable
}
} else {
i = dataLen // write all records at once for non-sharded case, stop for loop after 1 iteration
Expand Down Expand Up @@ -354,3 +372,96 @@ func (c *SQLPump) WriteUptimeData(data []interface{}) {

c.log.Debug("Purged ", len(data), " records...")
}

func (c *SQLPump) buildIndexName(indexBaseName, tableName string) string {
return fmt.Sprintf("%s_%s", tableName, indexBaseName)
}

func (c *SQLPump) createIndex(indexBaseName, tableName, column string) error {
indexName := c.buildIndexName(indexBaseName, tableName)
option := ""
if c.dbType == "postgres" {
option = "CONCURRENTLY"
}

columnExist := c.db.Migrator().HasColumn(&analytics.AnalyticsRecord{}, column)
if !columnExist {
return errors.New("cannot create index for non existent column " + column)
}

query := fmt.Sprintf("CREATE INDEX %s IF NOT EXISTS %s ON %s (%s)", option, indexName, tableName, column)
err := c.db.Exec(query).Error
if err != nil {
c.log.WithFields(logrus.Fields{
"index": indexName,
"table": tableName,
}).WithError(err).Error("Error creating index")
return err
}

c.log.Infof("Index %s created for table %s", indexName, tableName)
c.log.WithFields(logrus.Fields{
"index": indexName,
"table": tableName,
}).Info("Index created")
return nil
}

// ensureIndex check that all indexes for the analytics SQL table are in place
func (c *SQLPump) ensureIndex(tableName string, background bool) error {
if !c.db.Migrator().HasTable(tableName) {
return errors.New("cannot create indexes as table doesn't exist: " + tableName)
}

// waitgroup to facilitate testing and track when all indexes are created
var wg sync.WaitGroup
if background {
wg.Add(len(indexes))
}

for _, idx := range indexes {
indexName := tableName + idx.baseName

if c.db.Migrator().HasIndex(tableName, indexName) {
c.log.WithFields(logrus.Fields{
"index": indexName,
"table": tableName,
}).Info("Index already exists")
continue
}

if background {
go func(baseName, cols string) {
defer wg.Done()
if err := c.createIndex(baseName, tableName, cols); err != nil {
c.log.Error(err)
}
}(idx.baseName, idx.column)
} else {
if err := c.createIndex(idx.baseName, tableName, idx.column); err != nil {
return err
}
}
}

if background {
wg.Wait()
c.backgroundIndexCreated <- true
}
return nil
}

// ensureTable creates the table if it doesn't exist
func (c *SQLPump) ensureTable(tableName string) error {
if !c.db.Migrator().HasTable(tableName) {
c.db = c.db.Table(tableName)
if err := c.db.Migrator().CreateTable(&analytics.AnalyticsRecord{}); err != nil {
c.log.Error("error creating table", err)
return err
}
if err := c.ensureIndex(tableName, false); err != nil {
return err
}
}
return nil
}
9 changes: 4 additions & 5 deletions pumps/sql_aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,15 +160,15 @@ func (c *SQLAggregatePump) ensureIndex(tableName string, background bool) error
c.log.Info("omit_index_creation set to true, omitting index creation..")
return nil
}

if !c.db.Migrator().HasIndex(tableName, newAggregatedIndexName) {
indexName := fmt.Sprintf("%s_%s", tableName, newAggregatedIndexName)
if !c.db.Migrator().HasIndex(tableName, indexName) {
createIndexFn := func(c *SQLAggregatePump) error {
option := ""
if c.dbType == "postgres" {
option = "CONCURRENTLY"
}

err := c.db.Table(tableName).Exec(fmt.Sprintf("CREATE INDEX %s IF NOT EXISTS %s ON %s (dimension, timestamp, org_id, dimension_value)", option, newAggregatedIndexName, tableName)).Error
err := c.db.Table(tableName).Exec(fmt.Sprintf("CREATE INDEX %s IF NOT EXISTS %s ON %s (dimension, timestamp, org_id, dimension_value)", option, indexName, tableName)).Error
if err != nil {
c.log.Errorf("error creating index for table %s : %s", tableName, err.Error())
return err
Expand All @@ -178,7 +178,7 @@ func (c *SQLAggregatePump) ensureIndex(tableName string, background bool) error
c.backgroundIndexCreated <- true
}

c.log.Info("Index ", newAggregatedIndexName, " for table ", tableName, " created successfully")
c.log.Info("Index ", indexName, " for table ", tableName, " created successfully")

return nil
}
Expand All @@ -198,7 +198,6 @@ func (c *SQLAggregatePump) ensureIndex(tableName string, background bool) error
c.log.Info("Creating index for table ", tableName, "...")
return createIndexFn(c)
}
c.log.Info(newAggregatedIndexName, " already exists.")

return nil
}
Expand Down
47 changes: 44 additions & 3 deletions pumps/sql_aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package pumps
import (
"context"
"errors"
"fmt"
"net/http"
"testing"
"time"
Expand Down Expand Up @@ -31,7 +32,8 @@ func TestSQLAggregateInit(t *testing.T) {
assert.Equal(t, "sqlite", pmp.db.Dialector.Name())
assert.Equal(t, true, pmp.db.Migrator().HasTable(analytics.AggregateSQLTable))

assert.Equal(t, true, pmp.db.Migrator().HasIndex(analytics.AggregateSQLTable, newAggregatedIndexName))
indexName := fmt.Sprintf("%s_%s", analytics.AggregateSQLTable, newAggregatedIndexName)
assert.Equal(t, true, pmp.db.Migrator().HasIndex(analytics.AggregateSQLTable, indexName))

// Checking with invalid type
cfg["type"] = "invalid"
Expand Down Expand Up @@ -337,7 +339,7 @@ func TestDecodeRequestAndDecodeResponseSQLAggregate(t *testing.T) {
assert.False(t, newPump.GetDecodedResponse())
}

func TestEnsureIndex(t *testing.T) {
func TestEnsureIndexSQLAggregate(t *testing.T) {
//nolint:govet
tcs := []struct {
testName string
Expand Down Expand Up @@ -419,6 +421,44 @@ func TestEnsureIndex(t *testing.T) {
expectedErr: nil,
shouldHaveIndex: true,
},
{
testName: "index created correctly, background on sharded pump",
pmpSetupFn: func(tableName string) *SQLAggregatePump {
pmp := &SQLAggregatePump{}
cfg := &SQLAggregatePumpConf{}
cfg.Type = "sqlite"
cfg.TableSharding = true
cfg.ConnectionString = ""
pmp.SQLConf = cfg

pmp.log = log.WithField("prefix", "sql-aggregate-pump")
dialect, errDialect := Dialect(&pmp.SQLConf.SQLConf)
if errDialect != nil {
return nil
}
db, err := gorm.Open(dialect, &gorm.Config{
AutoEmbedd: true,
UseJSONTags: true,
Logger: logger.Default.LogMode(logger.Info),
})
if err != nil {
return nil
}
pmp.db = db

pmp.backgroundIndexCreated = make(chan bool, 1)

if err := pmp.ensureTable(tableName); err != nil {
return nil
}

return pmp
},
givenTableName: "shard1",
givenRunInBackground: true,
expectedErr: nil,
shouldHaveIndex: true,
},
{
testName: "index created on non existing table, not background",
pmpSetupFn: func(tableName string) *SQLAggregatePump {
Expand Down Expand Up @@ -499,7 +539,8 @@ func TestEnsureIndex(t *testing.T) {
// wait for the background index creation to finish
<-pmp.backgroundIndexCreated
} else {
hasIndex := pmp.db.Table(tc.givenTableName).Migrator().HasIndex(tc.givenTableName, newAggregatedIndexName)
indexName := fmt.Sprintf("%s_%s", tc.givenTableName, newAggregatedIndexName)
hasIndex := pmp.db.Table(tc.givenTableName).Migrator().HasIndex(tc.givenTableName, indexName)
assert.Equal(t, tc.shouldHaveIndex, hasIndex)
}
} else {
Expand Down
110 changes: 110 additions & 0 deletions pumps/sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,3 +383,113 @@ func TestDecodeRequestAndDecodeResponseSQL(t *testing.T) {
assert.False(t, newPump.GetDecodedRequest())
assert.False(t, newPump.GetDecodedResponse())
}

func setupSQLPump(t *testing.T, tableName string, useBackground bool) *SQLPump {
t.Helper()
pmp := &SQLPump{}
pmp.log = log.WithField("prefix", "sql-pump")
cfg := map[string]interface{}{
"type": "sqlite",
"connection_string": "",
}

assert.NoError(t, pmp.Init(cfg))
if useBackground {
pmp.backgroundIndexCreated = make(chan bool, 1)
}
assert.NoError(t, pmp.ensureTable(tableName))

return pmp
}

func TestEnsureIndexSQL(t *testing.T) {
//nolint:govet
tcs := []struct {
testName string
givenTableName string
expectedErr error
pmpSetupFn func(t *testing.T, tableName string) *SQLPump
givenRunInBackground bool
shouldHaveIndex bool
}{
{
testName: "index created correctly, not background",
pmpSetupFn: func(t *testing.T, tableName string) *SQLPump {
return setupSQLPump(t, tableName, false)
},
givenTableName: "analytics_no_background",
givenRunInBackground: false,
expectedErr: nil,
shouldHaveIndex: true,
},
{
testName: "index created correctly, background",
pmpSetupFn: func(t *testing.T, tableName string) *SQLPump {
return setupSQLPump(t, tableName, true)
},
givenTableName: "analytics_background",
givenRunInBackground: true,
expectedErr: nil,
shouldHaveIndex: true,
},
}

for _, tc := range tcs {
t.Run(tc.testName, func(t *testing.T) {
pmp := tc.pmpSetupFn(t, tc.givenTableName)
defer func() {
err := pmp.db.Migrator().DropTable(tc.givenTableName)
if err != nil {
t.Errorf("Failed to drop table: %v", err)
}
}()
assert.NotNil(t, pmp)

actualErr := pmp.ensureIndex(tc.givenTableName, tc.givenRunInBackground)
isErrExpected := tc.expectedErr != nil
didErr := actualErr != nil
assert.Equal(t, isErrExpected, didErr)

if isErrExpected {
assert.Equal(t, tc.expectedErr.Error(), actualErr.Error())
}

if actualErr == nil {
if tc.givenRunInBackground {
// wait for the background index creation to finish
<-pmp.backgroundIndexCreated
}

indexToUse := indexes[0]
indexName := pmp.buildIndexName(indexToUse.baseName, tc.givenTableName)
hasIndex := pmp.db.Table(tc.givenTableName).Migrator().HasIndex(tc.givenTableName, indexName)
assert.Equal(t, tc.shouldHaveIndex, hasIndex)
}
})
}
}

func TestBuildIndexName(t *testing.T) {
tests := []struct {
indexBaseName string
tableName string
expected string
}{
{"idx_responsecode", "users", "users_idx_responsecode"},
{"idx_apikey", "transactions", "transactions_idx_apikey"},
{"idx_timestamp", "logs", "logs_idx_timestamp"},
{"idx_apiid", "api_calls", "api_calls_idx_apiid"},
{"idx_orgid", "organizations", "organizations_idx_orgid"},
}

c := &SQLPump{} // Create an instance of SQLPump.

for _, tt := range tests {
t.Run(tt.indexBaseName+"_"+tt.tableName, func(t *testing.T) {
result := c.buildIndexName(tt.indexBaseName, tt.tableName)
if result != tt.expected {
t.Errorf("buildIndexName(%s, %s) = %s; want %s", tt.indexBaseName, tt.tableName, result, tt.expected)
}
})
}
}

0 comments on commit 544ccb3

Please sign in to comment.