diff --git a/pumps/sql.go b/pumps/sql.go index 35cf3f25b..7ab4e6418 100644 --- a/pumps/sql.go +++ b/pumps/sql.go @@ -113,7 +113,7 @@ var ( indexes = []struct { baseName string - columns string + column string }{ {"idx_responsecode", "responsecode"}, {"idx_apikey", "apikey"}, @@ -249,6 +249,9 @@ func (c *SQLPump) WriteData(ctx context.Context, data []interface{}) error { if errTable := c.ensureTable(table); errTable != nil { return errTable } + if err := c.ensureIndex(table, false); err != nil { + return err + } } else { i = dataLen // write all records at once for non-sharded case, stop for loop after 1 iteration } @@ -379,14 +382,19 @@ func (c *SQLPump) ensureIndex(tableName string, background bool) error { return errors.New("cannot create indexes as table doesn't exist: " + tableName) } - createIndexFn := func(indexBaseName, columns string) error { + createIndexFn := func(indexBaseName, column string) error { indexName := c.buildIndexName(indexBaseName, tableName) option := "" if c.dbType == "postgres" { option = "CONCURRENTLY" } - sql := fmt.Sprintf("CREATE INDEX %s IF NOT EXISTS %s ON %s (%s)", option, indexName, tableName, columns) + columnExist := c.db.Migrator().HasColumn(&analytics.AnalyticsRecord{}, column) + if !columnExist { + return errors.New("cannot create index for non existent column " + column) + } + + sql := fmt.Sprintf("CREATE INDEX %s IF NOT EXISTS %s ON %s (%s)", option, indexName, tableName, column) err := c.db.Exec(sql).Error if err != nil { c.log.Errorf("error creating index %s for table %s : %s", indexName, tableName, err.Error()) @@ -405,9 +413,9 @@ func (c *SQLPump) ensureIndex(tableName string, background bool) error { if err := createIndexFn(baseName, cols); err != nil { c.log.Error(err) } - }(idx.baseName, idx.columns) + }(idx.baseName, idx.column) } else { - if err := createIndexFn(idx.baseName, idx.columns); err != nil { + if err := createIndexFn(idx.baseName, idx.column); err != nil { return err } } @@ -426,15 +434,10 @@ func (c *SQLPump) ensureIndex(tableName string, background bool) error { func (c *SQLPump) ensureTable(tableName string) error { if !c.db.Migrator().HasTable(tableName) { c.db = c.db.Table(tableName) - if err := c.db.Migrator().CreateTable(&analytics.AnalyticsRecord{}); err != nil { c.log.Error("error creating table", err) return err } - - if err := c.ensureIndex(tableName, false); err != nil { - return err - } } return nil } diff --git a/pumps/sql_test.go b/pumps/sql_test.go index 3864abfa8..3abe73f37 100644 --- a/pumps/sql_test.go +++ b/pumps/sql_test.go @@ -364,57 +364,49 @@ func TestDecodeRequestAndDecodeResponseSQL(t *testing.T) { assert.False(t, newPump.GetDecodedResponse()) } +func setupSQLPump(t *testing.T, tableName string, useBackground bool) *SQLPump { + t.Helper() + pmp := &SQLPump{} + pmp.log = log.WithField("prefix", "sql-pump") + cfg := map[string]interface{}{ + "type": "sqlite", + "connection_string": "", + } + + assert.NoError(t, pmp.Init(cfg)) + if useBackground { + pmp.backgroundIndexCreated = make(chan bool, 1) + } + assert.NoError(t, pmp.ensureTable(tableName)) + + return pmp +} func TestEnsureIndexSQL(t *testing.T) { //nolint:govet tcs := []struct { testName string givenTableName string expectedErr error - pmpSetupFn func(tableName string) *SQLPump + pmpSetupFn func(t *testing.T, tableName string) *SQLPump givenRunInBackground bool shouldHaveIndex bool }{ { testName: "index created correctly, not background", - pmpSetupFn: func(tableName string) *SQLPump { - pmp := SQLPump{} - cfg := make(map[string]interface{}) - cfg["type"] = "sqlite" - cfg["connection_string"] = "" - pmp.log = log.WithField("prefix", "sql-pump") - err := pmp.Init(cfg) - assert.Nil(t, err) - - if err := pmp.ensureTable(tableName); err != nil { - return nil - } - - return &pmp + pmpSetupFn: func(t *testing.T, tableName string) *SQLPump { + return setupSQLPump(t, tableName, false) }, - givenTableName: "test", + givenTableName: "analytics_no_background", givenRunInBackground: false, expectedErr: nil, shouldHaveIndex: true, }, { testName: "index created correctly, background", - pmpSetupFn: func(tableName string) *SQLPump { - cfg := make(map[string]interface{}) - pmp := SQLPump{} - cfg["type"] = "sqlite" - cfg["connection_string"] = "" - pmp.log = log.WithField("prefix", "sql-pump") - err := pmp.Init(cfg) - assert.Nil(t, err) - - pmp.backgroundIndexCreated = make(chan bool, 1) - if err := pmp.ensureTable(tableName); err != nil { - return nil - } - - return &pmp + pmpSetupFn: func(t *testing.T, tableName string) *SQLPump { + return setupSQLPump(t, tableName, true) }, - givenTableName: "test", + givenTableName: "analytics_background", givenRunInBackground: true, expectedErr: nil, shouldHaveIndex: true, @@ -423,28 +415,31 @@ func TestEnsureIndexSQL(t *testing.T) { for _, tc := range tcs { t.Run(tc.testName, func(t *testing.T) { - pmp := tc.pmpSetupFn(tc.givenTableName) + pmp := tc.pmpSetupFn(t, tc.givenTableName) defer func() { pmp.db.Migrator().DropTable(tc.givenTableName) }() - assert.NotNil(t, pmp) actualErr := pmp.ensureIndex(tc.givenTableName, tc.givenRunInBackground) + isErrExpected := tc.expectedErr != nil + didErr := actualErr != nil + assert.Equal(t, isErrExpected, didErr) + + if isErrExpected { + assert.Equal(t, tc.expectedErr.Error(), actualErr.Error()) + } if actualErr == nil { if tc.givenRunInBackground { // wait for the background index creation to finish <-pmp.backgroundIndexCreated - } else { - indexToUse := indexes[0] - t.Logf("\n Sent: %v --%v \n", indexToUse.baseName, tc.givenTableName) - indexName := pmp.buildIndexName(indexToUse.baseName, tc.givenTableName) - hasIndex := pmp.db.Table(tc.givenTableName).Migrator().HasIndex(tc.givenTableName, indexName) - assert.Equal(t, tc.shouldHaveIndex, hasIndex) } - } else { - assert.Equal(t, tc.expectedErr.Error(), actualErr.Error()) + + indexToUse := indexes[0] + indexName := pmp.buildIndexName(indexToUse.baseName, tc.givenTableName) + hasIndex := pmp.db.Table(tc.givenTableName).Migrator().HasIndex(tc.givenTableName, indexName) + assert.Equal(t, tc.shouldHaveIndex, hasIndex) } }) }