From be74e8a46d4d757c7bf83eee9900dc3ec6d35a10 Mon Sep 17 00:00:00 2001 From: w41ter Date: Fri, 26 Apr 2024 16:52:53 +0800 Subject: [PATCH] Fix keyword name in ADD PARTITION --- pkg/ccr/base/spec.go | 28 +++++++-------- pkg/ccr/meta.go | 3 +- pkg/ccr/record/add_partition.go | 3 +- pkg/utils/sql.go | 5 +++ ...d_nema.groovy => test_keyword_name.groovy} | 34 +++++++++++++++++-- 5 files changed, 52 insertions(+), 21 deletions(-) rename regression-test/suites/table-sync/{test_keyword_nema.groovy => test_keyword_name.groovy} (81%) diff --git a/pkg/ccr/base/spec.go b/pkg/ccr/base/spec.go index 4bfa4652..152ec30b 100644 --- a/pkg/ccr/base/spec.go +++ b/pkg/ccr/base/spec.go @@ -48,10 +48,6 @@ func (s BackupState) String() string { } } -func formatKeyWordName(name string) string { - return "`" + strings.TrimSpace(name) + "`" -} - func ParseBackupState(state string) BackupState { switch state { case "PENDING": @@ -204,7 +200,7 @@ func (s *Spec) IsDatabaseEnableBinlog() (bool, error) { } var createDBString string - query := fmt.Sprintf("SHOW CREATE DATABASE %s", formatKeyWordName(s.Database)) + query := fmt.Sprintf("SHOW CREATE DATABASE %s", utils.FormatKeywordName(s.Database)) rows, err := db.Query(query) if err != nil { return false, xerror.Wrap(err, xerror.Normal, query) @@ -242,7 +238,7 @@ func (s *Spec) IsTableEnableBinlog() (bool, error) { } var createTableString string - query := fmt.Sprintf("SHOW CREATE TABLE %s.%s", formatKeyWordName(s.Database), formatKeyWordName(s.Table)) + query := fmt.Sprintf("SHOW CREATE TABLE %s.%s", utils.FormatKeywordName(s.Database), utils.FormatKeywordName(s.Table)) rows, err := db.Query(query) if err != nil { return false, xerror.Wrap(err, xerror.Normal, query) @@ -308,7 +304,7 @@ func (s *Spec) dropTable(table string) error { return err } - sql := fmt.Sprintf("DROP TABLE %s.%s", formatKeyWordName(s.Database), formatKeyWordName(table)) + sql := fmt.Sprintf("DROP TABLE %s.%s", utils.FormatKeywordName(s.Database), utils.FormatKeywordName(table)) _, err = db.Exec(sql) if err != nil { return xerror.Wrapf(err, xerror.Normal, "drop table %s.%s failed, sql: %s", s.Database, table, sql) @@ -324,13 +320,13 @@ func (s *Spec) ClearDB() error { return err } - sql := fmt.Sprintf("DROP DATABASE %s", formatKeyWordName(s.Database)) + sql := fmt.Sprintf("DROP DATABASE %s", utils.FormatKeywordName(s.Database)) _, err = db.Exec(sql) if err != nil { return xerror.Wrapf(err, xerror.Normal, "drop database %s failed", s.Database) } - if _, err = db.Exec("CREATE DATABASE " + formatKeyWordName(s.Database)); err != nil { + if _, err = db.Exec("CREATE DATABASE " + utils.FormatKeywordName(s.Database)); err != nil { return xerror.Wrapf(err, xerror.Normal, "create database %s failed", s.Database) } return nil @@ -344,7 +340,7 @@ func (s *Spec) CreateDatabase() error { return nil } - if _, err = db.Exec("CREATE DATABASE IF NOT EXISTS " + formatKeyWordName(s.Database)); err != nil { + if _, err = db.Exec("CREATE DATABASE IF NOT EXISTS " + utils.FormatKeywordName(s.Database)); err != nil { return xerror.Wrapf(err, xerror.Normal, "create database %s failed", s.Database) } return nil @@ -404,7 +400,7 @@ func (s *Spec) CheckTableExists() (bool, error) { return false, err } - sql := fmt.Sprintf("SHOW TABLES FROM %s LIKE '%s'", formatKeyWordName(s.Database), s.Table) + sql := fmt.Sprintf("SHOW TABLES FROM %s LIKE '%s'", utils.FormatKeywordName(s.Database), s.Table) rows, err := db.Query(sql) if err != nil { return false, xerror.Wrapf(err, xerror.Normal, "show tables failed, sql: %s", sql) @@ -444,7 +440,7 @@ func (s *Spec) CreateSnapshotAndWaitForDone(tables []string) (string, error) { // snapshot name format "ccrs_${table}_${timestamp}" // table refs = table snapshotName = fmt.Sprintf("ccrs_%s_%s_%d", s.Database, s.Table, time.Now().Unix()) - tableRefs = formatKeyWordName(tables[0]) + tableRefs = utils.FormatKeywordName(tables[0]) } else { // snapshot name format "ccrs_${db}_${timestamp}" // table refs = tables.join(", ") @@ -464,7 +460,7 @@ func (s *Spec) CreateSnapshotAndWaitForDone(tables []string) (string, error) { return "", err } - backupSnapshotSql := fmt.Sprintf("BACKUP SNAPSHOT %s.%s TO `__keep_on_local__` ON ( %s ) PROPERTIES (\"type\" = \"full\")", formatKeyWordName(s.Database), snapshotName, tableRefs) + backupSnapshotSql := fmt.Sprintf("BACKUP SNAPSHOT %s.%s TO `__keep_on_local__` ON ( %s ) PROPERTIES (\"type\" = \"full\")", utils.FormatKeywordName(s.Database), snapshotName, tableRefs) log.Debugf("backup snapshot sql: %s", backupSnapshotSql) _, err = db.Exec(backupSnapshotSql) if err != nil { @@ -492,7 +488,7 @@ func (s *Spec) checkBackupFinished(snapshotName string) (BackupState, error) { return BackupStateUnknown, err } - sql := fmt.Sprintf("SHOW BACKUP FROM %s WHERE SnapshotName = \"%s\"", formatKeyWordName(s.Database), snapshotName) + sql := fmt.Sprintf("SHOW BACKUP FROM %s WHERE SnapshotName = \"%s\"", utils.FormatKeywordName(s.Database), snapshotName) log.Debugf("check backup state sql: %s", sql) rows, err := db.Query(sql) if err != nil { @@ -545,7 +541,7 @@ func (s *Spec) checkRestoreFinished(snapshotName string) (RestoreState, string, return RestoreStateUnknown, "", err } - query := fmt.Sprintf("SHOW RESTORE FROM %s WHERE Label = \"%s\"", formatKeyWordName(s.Database), snapshotName) + query := fmt.Sprintf("SHOW RESTORE FROM %s WHERE Label = \"%s\"", utils.FormatKeywordName(s.Database), snapshotName) log.Debugf("check restore state sql: %s", query) rows, err := db.Query(query) @@ -639,7 +635,7 @@ func (s *Spec) waitTransactionDone(txnId int64) error { // WHERE // [id=transaction_id] // [label = label_name]; - query := fmt.Sprintf("SHOW TRANSACTION FROM %s WHERE id = %d", formatKeyWordName(s.Database), txnId) + query := fmt.Sprintf("SHOW TRANSACTION FROM %s WHERE id = %d", utils.FormatKeywordName(s.Database), txnId) log.Debugf("wait transaction done sql: %s", query) rows, err := db.Query(query) diff --git a/pkg/ccr/meta.go b/pkg/ccr/meta.go index 0e960dbb..a04dd8ba 100644 --- a/pkg/ccr/meta.go +++ b/pkg/ccr/meta.go @@ -143,6 +143,7 @@ func (m *Meta) UpdateTable(tableName string, tableId int64) (*TableMeta, error) } query := fmt.Sprintf("show proc '/dbs/%d/'", dbId) + log.Infof("UpdateTable Sql: %s", query) rows, err := db.Query(query) if err != nil { return nil, xerror.Wrap(err, xerror.Normal, query) @@ -185,7 +186,7 @@ func (m *Meta) UpdateTable(tableName string, tableId int64) (*TableMeta, error) } // not found - return nil, xerror.Errorf(xerror.Meta, "tableId %v not found table", tableId) + return nil, xerror.Errorf(xerror.Meta, "tableName %s tableId %v not found table", tableName, tableId) } func (m *Meta) GetTable(tableId int64) (*TableMeta, error) { diff --git a/pkg/ccr/record/add_partition.go b/pkg/ccr/record/add_partition.go index 62a21551..cb762d0a 100644 --- a/pkg/ccr/record/add_partition.go +++ b/pkg/ccr/record/add_partition.go @@ -5,6 +5,7 @@ import ( "fmt" "strings" + "github.com/selectdb/ccr_syncer/pkg/utils" "github.com/selectdb/ccr_syncer/pkg/xerror" log "github.com/sirupsen/logrus" @@ -53,7 +54,7 @@ func (addPartition *AddPartition) getDistributionColumns() []string { func (addPartition *AddPartition) GetSql(destTableName string) string { // addPartitionSql = "ALTER TABLE " + sql - addPartitionSql := fmt.Sprintf("ALTER TABLE %s %s", destTableName, addPartition.Sql) + addPartitionSql := fmt.Sprintf("ALTER TABLE %s %s", utils.FormatKeywordName(destTableName), addPartition.Sql) // remove last ';' and add BUCKETS num addPartitionSql = strings.TrimRight(addPartitionSql, ";") // check contains BUCKETS num, ignore case diff --git a/pkg/utils/sql.go b/pkg/utils/sql.go index b612413a..caa1ba25 100644 --- a/pkg/utils/sql.go +++ b/pkg/utils/sql.go @@ -3,6 +3,7 @@ package utils import ( "database/sql" "strconv" + "strings" "github.com/selectdb/ccr_syncer/pkg/xerror" ) @@ -83,3 +84,7 @@ func (r *RowParser) GetString(columnName string) (string, error) { return string(*resBytes), nil } + +func FormatKeywordName(name string) string { + return "`" + strings.TrimSpace(name) + "`" +} diff --git a/regression-test/suites/table-sync/test_keyword_nema.groovy b/regression-test/suites/table-sync/test_keyword_name.groovy similarity index 81% rename from regression-test/suites/table-sync/test_keyword_nema.groovy rename to regression-test/suites/table-sync/test_keyword_name.groovy index b9d64f45..193598c5 100644 --- a/regression-test/suites/table-sync/test_keyword_nema.groovy +++ b/regression-test/suites/table-sync/test_keyword_name.groovy @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -suite("test_keyword_nema") { +suite("test_keyword_name") { def tableName = "roles" def syncerAddress = "127.0.0.1:9190" @@ -90,6 +90,8 @@ suite("test_keyword_nema") { return res.size() == 0 } + sql "DROP TABLE IF EXISTS `${tableName}` FORCE" + target_sql "DROP TABLE IF EXISTS `${tableName}` FORCE" sql """ CREATE TABLE `${tableName}` ( role_id INT, @@ -98,6 +100,10 @@ suite("test_keyword_nema") { register_time DATE ) UNIQUE KEY(role_id) + PARTITION BY RANGE (role_id) + ( + PARTITION p1 VALUES LESS THAN ("10") + ) DISTRIBUTED BY HASH(role_id) BUCKETS 1 PROPERTIES ( "replication_allocation" = "tag.location.default: 1", @@ -130,8 +136,6 @@ suite("test_keyword_nema") { assertTrue(checkRestoreFinishTimesOf("${tableName}", 30)) - - logger.info("=== Test 1: Check keyword name table ===") // def checkShowTimesOf = { sqlString, myClosure, times, func = "sql" -> Boolean assertTrue(checkShowTimesOf(""" @@ -139,4 +143,28 @@ suite("test_keyword_nema") { """, exist, 30, "target")) + logger.info("=== Test 2: Add new partition ===") + sql """ + ALTER TABLE `${tableName}` ADD PARTITION p2 + VALUES LESS THAN ("20") + """ + + sql """ + INSERT INTO `${tableName}` VALUES + (11, 'who am I', NULL, NULL), + (12, 'mage', 'alliance', '2018-12-03 16:11:28'); + """ + + def checkNewPartition = { inputRes -> Boolean + for (List row : inputRes) { + if ((row[1] as String).contains("PARTITION p2")) { + return true + } + } + return false + } + assertTrue(checkShowTimesOf(""" + SHOW CREATE TABLE `TEST_${context.dbName}`.`${tableName}` + """, + checkNewPartition, 30, "target")) } \ No newline at end of file