From 50602c327fe3e25a7f6e514706edf3c760e7b8ac Mon Sep 17 00:00:00 2001 From: sredny buitrago Date: Wed, 27 Nov 2024 12:15:37 -0300 Subject: [PATCH] refactoring ensureIndex in sql pump --- pumps/sql.go | 59 ++++++++++++++++++++++++++++------------------------ 1 file changed, 32 insertions(+), 27 deletions(-) diff --git a/pumps/sql.go b/pumps/sql.go index 337cfd96d..101cae267 100644 --- a/pumps/sql.go +++ b/pumps/sql.go @@ -375,6 +375,32 @@ func (c *SQLPump) buildIndexName(indexBaseName, tableName string) string { return fmt.Sprintf("%s_%s", tableName, indexBaseName) } +func (c *SQLPump) createIndex(indexBaseName, tableName, column string, wg *sync.WaitGroup) error { + if wg != nil { + defer wg.Done() + } + indexName := c.buildIndexName(indexBaseName, tableName) + option := "" + if c.dbType == "postgres" { + option = "CONCURRENTLY" + } + + columnExist := c.db.Migrator().HasColumn(&analytics.AnalyticsRecord{}, column) + if !columnExist { + return errors.New("cannot create index for non existent column " + column) + } + + sql := fmt.Sprintf("CREATE INDEX %s IF NOT EXISTS %s ON %s (%s)", option, indexName, tableName, column) + err := c.db.Exec(sql).Error + if err != nil { + c.log.Errorf("error creating index %s for table %s : %s", indexName, tableName, err.Error()) + return err + } + + c.log.Infof("Index %s created for table %s", indexName, tableName) + return nil +} + // ensureIndex check that all indexes for the analytics SQL table are in place func (c *SQLPump) ensureIndex(tableName string, background bool) error { if !c.db.Migrator().HasTable(tableName) { @@ -382,44 +408,23 @@ func (c *SQLPump) ensureIndex(tableName string, background bool) error { } // waitgroup to facilitate testing and track when all indexes are created - var wg sync.WaitGroup - wg.Add(len(indexes)) - createIndexFn := func(indexBaseName, column string) error { - defer wg.Done() - indexName := c.buildIndexName(indexBaseName, tableName) - option := "" - if c.dbType == "postgres" { - option = "CONCURRENTLY" - } - - columnExist := c.db.Migrator().HasColumn(&analytics.AnalyticsRecord{}, column) - if !columnExist { - return errors.New("cannot create index for non existent column " + column) - } - - sql := fmt.Sprintf("CREATE INDEX %s IF NOT EXISTS %s ON %s (%s)", option, indexName, tableName, column) - err := c.db.Exec(sql).Error - if err != nil { - c.log.Errorf("error creating index %s for table %s : %s", indexName, tableName, err.Error()) - return err - } - - c.log.Infof("Index %s created for table %s", indexName, tableName) - return nil + var wg *sync.WaitGroup + if background { + wg = &sync.WaitGroup{} + wg.Add(len(indexes)) } for _, idx := range indexes { indexName := tableName + idx.baseName - if !c.db.Migrator().HasIndex(tableName, indexName) { if background { go func(baseName, cols string) { - if err := createIndexFn(baseName, cols); err != nil { + if err := c.createIndex(baseName, tableName, cols, wg); err != nil { c.log.Error(err) } }(idx.baseName, idx.column) } else { - if err := createIndexFn(idx.baseName, idx.column); err != nil { + if err := c.createIndex(idx.baseName, tableName, idx.column, wg); err != nil { return err } }