Skip to content

Commit

Permalink
Add partial sync table name check (#144)
Browse files Browse the repository at this point in the history
  • Loading branch information
w41ter authored Aug 29, 2024
1 parent 5cef61b commit a0feec0
Showing 1 changed file with 13 additions and 2 deletions.
15 changes: 13 additions & 2 deletions pkg/ccr/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,10 @@ func (j *Job) isIncrementalSync() bool {
}
}

func (j *Job) isTableSyncWithAlias() bool {
return j.SyncType == TableSync && j.Src.Table != j.Dest.Table
}

func (j *Job) addExtraInfo(jobInfo []byte) ([]byte, error) {
var jobInfoMap map[string]interface{}
err := json.Unmarshal(jobInfo, &jobInfoMap)
Expand Down Expand Up @@ -407,7 +411,7 @@ func (j *Job) partialSync() error {
log.Debugf("partial sync begin restore snapshot %s to %s", snapshotName, restoreSnapshotName)

var tableRefs []*festruct.TTableRef
if j.SyncType == TableSync && j.Src.Table != j.Dest.Table {
if j.isTableSyncWithAlias() {
log.Debugf("table sync snapshot not same name, table: %s, dest table: %s", j.Src.Table, j.Dest.Table)
tableRefs = make([]*festruct.TTableRef, 0)
tableRef := &festruct.TTableRef{
Expand Down Expand Up @@ -606,7 +610,7 @@ func (j *Job) fullSync() error {
log.Debugf("begin restore snapshot %s to %s", snapshotName, restoreSnapshotName)

var tableRefs []*festruct.TTableRef
if j.SyncType == TableSync && j.Src.Table != j.Dest.Table {
if j.isTableSyncWithAlias() {
log.Debugf("table sync snapshot not same name, table: %s, dest table: %s", j.Src.Table, j.Dest.Table)
tableRefs = make([]*festruct.TTableRef, 0)
tableRef := &festruct.TTableRef{
Expand Down Expand Up @@ -1675,11 +1679,18 @@ func (j *Job) newSnapshot(commitSeq int64) error {
}
}

// New partial snapshot, with the source cluster table name and the partitions to sync.
// A empty partitions means to sync the whole table.
func (j *Job) newPartialSnapshot(table string, partitions []string) error {
// The binlog of commitSeq will be skipped once the partial snapshot finished.
commitSeq := j.progress.CommitSeq
log.Infof("new partial snapshot, commitSeq: %d, table: %s, partitions: %v", commitSeq, table, partitions)

if j.SyncType == TableSync && table != j.Src.Table {
return xerror.Errorf(xerror.Normal,
"partial sync table name is not equals to the source name %s, table: %s, sync type: table", j.Src.Table, table)
}

j.progress.PartialSyncData = &JobPartialSyncData{
Table: table,
Partitions: partitions,
Expand Down

0 comments on commit a0feec0

Please sign in to comment.