Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TT-13421 create indexes on sharded sql pumps #861

Merged
merged 20 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions analytics/analytics.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,14 @@ type AnalyticsRecord struct {
Month time.Month `json:"month" sql:"-"`
Year int `json:"year" sql:"-"`
Hour int `json:"hour" sql:"-"`
ResponseCode int `json:"response_code" gorm:"column:responsecode;index"`
APIKey string `json:"api_key" gorm:"column:apikey;index"`
TimeStamp time.Time `json:"timestamp" gorm:"column:timestamp;index"`
ResponseCode int `json:"response_code" gorm:"column:responsecode"`
APIKey string `json:"api_key" gorm:"column:apikey"`
TimeStamp time.Time `json:"timestamp" gorm:"column:timestamp"`
APIVersion string `json:"api_version" gorm:"column:apiversion"`
APIName string `json:"api_name" sql:"-"`
APIID string `json:"api_id" gorm:"column:apiid;index"`
OrgID string `json:"org_id" gorm:"column:orgid;index"`
OauthID string `json:"oauth_id" gorm:"column:oauthid;index"`
APIID string `json:"api_id" gorm:"column:apiid"`
OrgID string `json:"org_id" gorm:"column:orgid"`
OauthID string `json:"oauth_id" gorm:"column:oauthid"`
RequestTime int64 `json:"request_time" gorm:"column:requesttime"`
RawRequest string `json:"raw_request" gorm:"column:rawrequest"`
RawResponse string `json:"raw_response" gorm:"column:rawresponse"`
Expand Down
99 changes: 97 additions & 2 deletions pumps/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/hex"
"errors"
"fmt"
"sync"

"github.com/TykTechnologies/tyk-pump/analytics"
"github.com/mitchellh/mapstructure"
Expand Down Expand Up @@ -45,6 +46,9 @@ type SQLPump struct {
db *gorm.DB
dbType string
dialect gorm.Dialector

// this channel is used to signal that the background index creation has finished - this is used for testing
backgroundIndexCreated chan bool
}

// @PumpConf SQL
Expand Down Expand Up @@ -107,6 +111,18 @@ var (
SQLPrefix = "SQL-pump"
SQLDefaultENV = PUMPS_ENV_PREFIX + "_SQL" + PUMPS_ENV_META_PREFIX
SQLDefaultQueryBatchSize = 1000

indexes = []struct {
baseName string
column string
}{
{"idx_responsecode", "responsecode"},
{"idx_apikey", "apikey"},
{"idx_timestamp", "timestamp"},
{"idx_apiid", "apiid"},
{"idx_orgid", "orgid"},
{"idx_oauthid", "oauthid"},
}
)

func (c *SQLPump) New() Pump {
Expand Down Expand Up @@ -231,8 +247,8 @@ func (c *SQLPump) WriteData(ctx context.Context, data []interface{}) error {

table := analytics.SQLTable + "_" + recDate
c.db = c.db.Table(table)
if !c.db.Migrator().HasTable(table) {
c.db.AutoMigrate(&analytics.AnalyticsRecord{})
if errTable := c.ensureTable(table); errTable != nil {
return errTable
}
} else {
i = dataLen // write all records at once for non-sharded case, stop for loop after 1 iteration
Expand Down Expand Up @@ -354,3 +370,82 @@ func (c *SQLPump) WriteUptimeData(data []interface{}) {

c.log.Debug("Purged ", len(data), " records...")
}

func (c *SQLPump) buildIndexName(indexBaseName, tableName string) string {
return fmt.Sprintf("%s_%s", tableName, indexBaseName)
}

// ensureIndex check that all indexes for the analytics SQL table are in place
func (c *SQLPump) ensureIndex(tableName string, background bool) error {
if !c.db.Migrator().HasTable(tableName) {
return errors.New("cannot create indexes as table doesn't exist: " + tableName)
}

// waitgroup to facilitate testing and track when all indexes are created
var wg sync.WaitGroup
sredxny marked this conversation as resolved.
Show resolved Hide resolved
wg.Add(len(indexes))
createIndexFn := func(indexBaseName, column string) error {
sredxny marked this conversation as resolved.
Show resolved Hide resolved
defer wg.Done()
indexName := c.buildIndexName(indexBaseName, tableName)
option := ""
if c.dbType == "postgres" {
option = "CONCURRENTLY"
}

columnExist := c.db.Migrator().HasColumn(&analytics.AnalyticsRecord{}, column)
if !columnExist {
return errors.New("cannot create index for non existent column " + column)
}

sql := fmt.Sprintf("CREATE INDEX %s IF NOT EXISTS %s ON %s (%s)", option, indexName, tableName, column)
err := c.db.Exec(sql).Error
if err != nil {
c.log.Errorf("error creating index %s for table %s : %s", indexName, tableName, err.Error())
sredxny marked this conversation as resolved.
Show resolved Hide resolved
return err
}

c.log.Infof("Index %s created for table %s", indexName, tableName)
sredxny marked this conversation as resolved.
Show resolved Hide resolved
return nil
}

for _, idx := range indexes {
indexName := tableName + idx.baseName

if !c.db.Migrator().HasIndex(tableName, indexName) {
sredxny marked this conversation as resolved.
Show resolved Hide resolved
if background {
go func(baseName, cols string) {
if err := createIndexFn(baseName, cols); err != nil {
c.log.Error(err)
}
}(idx.baseName, idx.column)
} else {
if err := createIndexFn(idx.baseName, idx.column); err != nil {
return err
}
}
} else {
c.log.Infof("Index %s already exists for table %s", indexName, tableName)
}
}

if background {
wg.Wait()
c.backgroundIndexCreated <- true
}
return nil
}

// ensureTable creates the table if it doesn't exist
func (c *SQLPump) ensureTable(tableName string) error {
if !c.db.Migrator().HasTable(tableName) {
c.db = c.db.Table(tableName)
if err := c.db.Migrator().CreateTable(&analytics.AnalyticsRecord{}); err != nil {
c.log.Error("error creating table", err)
return err
}
if err := c.ensureIndex(tableName, false); err != nil {
return err
}
}
return nil
}
10 changes: 5 additions & 5 deletions pumps/sql_aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,15 +160,15 @@ func (c *SQLAggregatePump) ensureIndex(tableName string, background bool) error
c.log.Info("omit_index_creation set to true, omitting index creation..")
return nil
}

if !c.db.Migrator().HasIndex(tableName, newAggregatedIndexName) {
indexName := fmt.Sprintf("%s_%s", tableName, newAggregatedIndexName)
if !c.db.Migrator().HasIndex(tableName, indexName) {
createIndexFn := func(c *SQLAggregatePump) error {
option := ""
if c.dbType == "postgres" {
option = "CONCURRENTLY"
}

err := c.db.Table(tableName).Exec(fmt.Sprintf("CREATE INDEX %s IF NOT EXISTS %s ON %s (dimension, timestamp, org_id, dimension_value)", option, newAggregatedIndexName, tableName)).Error
err := c.db.Table(tableName).Exec(fmt.Sprintf("CREATE INDEX %s IF NOT EXISTS %s ON %s (dimension, timestamp, org_id, dimension_value)", option, indexName, tableName)).Error
if err != nil {
c.log.Errorf("error creating index for table %s : %s", tableName, err.Error())
return err
Expand All @@ -178,7 +178,7 @@ func (c *SQLAggregatePump) ensureIndex(tableName string, background bool) error
c.backgroundIndexCreated <- true
}

c.log.Info("Index ", newAggregatedIndexName, " for table ", tableName, " created successfully")
c.log.Info("Index ", indexName, " for table ", tableName, " created successfully")

return nil
}
Expand All @@ -198,7 +198,7 @@ func (c *SQLAggregatePump) ensureIndex(tableName string, background bool) error
c.log.Info("Creating index for table ", tableName, "...")
return createIndexFn(c)
}
c.log.Info(newAggregatedIndexName, " already exists.")
c.log.Info(indexName, " already exists.")
sredxny marked this conversation as resolved.
Show resolved Hide resolved

return nil
}
Expand Down
47 changes: 44 additions & 3 deletions pumps/sql_aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package pumps
import (
"context"
"errors"
"fmt"
"net/http"
"testing"
"time"
Expand Down Expand Up @@ -31,7 +32,8 @@ func TestSQLAggregateInit(t *testing.T) {
assert.Equal(t, "sqlite", pmp.db.Dialector.Name())
assert.Equal(t, true, pmp.db.Migrator().HasTable(analytics.AggregateSQLTable))

assert.Equal(t, true, pmp.db.Migrator().HasIndex(analytics.AggregateSQLTable, newAggregatedIndexName))
indexName := fmt.Sprintf("%s_%s", analytics.AggregateSQLTable, newAggregatedIndexName)
assert.Equal(t, true, pmp.db.Migrator().HasIndex(analytics.AggregateSQLTable, indexName))

// Checking with invalid type
cfg["type"] = "invalid"
Expand Down Expand Up @@ -337,7 +339,7 @@ func TestDecodeRequestAndDecodeResponseSQLAggregate(t *testing.T) {
assert.False(t, newPump.GetDecodedResponse())
}

func TestEnsureIndex(t *testing.T) {
func TestEnsureIndexSQLAggregate(t *testing.T) {
//nolint:govet
tcs := []struct {
testName string
Expand Down Expand Up @@ -419,6 +421,44 @@ func TestEnsureIndex(t *testing.T) {
expectedErr: nil,
shouldHaveIndex: true,
},
{
testName: "index created correctly, background on sharded pump",
pmpSetupFn: func(tableName string) *SQLAggregatePump {
pmp := &SQLAggregatePump{}
cfg := &SQLAggregatePumpConf{}
cfg.Type = "sqlite"
cfg.TableSharding = true
cfg.ConnectionString = ""
pmp.SQLConf = cfg

pmp.log = log.WithField("prefix", "sql-aggregate-pump")
dialect, errDialect := Dialect(&pmp.SQLConf.SQLConf)
if errDialect != nil {
return nil
}
db, err := gorm.Open(dialect, &gorm.Config{
AutoEmbedd: true,
UseJSONTags: true,
Logger: logger.Default.LogMode(logger.Info),
})
if err != nil {
return nil
}
pmp.db = db

pmp.backgroundIndexCreated = make(chan bool, 1)

if err := pmp.ensureTable(tableName); err != nil {
return nil
}

return pmp
},
givenTableName: "shard1",
givenRunInBackground: true,
expectedErr: nil,
shouldHaveIndex: true,
},
{
testName: "index created on non existing table, not background",
pmpSetupFn: func(tableName string) *SQLAggregatePump {
Expand Down Expand Up @@ -499,7 +539,8 @@ func TestEnsureIndex(t *testing.T) {
// wait for the background index creation to finish
<-pmp.backgroundIndexCreated
} else {
hasIndex := pmp.db.Table(tc.givenTableName).Migrator().HasIndex(tc.givenTableName, newAggregatedIndexName)
indexName := fmt.Sprintf("%s_%s", tc.givenTableName, newAggregatedIndexName)
hasIndex := pmp.db.Table(tc.givenTableName).Migrator().HasIndex(tc.givenTableName, indexName)
assert.Equal(t, tc.shouldHaveIndex, hasIndex)
}
} else {
Expand Down
110 changes: 110 additions & 0 deletions pumps/sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,3 +363,113 @@ func TestDecodeRequestAndDecodeResponseSQL(t *testing.T) {
assert.False(t, newPump.GetDecodedRequest())
assert.False(t, newPump.GetDecodedResponse())
}

func setupSQLPump(t *testing.T, tableName string, useBackground bool) *SQLPump {
t.Helper()
pmp := &SQLPump{}
pmp.log = log.WithField("prefix", "sql-pump")
cfg := map[string]interface{}{
"type": "sqlite",
"connection_string": "",
}

assert.NoError(t, pmp.Init(cfg))
if useBackground {
pmp.backgroundIndexCreated = make(chan bool, 1)
}
assert.NoError(t, pmp.ensureTable(tableName))

return pmp
}

func TestEnsureIndexSQL(t *testing.T) {
//nolint:govet
tcs := []struct {
testName string
givenTableName string
expectedErr error
pmpSetupFn func(t *testing.T, tableName string) *SQLPump
givenRunInBackground bool
shouldHaveIndex bool
}{
{
testName: "index created correctly, not background",
pmpSetupFn: func(t *testing.T, tableName string) *SQLPump {
return setupSQLPump(t, tableName, false)
},
givenTableName: "analytics_no_background",
givenRunInBackground: false,
expectedErr: nil,
shouldHaveIndex: true,
},
{
testName: "index created correctly, background",
pmpSetupFn: func(t *testing.T, tableName string) *SQLPump {
return setupSQLPump(t, tableName, true)
},
givenTableName: "analytics_background",
givenRunInBackground: true,
expectedErr: nil,
shouldHaveIndex: true,
},
}

for _, tc := range tcs {
t.Run(tc.testName, func(t *testing.T) {
pmp := tc.pmpSetupFn(t, tc.givenTableName)
defer func() {
err := pmp.db.Migrator().DropTable(tc.givenTableName)
if err != nil {
t.Errorf("Failed to drop table: %v", err)
}
}()
assert.NotNil(t, pmp)

actualErr := pmp.ensureIndex(tc.givenTableName, tc.givenRunInBackground)
isErrExpected := tc.expectedErr != nil
didErr := actualErr != nil
assert.Equal(t, isErrExpected, didErr)

if isErrExpected {
assert.Equal(t, tc.expectedErr.Error(), actualErr.Error())
}

if actualErr == nil {
if tc.givenRunInBackground {
// wait for the background index creation to finish
<-pmp.backgroundIndexCreated
}

indexToUse := indexes[0]
indexName := pmp.buildIndexName(indexToUse.baseName, tc.givenTableName)
hasIndex := pmp.db.Table(tc.givenTableName).Migrator().HasIndex(tc.givenTableName, indexName)
assert.Equal(t, tc.shouldHaveIndex, hasIndex)
}
})
}
}

func TestBuildIndexName(t *testing.T) {
tests := []struct {
indexBaseName string
tableName string
expected string
}{
{"idx_responsecode", "users", "users_idx_responsecode"},
{"idx_apikey", "transactions", "transactions_idx_apikey"},
{"idx_timestamp", "logs", "logs_idx_timestamp"},
{"idx_apiid", "api_calls", "api_calls_idx_apiid"},
{"idx_orgid", "organizations", "organizations_idx_orgid"},
}

c := &SQLPump{} // Create an instance of SQLPump.

for _, tt := range tests {
t.Run(tt.indexBaseName+"_"+tt.tableName, func(t *testing.T) {
result := c.buildIndexName(tt.indexBaseName, tt.tableName)
if result != tt.expected {
t.Errorf("buildIndexName(%s, %s) = %s; want %s", tt.indexBaseName, tt.tableName, result, tt.expected)
}
})
}
}
Loading