Skip to content

Commit

Permalink
refactoring ensureIndex in sql pump
Browse files Browse the repository at this point in the history
  • Loading branch information
sredny buitrago authored and sredny buitrago committed Nov 27, 2024
1 parent d84594c commit 50602c3
Showing 1 changed file with 32 additions and 27 deletions.
59 changes: 32 additions & 27 deletions pumps/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,51 +375,56 @@ func (c *SQLPump) buildIndexName(indexBaseName, tableName string) string {
return fmt.Sprintf("%s_%s", tableName, indexBaseName)
}

func (c *SQLPump) createIndex(indexBaseName, tableName, column string, wg *sync.WaitGroup) error {
if wg != nil {
defer wg.Done()
}
indexName := c.buildIndexName(indexBaseName, tableName)
option := ""
if c.dbType == "postgres" {
option = "CONCURRENTLY"
}

columnExist := c.db.Migrator().HasColumn(&analytics.AnalyticsRecord{}, column)
if !columnExist {
return errors.New("cannot create index for non existent column " + column)
}

sql := fmt.Sprintf("CREATE INDEX %s IF NOT EXISTS %s ON %s (%s)", option, indexName, tableName, column)
err := c.db.Exec(sql).Error
if err != nil {
c.log.Errorf("error creating index %s for table %s : %s", indexName, tableName, err.Error())
return err
}

c.log.Infof("Index %s created for table %s", indexName, tableName)
return nil
}

// ensureIndex check that all indexes for the analytics SQL table are in place
func (c *SQLPump) ensureIndex(tableName string, background bool) error {
if !c.db.Migrator().HasTable(tableName) {
return errors.New("cannot create indexes as table doesn't exist: " + tableName)
}

// waitgroup to facilitate testing and track when all indexes are created
var wg sync.WaitGroup
wg.Add(len(indexes))
createIndexFn := func(indexBaseName, column string) error {
defer wg.Done()
indexName := c.buildIndexName(indexBaseName, tableName)
option := ""
if c.dbType == "postgres" {
option = "CONCURRENTLY"
}

columnExist := c.db.Migrator().HasColumn(&analytics.AnalyticsRecord{}, column)
if !columnExist {
return errors.New("cannot create index for non existent column " + column)
}

sql := fmt.Sprintf("CREATE INDEX %s IF NOT EXISTS %s ON %s (%s)", option, indexName, tableName, column)
err := c.db.Exec(sql).Error
if err != nil {
c.log.Errorf("error creating index %s for table %s : %s", indexName, tableName, err.Error())
return err
}

c.log.Infof("Index %s created for table %s", indexName, tableName)
return nil
var wg *sync.WaitGroup
if background {
wg = &sync.WaitGroup{}
wg.Add(len(indexes))
}

for _, idx := range indexes {
indexName := tableName + idx.baseName

if !c.db.Migrator().HasIndex(tableName, indexName) {
if background {
go func(baseName, cols string) {
if err := createIndexFn(baseName, cols); err != nil {
if err := c.createIndex(baseName, tableName, cols, wg); err != nil {
c.log.Error(err)
}
}(idx.baseName, idx.column)
} else {
if err := createIndexFn(idx.baseName, idx.column); err != nil {
if err := c.createIndex(idx.baseName, tableName, idx.column, wg); err != nil {
return err
}
}
Expand Down

0 comments on commit 50602c3

Please sign in to comment.