diff --git a/pkg/ccr/job.go b/pkg/ccr/job.go index 181de852..977dbc26 100644 --- a/pkg/ccr/job.go +++ b/pkg/ccr/job.go @@ -71,16 +71,17 @@ func (j JobState) String() string { } type Job struct { - SyncType SyncType `json:"sync_type"` - Name string `json:"name"` - Src base.Spec `json:"src"` - ISrc base.Specer `json:"-"` - srcMeta Metaer `json:"-"` - Dest base.Spec `json:"dest"` - IDest base.Specer `json:"-"` - destMeta Metaer `json:"-"` - SkipError bool `json:"skip_error"` - State JobState `json:"state"` + SyncType SyncType `json:"sync_type"` + Name string `json:"name"` + Src base.Spec `json:"src"` + ISrc base.Specer `json:"-"` + srcMeta Metaer `json:"-"` + Dest base.Spec `json:"dest"` + IDest base.Specer `json:"-"` + destMeta Metaer `json:"-"` + SkipError bool `json:"skip_error"` + ExceptTable string `json:"except_table"` + State JobState `json:"state"` factory *Factory `json:"-"` @@ -96,24 +97,37 @@ type Job struct { type jobContext struct { context.Context - src base.Spec - dest base.Spec - db storage.DB - skipError bool - factory *Factory + src base.Spec + dest base.Spec + db storage.DB + skipError bool + exceptTable string + factory *Factory } -func NewJobContext(src, dest base.Spec, skipError bool, db storage.DB, factory *Factory) *jobContext { +func NewJobContext(src, dest base.Spec, skipError bool, exceptTable string, db storage.DB, factory *Factory) *jobContext { return &jobContext{ - Context: context.Background(), - src: src, - dest: dest, - skipError: skipError, - db: db, - factory: factory, + Context: context.Background(), + src: src, + dest: dest, + skipError: skipError, + exceptTable: exceptTable, + db: db, + factory: factory, } } +func IsExceptTable(exceptTables string, targetTable string) bool { + exceptTableList := strings.Split(exceptTables, ",") + for _, exceptTable := range exceptTableList { + if targetTable == strings.TrimSpace(exceptTable) { + return true + } + } + + return false +} + // new job func NewJobFromService(name string, ctx context.Context) (*Job, error) { jobContext, ok := ctx.(*jobContext) @@ -125,15 +139,16 @@ func NewJobFromService(name string, ctx context.Context) (*Job, error) { src := jobContext.src dest := jobContext.dest job := &Job{ - Name: name, - Src: src, - ISrc: factory.NewSpecer(&src), - srcMeta: factory.NewMeta(&jobContext.src), - Dest: dest, - IDest: factory.NewSpecer(&dest), - destMeta: factory.NewMeta(&jobContext.dest), - SkipError: jobContext.skipError, - State: JobRunning, + Name: name, + Src: src, + ISrc: factory.NewSpecer(&src), + srcMeta: factory.NewMeta(&jobContext.src), + Dest: dest, + IDest: factory.NewSpecer(&dest), + destMeta: factory.NewMeta(&jobContext.dest), + SkipError: jobContext.skipError, + State: JobRunning, + ExceptTable: jobContext.exceptTable, factory: factory, @@ -203,6 +218,9 @@ func (j *Job) valid() error { return xerror.New(xerror.Normal, "src/dest are not both db or table sync") } + if j.Src.Table != "" && j.ExceptTable != "" { + return xerror.New(xerror.Normal, "source table and except table caonnot exist at the same time") + } return nil } @@ -292,9 +310,17 @@ func (j *Job) fullSync() error { if err != nil { return err } + + // skip if table is except + log.Infof("fullsync except table: %s", j.ExceptTable) for _, table := range tables { + if IsExceptTable(j.ExceptTable, table.Name) { + continue + } + backupTableList = append(backupTableList, table.Name) } + log.Infof("fullsync table is : %s", backupTableList) case TableSync: backupTableList = append(backupTableList, j.Src.Table) default: @@ -730,7 +756,14 @@ func (j *Job) handleUpsert(binlog *festruct.TBinlog) error { log.Debugf("tableRecords: %v", tableRecords) destTableIds := make([]int64, 0, len(tableRecords)) if j.SyncType == DBSync { + var sourceTableName string for _, tableRecord := range tableRecords { + sourceTableName, err = j.srcMeta.GetTableNameById(tableRecord.Id) + if IsExceptTable(j.ExceptTable, sourceTableName) { + log.Infof("db sync upsert, but table is except, just return") + return nil + } + if destTableId, err := j.getDestTableIdBySrc(tableRecord.Id); err != nil { return err } else { @@ -886,6 +919,14 @@ func (j *Job) handleAddPartition(binlog *festruct.TBinlog) error { if j.SyncType == TableSync { destTableName = j.Dest.Table } else if j.SyncType == DBSync { + // use sourceTableName to judge if table is except + var sourceTableName string + sourceTableName, err = j.srcMeta.GetTableNameById(addPartition.TableId) + if IsExceptTable(j.ExceptTable, sourceTableName) { + log.Infof("db sync add partition, but table is except, just return") + return nil + } + destTableId, err := j.getDestTableIdBySrc(addPartition.TableId) if err != nil { return err @@ -918,6 +959,14 @@ func (j *Job) handleDropPartition(binlog *festruct.TBinlog) error { if j.SyncType == TableSync { destTableName = j.Dest.Table } else if j.SyncType == DBSync { + // use sourceTableName to judge if table is except + var sourceTableName string + sourceTableName, err = j.srcMeta.GetTableNameById(dropPartition.TableId) + if IsExceptTable(j.ExceptTable, sourceTableName) { + log.Infof("db sync drop partition, but table is except, just return") + return nil + } + destTableId, err := j.getDestTableIdBySrc(dropPartition.TableId) if err != nil { return err @@ -1004,6 +1053,11 @@ func (j *Job) handleDropTable(binlog *festruct.TBinlog) error { tableName = srcTable.Name } + if IsExceptTable(j.ExceptTable, tableName) { + log.Infof("db sync drop table, but table is except, just return") + return nil + } + sql := fmt.Sprintf("DROP TABLE %s FORCE", utils.FormatKeywordName(tableName)) log.Infof("dropTableSql: %s", sql) if err = j.IDest.DbExec(sql); err != nil { @@ -1049,6 +1103,12 @@ func (j *Job) handleAlterJob(binlog *festruct.TBinlog) error { if j.SyncType == TableSync { dropTableSql = fmt.Sprintf("DROP TABLE %s FORCE", utils.FormatKeywordName(j.Dest.Table)) } else { + // if table is except, just break + if IsExceptTable(j.ExceptTable, alterJob.TableName) { + log.Infof("db sync alter job, but table is except, just break") + break + } + dropTableSql = fmt.Sprintf("DROP TABLE %s FORCE", utils.FormatKeywordName(alterJob.TableName)) } log.Infof("dropTableSql: %s", dropTableSql) @@ -1074,6 +1134,13 @@ func (j *Job) handleLightningSchemaChange(binlog *festruct.TBinlog) error { log.Debugf("lightningSchemaChange %v", lightningSchemaChange) rawSql := lightningSchemaChange.RawSql + var sourceTableName string + sourceTableName, err = j.srcMeta.GetTableNameById(lightningSchemaChange.TableId) + if IsExceptTable(j.ExceptTable, sourceTableName) { + log.Infof("db sync lightning schema change, but table is except, just return") + return nil + } + // "rawSql": "ALTER TABLE `default_cluster:ccr`.`test_ddl` ADD COLUMN `nid1` int(11) NULL COMMENT \"\"" // replace `default_cluster:${Src.Database}`.`test_ddl` to `test_ddl` var sql string @@ -1098,6 +1165,11 @@ func (j *Job) handleTruncateTable(binlog *festruct.TBinlog) error { var destTableName string switch j.SyncType { case DBSync: + if IsExceptTable(j.ExceptTable, truncateTable.TableName) { + log.Infof("db sync truncate table, but table is except, just return") + return nil + } + destTableName = truncateTable.TableName case TableSync: destTableName = j.Dest.Table diff --git a/pkg/service/http_service.go b/pkg/service/http_service.go index 1e85acb9..28a44751 100644 --- a/pkg/service/http_service.go +++ b/pkg/service/http_service.go @@ -73,15 +73,16 @@ func NewHttpServer(host string, port int, db storage.DB, jobManager *ccr.JobMana type CreateCcrRequest struct { // must need all fields required - Name string `json:"name,required"` - Src base.Spec `json:"src,required"` - Dest base.Spec `json:"dest,required"` - SkipError bool `json:"skip_error"` + Name string `json:"name,required"` + Src base.Spec `json:"src,required"` + Dest base.Spec `json:"dest,required"` + SkipError bool `json:"skip_error"` + ExceptTable string `json:"except_table"` } // Stringer func (r *CreateCcrRequest) String() string { - return fmt.Sprintf("name: %s, src: %v, dest: %v", r.Name, r.Src, r.Dest) + return fmt.Sprintf("name: %s, src: %v, dest: %v, except_table: %s", r.Name, r.Src, r.Dest, r.ExceptTable) } // version Handler @@ -106,7 +107,7 @@ func (s *HttpService) versionHandler(w http.ResponseWriter, r *http.Request) { func createCcr(request *CreateCcrRequest, db storage.DB, jobManager *ccr.JobManager) error { log.Infof("create ccr %s", request) - ctx := ccr.NewJobContext(request.Src, request.Dest, request.SkipError, db, jobManager.GetFactory()) + ctx := ccr.NewJobContext(request.Src, request.Dest, request.SkipError, request.ExceptTable, db, jobManager.GetFactory()) job, err := ccr.NewJobFromService(request.Name, ctx) if err != nil { return err