TT-13421 create indexes on sharded sql pumps #861

Merged: 20 commits, Dec 5, 2024
Changes from 18 commits
12 changes: 6 additions & 6 deletions analytics/analytics.go
@@ -53,14 +53,14 @@ type AnalyticsRecord struct {
Month time.Month `json:"month" sql:"-"`
Year int `json:"year" sql:"-"`
Hour int `json:"hour" sql:"-"`
- ResponseCode int `json:"response_code" gorm:"column:responsecode;index"`
- APIKey string `json:"api_key" gorm:"column:apikey;index"`
- TimeStamp time.Time `json:"timestamp" gorm:"column:timestamp;index"`
+ ResponseCode int `json:"response_code" gorm:"column:responsecode"`
+ APIKey string `json:"api_key" gorm:"column:apikey"`
+ TimeStamp time.Time `json:"timestamp" gorm:"column:timestamp"`
APIVersion string `json:"api_version" gorm:"column:apiversion"`
APIName string `json:"api_name" sql:"-"`
- APIID string `json:"api_id" gorm:"column:apiid;index"`
- OrgID string `json:"org_id" gorm:"column:orgid;index"`
- OauthID string `json:"oauth_id" gorm:"column:oauthid;index"`
+ APIID string `json:"api_id" gorm:"column:apiid"`
+ OrgID string `json:"org_id" gorm:"column:orgid"`
+ OauthID string `json:"oauth_id" gorm:"column:oauthid"`
RequestTime int64 `json:"request_time" gorm:"column:requesttime"`
RawRequest string `json:"raw_request" gorm:"column:rawrequest"`
RawResponse string `json:"raw_response" gorm:"column:rawresponse"`
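With the index tags gone from the struct, gorm's AutoMigrate no longer creates these indexes implicitly; the pump now creates them explicitly per table (see pumps/sql.go below), which is what makes per-shard index names possible. A minimal sketch of the resulting naming, assuming a sharded table name of the form tyk_analytics_<date> (the exact value used here is an assumption, derived from analytics.SQLTable plus the record date):

package main

import "fmt"

// buildIndexName mirrors the helper added in pumps/sql.go: the index name is
// prefixed with the (possibly sharded) table name so every shard gets its own set of indexes.
func buildIndexName(indexBaseName, tableName string) string {
	return fmt.Sprintf("%s_%s", tableName, indexBaseName)
}

func main() {
	// "tyk_analytics_20241205" is an assumed example of a sharded table name.
	fmt.Println(buildIndexName("idx_apiid", "tyk_analytics_20241205"))
	// Output: tyk_analytics_20241205_idx_apiid
}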
118 changes: 116 additions & 2 deletions pumps/sql.go
@@ -5,6 +5,9 @@ import (
"encoding/hex"
"errors"
"fmt"
"sync"

"github.com/sirupsen/logrus"

"github.com/TykTechnologies/tyk-pump/analytics"
"github.com/mitchellh/mapstructure"
@@ -45,6 +48,9 @@ type SQLPump struct {
db *gorm.DB
dbType string
dialect gorm.Dialector

// this channel is used to signal that the background index creation has finished - this is used for testing
backgroundIndexCreated chan bool
}

// @PumpConf SQL
@@ -107,6 +113,18 @@ var (
SQLPrefix = "SQL-pump"
SQLDefaultENV = PUMPS_ENV_PREFIX + "_SQL" + PUMPS_ENV_META_PREFIX
SQLDefaultQueryBatchSize = 1000

indexes = []struct {
baseName string
column string
}{
{"idx_responsecode", "responsecode"},
{"idx_apikey", "apikey"},
{"idx_timestamp", "timestamp"},
{"idx_apiid", "apiid"},
{"idx_orgid", "orgid"},
{"idx_oauthid", "oauthid"},
}
)

func (c *SQLPump) New() Pump {
@@ -231,8 +249,8 @@ func (c *SQLPump) WriteData(ctx context.Context, data []interface{}) error {

table := analytics.SQLTable + "_" + recDate
c.db = c.db.Table(table)
- if !c.db.Migrator().HasTable(table) {
- c.db.AutoMigrate(&analytics.AnalyticsRecord{})
+ if errTable := c.ensureTable(table); errTable != nil {
+ return errTable
}
} else {
i = dataLen // write all records at once for non-sharded case, stop for loop after 1 iteration
@@ -354,3 +372,99 @@ func (c *SQLPump) WriteUptimeData(data []interface{}) {

c.log.Debug("Purged ", len(data), " records...")
}

func (c *SQLPump) buildIndexName(indexBaseName, tableName string) string {
return fmt.Sprintf("%s_%s", tableName, indexBaseName)
}

func (c *SQLPump) createIndex(indexBaseName, tableName, column string, wg *sync.WaitGroup) error {
if wg != nil {
defer wg.Done()
}
indexName := c.buildIndexName(indexBaseName, tableName)
option := ""
if c.dbType == "postgres" {
option = "CONCURRENTLY"
}

columnExist := c.db.Migrator().HasColumn(&analytics.AnalyticsRecord{}, column)
if !columnExist {
return errors.New("cannot create index for non-existent column " + column)
}

sql := fmt.Sprintf("CREATE INDEX %s IF NOT EXISTS %s ON %s (%s)", option, indexName, tableName, column)
Contributor:

A better naming for this variable could be query, what do you think?

In addition to this, let's bring in the practice of being careful when forming SQL queries. I know the values in this query are totally under our control (consts), but I still feel it's better to use an SQL builder. https://gorm.io/docs/sql_builder.html might help.

Contributor Author:

Renamed. @jeffy-mathew in that case we would need to add some logic like:

query := "CREATE INDEX IF NOT EXISTS ? ON ? (?)"
if option != "" {
    query = fmt.Sprintf("CREATE INDEX %s IF NOT EXISTS ? ON ? (?)", option) // Add option dynamically
}

because the original query contains an "option" param that can be "CONCURRENTLY" or empty, and the db.Raw method doesn't allow that and will mark it as a bad query.
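For context on this trade-off: gorm's Exec/Raw placeholders bind values, not identifiers, so the index, table, and column names cannot be passed as ? and some string building is unavoidable. Below is a minimal sketch of a middle ground, assuming the identifiers are limited to the hard-coded indexes slice and validated before interpolation; it illustrates the idea discussed here and is not the code merged in this PR.

package main

import (
	"fmt"
	"regexp"
)

// identRe is an assumed allow-list pattern for identifiers that may appear in index DDL.
var identRe = regexp.MustCompile(`^[A-Za-z0-9_]+$`)

// buildCreateIndexStmt interpolates only validated identifiers; option
// ("CONCURRENTLY" or "") is the sole non-identifier piece and comes from a constant.
func buildCreateIndexStmt(option, indexName, tableName, column string) (string, error) {
	for _, ident := range []string{indexName, tableName, column} {
		if !identRe.MatchString(ident) {
			return "", fmt.Errorf("refusing to build index DDL with identifier %q", ident)
		}
	}
	stmt := "CREATE INDEX "
	if option != "" {
		stmt += option + " "
	}
	return stmt + fmt.Sprintf("IF NOT EXISTS %s ON %s (%s)", indexName, tableName, column), nil
}

func main() {
	stmt, err := buildCreateIndexStmt("CONCURRENTLY", "tyk_analytics_20241205_idx_apiid", "tyk_analytics_20241205", "apiid")
	if err != nil {
		panic(err)
	}
	fmt.Println(stmt)
	// CREATE INDEX CONCURRENTLY IF NOT EXISTS tyk_analytics_20241205_idx_apiid ON tyk_analytics_20241205 (apiid)
}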

err := c.db.Exec(sql).Error
if err != nil {
c.log.WithFields(logrus.Fields{
"index": indexName,
"table": tableName,
}).WithError(err).Error("Error creating index")
return err
}

c.log.WithFields(logrus.Fields{
"index": indexName,
"table": tableName,
}).Info("Index created")
return nil
}

// ensureIndex checks that all indexes for the analytics SQL table are in place
func (c *SQLPump) ensureIndex(tableName string, background bool) error {
if !c.db.Migrator().HasTable(tableName) {
return errors.New("cannot create indexes as table doesn't exist: " + tableName)
}

// waitgroup to facilitate testing and track when all indexes are created
var wg *sync.WaitGroup
if background {
wg = &sync.WaitGroup{}
wg.Add(len(indexes))
}

for _, idx := range indexes {
indexName := c.buildIndexName(idx.baseName, tableName)

if c.db.Migrator().HasIndex(tableName, indexName) {
c.log.WithFields(logrus.Fields{
"index": indexName,
"table": tableName,
}).Info("Index already exists")
continue
}

if background {
go func(baseName, cols string) {
if err := c.createIndex(baseName, tableName, cols, wg); err != nil {
c.log.Error(err)
}
}(idx.baseName, idx.column)
} else {
if err := c.createIndex(idx.baseName, tableName, idx.column, wg); err != nil {
return err
}
}
}

if background {
wg.Wait()
c.backgroundIndexCreated <- true
}
return nil
}

// ensureTable creates the table and its indexes if the table doesn't exist
func (c *SQLPump) ensureTable(tableName string) error {
if !c.db.Migrator().HasTable(tableName) {
c.db = c.db.Table(tableName)
if err := c.db.Migrator().CreateTable(&analytics.AnalyticsRecord{}); err != nil {
c.log.Error("error creating table", err)
return err
}
if err := c.ensureIndex(tableName, false); err != nil {
return err
}
}
return nil
}
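The background flag and the backgroundIndexCreated channel are easiest to read from the caller's side. The sketch below is a hypothetical helper (not part of this PR) that shows the intended flow, assuming it lives in the pumps package next to the code above:

package pumps

// waitForShardIndexes is a hypothetical helper illustrating how the background
// path above is consumed: ensureIndex fires one goroutine per entry in the
// package-level indexes slice, waits for them via the WaitGroup, and then
// signals on backgroundIndexCreated.
func waitForShardIndexes(pmp *SQLPump, table string) error {
	// Buffered so the signal send in ensureIndex never blocks.
	pmp.backgroundIndexCreated = make(chan bool, 1)

	if err := pmp.ensureIndex(table, true); err != nil {
		return err // e.g. the sharded table does not exist yet
	}

	// By this point every index has been attempted; drain the completion
	// signal, which is what the tests also wait on.
	<-pmp.backgroundIndexCreated
	return nil
}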
10 changes: 5 additions & 5 deletions pumps/sql_aggregate.go
@@ -160,15 +160,15 @@ func (c *SQLAggregatePump) ensureIndex(tableName string, background bool) error
c.log.Info("omit_index_creation set to true, omitting index creation..")
return nil
}

- if !c.db.Migrator().HasIndex(tableName, newAggregatedIndexName) {
+ indexName := fmt.Sprintf("%s_%s", tableName, newAggregatedIndexName)
+ if !c.db.Migrator().HasIndex(tableName, indexName) {
createIndexFn := func(c *SQLAggregatePump) error {
option := ""
if c.dbType == "postgres" {
option = "CONCURRENTLY"
}

- err := c.db.Table(tableName).Exec(fmt.Sprintf("CREATE INDEX %s IF NOT EXISTS %s ON %s (dimension, timestamp, org_id, dimension_value)", option, newAggregatedIndexName, tableName)).Error
+ err := c.db.Table(tableName).Exec(fmt.Sprintf("CREATE INDEX %s IF NOT EXISTS %s ON %s (dimension, timestamp, org_id, dimension_value)", option, indexName, tableName)).Error
if err != nil {
c.log.Errorf("error creating index for table %s : %s", tableName, err.Error())
return err
@@ -178,7 +178,7 @@ func (c *SQLAggregatePump) ensureIndex(tableName string, background bool) error
c.backgroundIndexCreated <- true
}

c.log.Info("Index ", newAggregatedIndexName, " for table ", tableName, " created successfully")
c.log.Info("Index ", indexName, " for table ", tableName, " created successfully")

return nil
}
@@ -198,7 +198,7 @@ func (c *SQLAggregatePump) ensureIndex(tableName string, background bool) error
c.log.Info("Creating index for table ", tableName, "...")
return createIndexFn(c)
}
- c.log.Info(newAggregatedIndexName, " already exists.")
+ c.log.Info(indexName, " already exists.")

return nil
}
47 changes: 44 additions & 3 deletions pumps/sql_aggregate_test.go
@@ -3,6 +3,7 @@ package pumps
import (
"context"
"errors"
"fmt"
"net/http"
"testing"
"time"
@@ -31,7 +32,8 @@ func TestSQLAggregateInit(t *testing.T) {
assert.Equal(t, "sqlite", pmp.db.Dialector.Name())
assert.Equal(t, true, pmp.db.Migrator().HasTable(analytics.AggregateSQLTable))

- assert.Equal(t, true, pmp.db.Migrator().HasIndex(analytics.AggregateSQLTable, newAggregatedIndexName))
+ indexName := fmt.Sprintf("%s_%s", analytics.AggregateSQLTable, newAggregatedIndexName)
+ assert.Equal(t, true, pmp.db.Migrator().HasIndex(analytics.AggregateSQLTable, indexName))

// Checking with invalid type
cfg["type"] = "invalid"
@@ -337,7 +339,7 @@ func TestDecodeRequestAndDecodeResponseSQLAggregate(t *testing.T) {
assert.False(t, newPump.GetDecodedResponse())
}

- func TestEnsureIndex(t *testing.T) {
+ func TestEnsureIndexSQLAggregate(t *testing.T) {
//nolint:govet
tcs := []struct {
testName string
@@ -419,6 +421,44 @@ func TestEnsureIndex(t *testing.T) {
expectedErr: nil,
shouldHaveIndex: true,
},
{
testName: "index created correctly, background on sharded pump",
pmpSetupFn: func(tableName string) *SQLAggregatePump {
pmp := &SQLAggregatePump{}
cfg := &SQLAggregatePumpConf{}
cfg.Type = "sqlite"
cfg.TableSharding = true
cfg.ConnectionString = ""
pmp.SQLConf = cfg

pmp.log = log.WithField("prefix", "sql-aggregate-pump")
dialect, errDialect := Dialect(&pmp.SQLConf.SQLConf)
if errDialect != nil {
return nil
}
db, err := gorm.Open(dialect, &gorm.Config{
AutoEmbedd: true,
UseJSONTags: true,
Logger: logger.Default.LogMode(logger.Info),
})
if err != nil {
return nil
}
pmp.db = db

pmp.backgroundIndexCreated = make(chan bool, 1)

if err := pmp.ensureTable(tableName); err != nil {
return nil
}

return pmp
},
givenTableName: "shard1",
givenRunInBackground: true,
expectedErr: nil,
shouldHaveIndex: true,
},
{
testName: "index created on non existing table, not background",
pmpSetupFn: func(tableName string) *SQLAggregatePump {
@@ -499,7 +539,8 @@ func TestEnsureIndex(t *testing.T) {
// wait for the background index creation to finish
<-pmp.backgroundIndexCreated
} else {
- hasIndex := pmp.db.Table(tc.givenTableName).Migrator().HasIndex(tc.givenTableName, newAggregatedIndexName)
+ indexName := fmt.Sprintf("%s_%s", tc.givenTableName, newAggregatedIndexName)
+ hasIndex := pmp.db.Table(tc.givenTableName).Migrator().HasIndex(tc.givenTableName, indexName)
assert.Equal(t, tc.shouldHaveIndex, hasIndex)
}
} else {