Skip to content

Commit

Permalink
Fix keyword name in ADD PARTITION
Browse files Browse the repository at this point in the history
  • Loading branch information
w41ter committed Apr 26, 2024
1 parent 0de343d commit be74e8a
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 21 deletions.
28 changes: 12 additions & 16 deletions pkg/ccr/base/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,6 @@ func (s BackupState) String() string {
}
}

func formatKeyWordName(name string) string {
return "`" + strings.TrimSpace(name) + "`"
}

func ParseBackupState(state string) BackupState {
switch state {
case "PENDING":
Expand Down Expand Up @@ -204,7 +200,7 @@ func (s *Spec) IsDatabaseEnableBinlog() (bool, error) {
}

var createDBString string
query := fmt.Sprintf("SHOW CREATE DATABASE %s", formatKeyWordName(s.Database))
query := fmt.Sprintf("SHOW CREATE DATABASE %s", utils.FormatKeywordName(s.Database))
rows, err := db.Query(query)
if err != nil {
return false, xerror.Wrap(err, xerror.Normal, query)
Expand Down Expand Up @@ -242,7 +238,7 @@ func (s *Spec) IsTableEnableBinlog() (bool, error) {
}

var createTableString string
query := fmt.Sprintf("SHOW CREATE TABLE %s.%s", formatKeyWordName(s.Database), formatKeyWordName(s.Table))
query := fmt.Sprintf("SHOW CREATE TABLE %s.%s", utils.FormatKeywordName(s.Database), utils.FormatKeywordName(s.Table))
rows, err := db.Query(query)
if err != nil {
return false, xerror.Wrap(err, xerror.Normal, query)
Expand Down Expand Up @@ -308,7 +304,7 @@ func (s *Spec) dropTable(table string) error {
return err
}

sql := fmt.Sprintf("DROP TABLE %s.%s", formatKeyWordName(s.Database), formatKeyWordName(table))
sql := fmt.Sprintf("DROP TABLE %s.%s", utils.FormatKeywordName(s.Database), utils.FormatKeywordName(table))
_, err = db.Exec(sql)
if err != nil {
return xerror.Wrapf(err, xerror.Normal, "drop table %s.%s failed, sql: %s", s.Database, table, sql)
Expand All @@ -324,13 +320,13 @@ func (s *Spec) ClearDB() error {
return err
}

sql := fmt.Sprintf("DROP DATABASE %s", formatKeyWordName(s.Database))
sql := fmt.Sprintf("DROP DATABASE %s", utils.FormatKeywordName(s.Database))
_, err = db.Exec(sql)
if err != nil {
return xerror.Wrapf(err, xerror.Normal, "drop database %s failed", s.Database)
}

if _, err = db.Exec("CREATE DATABASE " + formatKeyWordName(s.Database)); err != nil {
if _, err = db.Exec("CREATE DATABASE " + utils.FormatKeywordName(s.Database)); err != nil {
return xerror.Wrapf(err, xerror.Normal, "create database %s failed", s.Database)
}
return nil
Expand All @@ -344,7 +340,7 @@ func (s *Spec) CreateDatabase() error {
return nil
}

if _, err = db.Exec("CREATE DATABASE IF NOT EXISTS " + formatKeyWordName(s.Database)); err != nil {
if _, err = db.Exec("CREATE DATABASE IF NOT EXISTS " + utils.FormatKeywordName(s.Database)); err != nil {
return xerror.Wrapf(err, xerror.Normal, "create database %s failed", s.Database)
}
return nil
Expand Down Expand Up @@ -404,7 +400,7 @@ func (s *Spec) CheckTableExists() (bool, error) {
return false, err
}

sql := fmt.Sprintf("SHOW TABLES FROM %s LIKE '%s'", formatKeyWordName(s.Database), s.Table)
sql := fmt.Sprintf("SHOW TABLES FROM %s LIKE '%s'", utils.FormatKeywordName(s.Database), s.Table)
rows, err := db.Query(sql)
if err != nil {
return false, xerror.Wrapf(err, xerror.Normal, "show tables failed, sql: %s", sql)
Expand Down Expand Up @@ -444,7 +440,7 @@ func (s *Spec) CreateSnapshotAndWaitForDone(tables []string) (string, error) {
// snapshot name format "ccrs_${table}_${timestamp}"
// table refs = table
snapshotName = fmt.Sprintf("ccrs_%s_%s_%d", s.Database, s.Table, time.Now().Unix())
tableRefs = formatKeyWordName(tables[0])
tableRefs = utils.FormatKeywordName(tables[0])
} else {
// snapshot name format "ccrs_${db}_${timestamp}"
// table refs = tables.join(", ")
Expand All @@ -464,7 +460,7 @@ func (s *Spec) CreateSnapshotAndWaitForDone(tables []string) (string, error) {
return "", err
}

backupSnapshotSql := fmt.Sprintf("BACKUP SNAPSHOT %s.%s TO `__keep_on_local__` ON ( %s ) PROPERTIES (\"type\" = \"full\")", formatKeyWordName(s.Database), snapshotName, tableRefs)
backupSnapshotSql := fmt.Sprintf("BACKUP SNAPSHOT %s.%s TO `__keep_on_local__` ON ( %s ) PROPERTIES (\"type\" = \"full\")", utils.FormatKeywordName(s.Database), snapshotName, tableRefs)
log.Debugf("backup snapshot sql: %s", backupSnapshotSql)
_, err = db.Exec(backupSnapshotSql)
if err != nil {
Expand Down Expand Up @@ -492,7 +488,7 @@ func (s *Spec) checkBackupFinished(snapshotName string) (BackupState, error) {
return BackupStateUnknown, err
}

sql := fmt.Sprintf("SHOW BACKUP FROM %s WHERE SnapshotName = \"%s\"", formatKeyWordName(s.Database), snapshotName)
sql := fmt.Sprintf("SHOW BACKUP FROM %s WHERE SnapshotName = \"%s\"", utils.FormatKeywordName(s.Database), snapshotName)
log.Debugf("check backup state sql: %s", sql)
rows, err := db.Query(sql)
if err != nil {
Expand Down Expand Up @@ -545,7 +541,7 @@ func (s *Spec) checkRestoreFinished(snapshotName string) (RestoreState, string,
return RestoreStateUnknown, "", err
}

query := fmt.Sprintf("SHOW RESTORE FROM %s WHERE Label = \"%s\"", formatKeyWordName(s.Database), snapshotName)
query := fmt.Sprintf("SHOW RESTORE FROM %s WHERE Label = \"%s\"", utils.FormatKeywordName(s.Database), snapshotName)

log.Debugf("check restore state sql: %s", query)
rows, err := db.Query(query)
Expand Down Expand Up @@ -639,7 +635,7 @@ func (s *Spec) waitTransactionDone(txnId int64) error {
// WHERE
// [id=transaction_id]
// [label = label_name];
query := fmt.Sprintf("SHOW TRANSACTION FROM %s WHERE id = %d", formatKeyWordName(s.Database), txnId)
query := fmt.Sprintf("SHOW TRANSACTION FROM %s WHERE id = %d", utils.FormatKeywordName(s.Database), txnId)

log.Debugf("wait transaction done sql: %s", query)
rows, err := db.Query(query)
Expand Down
3 changes: 2 additions & 1 deletion pkg/ccr/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ func (m *Meta) UpdateTable(tableName string, tableId int64) (*TableMeta, error)
}

query := fmt.Sprintf("show proc '/dbs/%d/'", dbId)
log.Infof("UpdateTable Sql: %s", query)
rows, err := db.Query(query)
if err != nil {
return nil, xerror.Wrap(err, xerror.Normal, query)
Expand Down Expand Up @@ -185,7 +186,7 @@ func (m *Meta) UpdateTable(tableName string, tableId int64) (*TableMeta, error)
}

// not found
return nil, xerror.Errorf(xerror.Meta, "tableId %v not found table", tableId)
return nil, xerror.Errorf(xerror.Meta, "tableName %s tableId %v not found table", tableName, tableId)
}

func (m *Meta) GetTable(tableId int64) (*TableMeta, error) {
Expand Down
3 changes: 2 additions & 1 deletion pkg/ccr/record/add_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"strings"

"github.com/selectdb/ccr_syncer/pkg/utils"
"github.com/selectdb/ccr_syncer/pkg/xerror"

log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -53,7 +54,7 @@ func (addPartition *AddPartition) getDistributionColumns() []string {

func (addPartition *AddPartition) GetSql(destTableName string) string {
// addPartitionSql = "ALTER TABLE " + sql
addPartitionSql := fmt.Sprintf("ALTER TABLE %s %s", destTableName, addPartition.Sql)
addPartitionSql := fmt.Sprintf("ALTER TABLE %s %s", utils.FormatKeywordName(destTableName), addPartition.Sql)
// remove last ';' and add BUCKETS num
addPartitionSql = strings.TrimRight(addPartitionSql, ";")
// check contains BUCKETS num, ignore case
Expand Down
5 changes: 5 additions & 0 deletions pkg/utils/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package utils
import (
"database/sql"
"strconv"
"strings"

"github.com/selectdb/ccr_syncer/pkg/xerror"
)
Expand Down Expand Up @@ -83,3 +84,7 @@ func (r *RowParser) GetString(columnName string) (string, error) {

return string(*resBytes), nil
}

func FormatKeywordName(name string) string {
return "`" + strings.TrimSpace(name) + "`"
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

suite("test_keyword_nema") {
suite("test_keyword_name") {

def tableName = "roles"
def syncerAddress = "127.0.0.1:9190"
Expand Down Expand Up @@ -90,6 +90,8 @@ suite("test_keyword_nema") {
return res.size() == 0
}

sql "DROP TABLE IF EXISTS `${tableName}` FORCE"
target_sql "DROP TABLE IF EXISTS `${tableName}` FORCE"
sql """
CREATE TABLE `${tableName}` (
role_id INT,
Expand All @@ -98,6 +100,10 @@ suite("test_keyword_nema") {
register_time DATE
)
UNIQUE KEY(role_id)
PARTITION BY RANGE (role_id)
(
PARTITION p1 VALUES LESS THAN ("10")
)
DISTRIBUTED BY HASH(role_id) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
Expand Down Expand Up @@ -130,13 +136,35 @@ suite("test_keyword_nema") {

assertTrue(checkRestoreFinishTimesOf("${tableName}", 30))



logger.info("=== Test 1: Check keyword name table ===")
// def checkShowTimesOf = { sqlString, myClosure, times, func = "sql" -> Boolean
assertTrue(checkShowTimesOf("""
SHOW CREATE TABLE `TEST_${context.dbName}`.`${tableName}`
""",
exist, 30, "target"))

logger.info("=== Test 2: Add new partition ===")
sql """
ALTER TABLE `${tableName}` ADD PARTITION p2
VALUES LESS THAN ("20")
"""

sql """
INSERT INTO `${tableName}` VALUES
(11, 'who am I', NULL, NULL),
(12, 'mage', 'alliance', '2018-12-03 16:11:28');
"""

def checkNewPartition = { inputRes -> Boolean
for (List<Object> row : inputRes) {
if ((row[1] as String).contains("PARTITION p2")) {
return true
}
}
return false
}
assertTrue(checkShowTimesOf("""
SHOW CREATE TABLE `TEST_${context.dbName}`.`${tableName}`
""",
checkNewPartition, 30, "target"))
}

0 comments on commit be74e8a

Please sign in to comment.