Skip to content

Commit

Permalink
Move table to recycle bin if the signature is not matched (#137)
Browse files Browse the repository at this point in the history
  • Loading branch information
w41ter committed Aug 28, 2024
1 parent 2097c26 commit 64d4dbe
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 8 deletions.
16 changes: 12 additions & 4 deletions pkg/ccr/base/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,15 +358,19 @@ func (s *Spec) GetAllViewsFromTable(tableName string) ([]string, error) {
return results, nil
}

func (s *Spec) dropTable(table string) error {
func (s *Spec) dropTable(table string, force bool) error {
log.Infof("drop table %s.%s", s.Database, table)

db, err := s.Connect()
if err != nil {
return err
}

sql := fmt.Sprintf("DROP TABLE %s.%s", utils.FormatKeywordName(s.Database), utils.FormatKeywordName(table))
suffix := ""
if force {
suffix = "FORCE"
}
sql := fmt.Sprintf("DROP TABLE %s.%s %s", utils.FormatKeywordName(s.Database), utils.FormatKeywordName(table), suffix)
_, err = db.Exec(sql)
if err != nil {
return xerror.Wrapf(err, xerror.Normal, "drop table %s.%s failed, sql: %s", s.Database, table, sql)
Expand Down Expand Up @@ -899,8 +903,12 @@ func (s *Spec) TruncateTable(destTableName string, truncateTable *record.Truncat
return s.DbExec(sql)
}

func (s *Spec) DropTable(tableName string) error {
dropSql := fmt.Sprintf("DROP TABLE %s FORCE", utils.FormatKeywordName(tableName))
func (s *Spec) DropTable(tableName string, force bool) error {
sqlSuffix := ""
if force {
sqlSuffix = "FORCE"
}
dropSql := fmt.Sprintf("DROP TABLE %s %s", utils.FormatKeywordName(tableName), sqlSuffix)
log.Infof("drop table sql: %s", dropSql)
return s.DbExec(dropSql)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccr/base/specer.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type Specer interface {

LightningSchemaChange(srcDatabase string, changes *record.ModifyTableAddOrDropColumns) error
TruncateTable(destTableName string, truncateTable *record.TruncateTable) error
DropTable(tableName string) error
DropTable(tableName string, force bool) error
DropView(viewName string) error

AddPartition(destTableName string, addPartition *record.AddPartition) error
Expand Down
6 changes: 3 additions & 3 deletions pkg/ccr/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,7 +645,7 @@ func (j *Job) fullSync() error {
}
log.Infof("the signature of table %s is not matched with the target table in snapshot", tableName)
for {
if err := j.IDest.DropTable(tableName); err == nil {
if err := j.IDest.DropTable(tableName, false); err == nil {
break
}
}
Expand Down Expand Up @@ -1203,7 +1203,7 @@ func (j *Job) handleDropTable(binlog *festruct.TBinlog) error {
tableName = srcTable.Name
}

if err = j.IDest.DropTable(tableName); err != nil {
if err = j.IDest.DropTable(tableName, true); err != nil {
return xerror.Wrapf(err, xerror.Normal, "drop table %s", tableName)
}

Expand Down Expand Up @@ -1273,7 +1273,7 @@ func (j *Job) handleAlterJob(binlog *festruct.TBinlog) error {
allViewDeleted = true
}

if err := j.IDest.DropTable(destTableName); err == nil {
if err := j.IDest.DropTable(destTableName, true); err == nil {
break
}
}
Expand Down

0 comments on commit 64d4dbe

Please sign in to comment.