Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix keyword name in ADD PARTITION #65

Merged
merged 1 commit into from
Apr 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 12 additions & 16 deletions pkg/ccr/base/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,6 @@ func (s BackupState) String() string {
}
}

func formatKeyWordName(name string) string {
return "`" + strings.TrimSpace(name) + "`"
}

func ParseBackupState(state string) BackupState {
switch state {
case "PENDING":
Expand Down Expand Up @@ -204,7 +200,7 @@ func (s *Spec) IsDatabaseEnableBinlog() (bool, error) {
}

var createDBString string
query := fmt.Sprintf("SHOW CREATE DATABASE %s", formatKeyWordName(s.Database))
query := fmt.Sprintf("SHOW CREATE DATABASE %s", utils.FormatKeywordName(s.Database))
rows, err := db.Query(query)
if err != nil {
return false, xerror.Wrap(err, xerror.Normal, query)
Expand Down Expand Up @@ -242,7 +238,7 @@ func (s *Spec) IsTableEnableBinlog() (bool, error) {
}

var createTableString string
query := fmt.Sprintf("SHOW CREATE TABLE %s.%s", formatKeyWordName(s.Database), formatKeyWordName(s.Table))
query := fmt.Sprintf("SHOW CREATE TABLE %s.%s", utils.FormatKeywordName(s.Database), utils.FormatKeywordName(s.Table))
rows, err := db.Query(query)
if err != nil {
return false, xerror.Wrap(err, xerror.Normal, query)
Expand Down Expand Up @@ -308,7 +304,7 @@ func (s *Spec) dropTable(table string) error {
return err
}

sql := fmt.Sprintf("DROP TABLE %s.%s", formatKeyWordName(s.Database), formatKeyWordName(table))
sql := fmt.Sprintf("DROP TABLE %s.%s", utils.FormatKeywordName(s.Database), utils.FormatKeywordName(table))
_, err = db.Exec(sql)
if err != nil {
return xerror.Wrapf(err, xerror.Normal, "drop table %s.%s failed, sql: %s", s.Database, table, sql)
Expand All @@ -324,13 +320,13 @@ func (s *Spec) ClearDB() error {
return err
}

sql := fmt.Sprintf("DROP DATABASE %s", formatKeyWordName(s.Database))
sql := fmt.Sprintf("DROP DATABASE %s", utils.FormatKeywordName(s.Database))
_, err = db.Exec(sql)
if err != nil {
return xerror.Wrapf(err, xerror.Normal, "drop database %s failed", s.Database)
}

if _, err = db.Exec("CREATE DATABASE " + formatKeyWordName(s.Database)); err != nil {
if _, err = db.Exec("CREATE DATABASE " + utils.FormatKeywordName(s.Database)); err != nil {
return xerror.Wrapf(err, xerror.Normal, "create database %s failed", s.Database)
}
return nil
Expand All @@ -344,7 +340,7 @@ func (s *Spec) CreateDatabase() error {
return nil
}

if _, err = db.Exec("CREATE DATABASE IF NOT EXISTS " + formatKeyWordName(s.Database)); err != nil {
if _, err = db.Exec("CREATE DATABASE IF NOT EXISTS " + utils.FormatKeywordName(s.Database)); err != nil {
return xerror.Wrapf(err, xerror.Normal, "create database %s failed", s.Database)
}
return nil
Expand Down Expand Up @@ -404,7 +400,7 @@ func (s *Spec) CheckTableExists() (bool, error) {
return false, err
}

sql := fmt.Sprintf("SHOW TABLES FROM %s LIKE '%s'", formatKeyWordName(s.Database), s.Table)
sql := fmt.Sprintf("SHOW TABLES FROM %s LIKE '%s'", utils.FormatKeywordName(s.Database), s.Table)
rows, err := db.Query(sql)
if err != nil {
return false, xerror.Wrapf(err, xerror.Normal, "show tables failed, sql: %s", sql)
Expand Down Expand Up @@ -444,7 +440,7 @@ func (s *Spec) CreateSnapshotAndWaitForDone(tables []string) (string, error) {
// snapshot name format "ccrs_${table}_${timestamp}"
// table refs = table
snapshotName = fmt.Sprintf("ccrs_%s_%s_%d", s.Database, s.Table, time.Now().Unix())
tableRefs = formatKeyWordName(tables[0])
tableRefs = utils.FormatKeywordName(tables[0])
} else {
// snapshot name format "ccrs_${db}_${timestamp}"
// table refs = tables.join(", ")
Expand All @@ -464,7 +460,7 @@ func (s *Spec) CreateSnapshotAndWaitForDone(tables []string) (string, error) {
return "", err
}

backupSnapshotSql := fmt.Sprintf("BACKUP SNAPSHOT %s.%s TO `__keep_on_local__` ON ( %s ) PROPERTIES (\"type\" = \"full\")", formatKeyWordName(s.Database), snapshotName, tableRefs)
backupSnapshotSql := fmt.Sprintf("BACKUP SNAPSHOT %s.%s TO `__keep_on_local__` ON ( %s ) PROPERTIES (\"type\" = \"full\")", utils.FormatKeywordName(s.Database), snapshotName, tableRefs)
log.Debugf("backup snapshot sql: %s", backupSnapshotSql)
_, err = db.Exec(backupSnapshotSql)
if err != nil {
Expand Down Expand Up @@ -492,7 +488,7 @@ func (s *Spec) checkBackupFinished(snapshotName string) (BackupState, error) {
return BackupStateUnknown, err
}

sql := fmt.Sprintf("SHOW BACKUP FROM %s WHERE SnapshotName = \"%s\"", formatKeyWordName(s.Database), snapshotName)
sql := fmt.Sprintf("SHOW BACKUP FROM %s WHERE SnapshotName = \"%s\"", utils.FormatKeywordName(s.Database), snapshotName)
log.Debugf("check backup state sql: %s", sql)
rows, err := db.Query(sql)
if err != nil {
Expand Down Expand Up @@ -545,7 +541,7 @@ func (s *Spec) checkRestoreFinished(snapshotName string) (RestoreState, string,
return RestoreStateUnknown, "", err
}

query := fmt.Sprintf("SHOW RESTORE FROM %s WHERE Label = \"%s\"", formatKeyWordName(s.Database), snapshotName)
query := fmt.Sprintf("SHOW RESTORE FROM %s WHERE Label = \"%s\"", utils.FormatKeywordName(s.Database), snapshotName)

log.Debugf("check restore state sql: %s", query)
rows, err := db.Query(query)
Expand Down Expand Up @@ -639,7 +635,7 @@ func (s *Spec) waitTransactionDone(txnId int64) error {
// WHERE
// [id=transaction_id]
// [label = label_name];
query := fmt.Sprintf("SHOW TRANSACTION FROM %s WHERE id = %d", formatKeyWordName(s.Database), txnId)
query := fmt.Sprintf("SHOW TRANSACTION FROM %s WHERE id = %d", utils.FormatKeywordName(s.Database), txnId)

log.Debugf("wait transaction done sql: %s", query)
rows, err := db.Query(query)
Expand Down
3 changes: 2 additions & 1 deletion pkg/ccr/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ func (m *Meta) UpdateTable(tableName string, tableId int64) (*TableMeta, error)
}

query := fmt.Sprintf("show proc '/dbs/%d/'", dbId)
log.Infof("UpdateTable Sql: %s", query)
rows, err := db.Query(query)
if err != nil {
return nil, xerror.Wrap(err, xerror.Normal, query)
Expand Down Expand Up @@ -185,7 +186,7 @@ func (m *Meta) UpdateTable(tableName string, tableId int64) (*TableMeta, error)
}

// not found
return nil, xerror.Errorf(xerror.Meta, "tableId %v not found table", tableId)
return nil, xerror.Errorf(xerror.Meta, "tableName %s tableId %v not found table", tableName, tableId)
}

func (m *Meta) GetTable(tableId int64) (*TableMeta, error) {
Expand Down
3 changes: 2 additions & 1 deletion pkg/ccr/record/add_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"strings"

"github.com/selectdb/ccr_syncer/pkg/utils"
"github.com/selectdb/ccr_syncer/pkg/xerror"

log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -53,7 +54,7 @@ func (addPartition *AddPartition) getDistributionColumns() []string {

func (addPartition *AddPartition) GetSql(destTableName string) string {
// addPartitionSql = "ALTER TABLE " + sql
addPartitionSql := fmt.Sprintf("ALTER TABLE %s %s", destTableName, addPartition.Sql)
addPartitionSql := fmt.Sprintf("ALTER TABLE %s %s", utils.FormatKeywordName(destTableName), addPartition.Sql)
// remove last ';' and add BUCKETS num
addPartitionSql = strings.TrimRight(addPartitionSql, ";")
// check contains BUCKETS num, ignore case
Expand Down
5 changes: 5 additions & 0 deletions pkg/utils/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package utils
import (
"database/sql"
"strconv"
"strings"

"github.com/selectdb/ccr_syncer/pkg/xerror"
)
Expand Down Expand Up @@ -83,3 +84,7 @@ func (r *RowParser) GetString(columnName string) (string, error) {

return string(*resBytes), nil
}

func FormatKeywordName(name string) string {
return "`" + strings.TrimSpace(name) + "`"
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

suite("test_keyword_nema") {
suite("test_keyword_name") {

def tableName = "roles"
def syncerAddress = "127.0.0.1:9190"
Expand Down Expand Up @@ -90,6 +90,8 @@ suite("test_keyword_nema") {
return res.size() == 0
}

sql "DROP TABLE IF EXISTS `${tableName}` FORCE"
target_sql "DROP TABLE IF EXISTS `${tableName}` FORCE"
sql """
CREATE TABLE `${tableName}` (
role_id INT,
Expand All @@ -98,6 +100,10 @@ suite("test_keyword_nema") {
register_time DATE
)
UNIQUE KEY(role_id)
PARTITION BY RANGE (role_id)
(
PARTITION p1 VALUES LESS THAN ("10")
)
DISTRIBUTED BY HASH(role_id) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
Expand Down Expand Up @@ -130,13 +136,35 @@ suite("test_keyword_nema") {

assertTrue(checkRestoreFinishTimesOf("${tableName}", 30))



logger.info("=== Test 1: Check keyword name table ===")
// def checkShowTimesOf = { sqlString, myClosure, times, func = "sql" -> Boolean
assertTrue(checkShowTimesOf("""
SHOW CREATE TABLE `TEST_${context.dbName}`.`${tableName}`
""",
exist, 30, "target"))

logger.info("=== Test 2: Add new partition ===")
sql """
ALTER TABLE `${tableName}` ADD PARTITION p2
VALUES LESS THAN ("20")
"""

sql """
INSERT INTO `${tableName}` VALUES
(11, 'who am I', NULL, NULL),
(12, 'mage', 'alliance', '2018-12-03 16:11:28');
"""

def checkNewPartition = { inputRes -> Boolean
for (List<Object> row : inputRes) {
if ((row[1] as String).contains("PARTITION p2")) {
return true
}
}
return false
}
assertTrue(checkShowTimesOf("""
SHOW CREATE TABLE `TEST_${context.dbName}`.`${tableName}`
""",
checkNewPartition, 30, "target"))
}
Loading