Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/dev' into branch-2.1
Browse files Browse the repository at this point in the history
  • Loading branch information
w41ter committed Sep 10, 2024
2 parents d009a75 + 6ab94ff commit b7960ea
Show file tree
Hide file tree
Showing 66 changed files with 12,919 additions and 1,750 deletions.
91 changes: 69 additions & 22 deletions pkg/ccr/base/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ func (s *Spec) GetAllViewsFromTable(tableName string) ([]string, error) {
}

// then query view's create sql, if create sql contains tableName, this view is wanted
viewRegex := regexp.MustCompile("`internal`.`(\\w+)`.`" + strings.TrimSpace(tableName) + "`")
viewRegex := regexp.MustCompile("(`internal`\\.`\\w+`|`default_cluster:\\w+`)\\.`" + strings.TrimSpace(tableName) + "`")
for _, eachViewName := range viewsFromQuery {
showCreateViewSql := fmt.Sprintf("SHOW CREATE VIEW %s", eachViewName)
createViewSqlList, err := s.queryResult(showCreateViewSql, "Create View", "SHOW CREATE VIEW")
Expand All @@ -358,6 +358,33 @@ func (s *Spec) GetAllViewsFromTable(tableName string) ([]string, error) {
return results, nil
}

func (s *Spec) RenameTable(destTableName string, renameTable *record.RenameTable) error {
// rename table may be 'rename table', 'rename rollup', 'rename partition'
var sql string
// ALTER TABLE table1 RENAME table2;
if renameTable.NewTableName != "" && renameTable.OldTableName != "" {
sql = fmt.Sprintf("ALTER TABLE %s RENAME %s", renameTable.OldTableName, renameTable.NewTableName)
}

// ALTER TABLE example_table RENAME ROLLUP rollup1 rollup2;
// if rename rollup, table name is unchanged
if renameTable.NewRollupName != "" && renameTable.OldRollupName != "" {
sql = fmt.Sprintf("ALTER TABLE %s RENAME ROLLUP %s %s", destTableName, renameTable.OldRollupName, renameTable.NewRollupName)
}

// ALTER TABLE example_table RENAME PARTITION p1 p2;
// if rename partition, table name is unchanged
if renameTable.NewParitionName != "" && renameTable.OldParitionName != "" {
sql = fmt.Sprintf("ALTER TABLE %s RENAME PARTITION %s %s;", destTableName, renameTable.OldParitionName, renameTable.NewParitionName)
}
if sql == "" {
return xerror.Errorf(xerror.Normal, "rename sql is empty")
}

log.Infof("renam table sql: %s", sql)
return s.DbExec(sql)
}

func (s *Spec) dropTable(table string, force bool) error {
log.Infof("drop table %s.%s", s.Database, table)

Expand Down Expand Up @@ -421,10 +448,12 @@ func (s *Spec) CreateTableOrView(createTable *record.CreateTable, srcDatabase st
if isCreateView {
log.Debugf("create view, use dest db name to replace source db name")

// replace `internal`.`source_db_name`. to `internal`.`dest_db_name`.
originalName := "`internal`.`" + strings.TrimSpace(srcDatabase) + "`."
// replace `internal`.`source_db_name`. or `default_cluster:source_db_name`. to `internal`.`dest_db_name`.
originalNameNewStyle := "`internal`.`" + strings.TrimSpace(srcDatabase) + "`."
originalNameOldStyle := "`default_cluster:" + strings.TrimSpace(srcDatabase) + "`." // for Doris 2.0.x
replaceName := "`internal`.`" + strings.TrimSpace(s.Database) + "`."
createTable.Sql = strings.ReplaceAll(createTable.Sql, originalName, replaceName)
createTable.Sql = strings.ReplaceAll(
strings.ReplaceAll(createTable.Sql, originalNameNewStyle, replaceName), originalNameOldStyle, replaceName)
log.Debugf("original create view sql is %s, after replace, now sql is %s", createSql, createTable.Sql)
}

Expand Down Expand Up @@ -471,12 +500,17 @@ func (s *Spec) CheckDatabaseExists() (bool, error) {
func (s *Spec) CheckTableExists() (bool, error) {
log.Debugf("check table exist by spec: %s", s.String())

return s.CheckTableExistsByName(s.Table)
}

// check table exists in database dir by the specified table name.
func (s *Spec) CheckTableExistsByName(tableName string) (bool, error) {
db, err := s.Connect()
if err != nil {
return false, err
}

sql := fmt.Sprintf("SHOW TABLES FROM %s LIKE '%s'", utils.FormatKeywordName(s.Database), s.Table)
sql := fmt.Sprintf("SHOW TABLES FROM %s LIKE '%s'", utils.FormatKeywordName(s.Database), tableName)
rows, err := db.Query(sql)
if err != nil {
return false, xerror.Wrapf(err, xerror.Normal, "show tables failed, sql: %s", sql)
Expand Down Expand Up @@ -561,15 +595,10 @@ func (s *Spec) CreatePartialSnapshotAndWaitForDone(table string, partitions []st
return "", xerror.Errorf(xerror.Normal, "source db is empty! you should have at least one table")
}

if len(partitions) == 0 {
return "", xerror.Errorf(xerror.Normal, "partition is empty! you should have at least one partition")
}

// snapshot name format "ccrp_${table}_${timestamp}"
// table refs = table
snapshotName := fmt.Sprintf("ccrp_%s_%s_%d", s.Database, s.Table, time.Now().Unix())
tableRef := utils.FormatKeywordName(table)
partitionRefs := "`" + strings.Join(partitions, "`,`") + "`"

log.Infof("create partial snapshot %s.%s", s.Database, snapshotName)

Expand All @@ -578,7 +607,13 @@ func (s *Spec) CreatePartialSnapshotAndWaitForDone(table string, partitions []st
return "", err
}

backupSnapshotSql := fmt.Sprintf("BACKUP SNAPSHOT %s.%s TO `__keep_on_local__` ON ( %s PARTITION (%s) ) PROPERTIES (\"type\" = \"full\")", utils.FormatKeywordName(s.Database), snapshotName, tableRef, partitionRefs)
partitionRefs := ""
if len(partitions) > 0 {
partitionRefs = " PARTITION (`" + strings.Join(partitions, "`,`") + "`)"
}
backupSnapshotSql := fmt.Sprintf(
"BACKUP SNAPSHOT %s.%s TO `__keep_on_local__` ON (%s%s) PROPERTIES (\"type\" = \"full\")",
utils.FormatKeywordName(s.Database), snapshotName, tableRef, partitionRefs)
log.Debugf("backup partial snapshot sql: %s", backupSnapshotSql)
_, err = db.Exec(backupSnapshotSql)
if err != nil {
Expand Down Expand Up @@ -722,32 +757,35 @@ func (s *Spec) CheckRestoreFinished(snapshotName string) (bool, error) {
return false, nil
}

func (s *Spec) GetRestoreSignatureNotMatchedTable(snapshotName string) (string, error) {
func (s *Spec) GetRestoreSignatureNotMatchedTableOrView(snapshotName string) (string, bool, error) {
log.Debugf("get restore signature not matched table, spec: %s, snapshot: %s", s.String(), snapshotName)

for i := 0; i < MAX_CHECK_RETRY_TIMES; i++ {
if restoreState, status, err := s.checkRestoreFinished(snapshotName); err != nil {
return "", err
return "", false, err
} else if restoreState == RestoreStateFinished {
return "", nil
return "", false, nil
} else if restoreState == RestoreStateCancelled && strings.Contains(status, SIGNATURE_NOT_MATCHED) {
pattern := regexp.MustCompile("Table (?P<tableName>.*) already exist but with different schema")
pattern := regexp.MustCompile("(?P<tableOrView>Table|View) (?P<tableName>.*) already exist but with different schema")
matches := pattern.FindStringSubmatch(status)
index := pattern.SubexpIndex("tableName")
if len(matches) < index && len(matches[index]) == 0 {
return "", xerror.Errorf(xerror.Normal, "match table name from restore status failed, spec: %s, snapshot: %s, status: %s", s.String(), snapshotName, status)
if len(matches) == 0 || index == -1 || len(matches[index]) == 0 {
return "", false, xerror.Errorf(xerror.Normal, "match table name from restore status failed, spec: %s, snapshot: %s, status: %s", s.String(), snapshotName, status)
}
return matches[index], nil

resource := matches[pattern.SubexpIndex("tableOrView")]
tableOrView := resource == "Table"
return matches[index], tableOrView, nil
} else if restoreState == RestoreStateCancelled {
return "", xerror.Errorf(xerror.Normal, "restore failed or canceled, spec: %s, snapshot: %s, status: %s", s.String(), snapshotName, status)
return "", false, xerror.Errorf(xerror.Normal, "restore failed or canceled, spec: %s, snapshot: %s, status: %s", s.String(), snapshotName, status)
} else {
// RestoreStatePending, RestoreStateUnknown
time.Sleep(RESTORE_CHECK_DURATION)
}
}

log.Warnf("get restore signature not matched timeout, max try times: %d, spec: %s, snapshot: %s", MAX_CHECK_RETRY_TIMES, s, snapshotName)
return "", nil
return "", false, nil
}

func (s *Spec) waitTransactionDone(txnId int64) error {
Expand Down Expand Up @@ -886,7 +924,7 @@ func (s *Spec) LightningSchemaChange(srcDatabase string, lightningSchemaChange *
} else {
sql = strings.Replace(rawSql, fmt.Sprintf("`%s`.", srcDatabase), "", 1)
}
log.Infof("lightningSchemaChangeSql, rawSql: %s, sql: %s", rawSql, sql)
log.Infof("lighting schema change sql, rawSql: %s, sql: %s", rawSql, sql)
return s.DbExec(sql)
}

Expand All @@ -898,7 +936,16 @@ func (s *Spec) TruncateTable(destTableName string, truncateTable *record.Truncat
sql = fmt.Sprintf("TRUNCATE TABLE %s %s", utils.FormatKeywordName(destTableName), truncateTable.RawSql)
}

log.Infof("truncateTableSql: %s", sql)
log.Infof("truncate table sql: %s", sql)

return s.DbExec(sql)
}

func (s *Spec) ReplaceTable(fromName, toName string, swap bool) error {
sql := fmt.Sprintf("ALTER TABLE %s REPLACE WITH TABLE %s PROPERTIES(\"swap\"=\"%t\")",
utils.FormatKeywordName(toName), utils.FormatKeywordName(fromName), swap)

log.Infof("replace table sql: %s", sql)

return s.DbExec(sql)
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/ccr/base/specer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,17 @@ type Specer interface {
CreateTableOrView(createTable *record.CreateTable, srcDatabase string) error
CheckDatabaseExists() (bool, error)
CheckTableExists() (bool, error)
CheckTableExistsByName(tableName string) (bool, error)
CreatePartialSnapshotAndWaitForDone(table string, partitions []string) (string, error)
CreateSnapshotAndWaitForDone(tables []string) (string, error)
CheckRestoreFinished(snapshotName string) (bool, error)
GetRestoreSignatureNotMatchedTable(snapshotName string) (string, error)
GetRestoreSignatureNotMatchedTableOrView(snapshotName string) (string, bool, error)
WaitTransactionDone(txnId int64) // busy wait

LightningSchemaChange(srcDatabase string, changes *record.ModifyTableAddOrDropColumns) error
RenameTable(destTableName string, renameTable *record.RenameTable) error
TruncateTable(destTableName string, truncateTable *record.TruncateTable) error
ReplaceTable(fromName, toName string, swap bool) error
DropTable(tableName string, force bool) error
DropView(viewName string) error

Expand Down
Loading

0 comments on commit b7960ea

Please sign in to comment.