From 64d4dbe57884b70bbc1a97ab6219e016deed907b Mon Sep 17 00:00:00 2001 From: walter Date: Thu, 22 Aug 2024 15:38:58 +0800 Subject: [PATCH] Move table to recycle bin if the signature is not matched (#137) --- pkg/ccr/base/spec.go | 16 ++++++++++++---- pkg/ccr/base/specer.go | 2 +- pkg/ccr/job.go | 6 +++--- 3 files changed, 16 insertions(+), 8 deletions(-) diff --git a/pkg/ccr/base/spec.go b/pkg/ccr/base/spec.go index 37cdc2a4..8200462f 100644 --- a/pkg/ccr/base/spec.go +++ b/pkg/ccr/base/spec.go @@ -358,7 +358,7 @@ func (s *Spec) GetAllViewsFromTable(tableName string) ([]string, error) { return results, nil } -func (s *Spec) dropTable(table string) error { +func (s *Spec) dropTable(table string, force bool) error { log.Infof("drop table %s.%s", s.Database, table) db, err := s.Connect() @@ -366,7 +366,11 @@ func (s *Spec) dropTable(table string) error { return err } - sql := fmt.Sprintf("DROP TABLE %s.%s", utils.FormatKeywordName(s.Database), utils.FormatKeywordName(table)) + suffix := "" + if force { + suffix = "FORCE" + } + sql := fmt.Sprintf("DROP TABLE %s.%s %s", utils.FormatKeywordName(s.Database), utils.FormatKeywordName(table), suffix) _, err = db.Exec(sql) if err != nil { return xerror.Wrapf(err, xerror.Normal, "drop table %s.%s failed, sql: %s", s.Database, table, sql) @@ -899,8 +903,12 @@ func (s *Spec) TruncateTable(destTableName string, truncateTable *record.Truncat return s.DbExec(sql) } -func (s *Spec) DropTable(tableName string) error { - dropSql := fmt.Sprintf("DROP TABLE %s FORCE", utils.FormatKeywordName(tableName)) +func (s *Spec) DropTable(tableName string, force bool) error { + sqlSuffix := "" + if force { + sqlSuffix = "FORCE" + } + dropSql := fmt.Sprintf("DROP TABLE %s %s", utils.FormatKeywordName(tableName), sqlSuffix) log.Infof("drop table sql: %s", dropSql) return s.DbExec(dropSql) } diff --git a/pkg/ccr/base/specer.go b/pkg/ccr/base/specer.go index 418afb78..5d8a73e9 100644 --- a/pkg/ccr/base/specer.go +++ b/pkg/ccr/base/specer.go @@ -32,7 +32,7 @@ type Specer interface { LightningSchemaChange(srcDatabase string, changes *record.ModifyTableAddOrDropColumns) error TruncateTable(destTableName string, truncateTable *record.TruncateTable) error - DropTable(tableName string) error + DropTable(tableName string, force bool) error DropView(viewName string) error AddPartition(destTableName string, addPartition *record.AddPartition) error diff --git a/pkg/ccr/job.go b/pkg/ccr/job.go index 321e75c2..25904976 100644 --- a/pkg/ccr/job.go +++ b/pkg/ccr/job.go @@ -645,7 +645,7 @@ func (j *Job) fullSync() error { } log.Infof("the signature of table %s is not matched with the target table in snapshot", tableName) for { - if err := j.IDest.DropTable(tableName); err == nil { + if err := j.IDest.DropTable(tableName, false); err == nil { break } } @@ -1203,7 +1203,7 @@ func (j *Job) handleDropTable(binlog *festruct.TBinlog) error { tableName = srcTable.Name } - if err = j.IDest.DropTable(tableName); err != nil { + if err = j.IDest.DropTable(tableName, true); err != nil { return xerror.Wrapf(err, xerror.Normal, "drop table %s", tableName) } @@ -1273,7 +1273,7 @@ func (j *Job) handleAlterJob(binlog *festruct.TBinlog) error { allViewDeleted = true } - if err := j.IDest.DropTable(destTableName); err == nil { + if err := j.IDest.DropTable(destTableName, true); err == nil { break } }