Skip to content

Commit

Permalink
Clear the staled table mapping if dest table is not found (#163)
Browse files Browse the repository at this point in the history
  • Loading branch information
w41ter committed Sep 11, 2024
1 parent 480e932 commit 4707749
Showing 1 changed file with 6 additions and 6 deletions.
12 changes: 6 additions & 6 deletions pkg/ccr/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -1034,13 +1034,13 @@ func (j *Job) handleUpsert(binlog *festruct.TBinlog) error {
}
log.Debugf("resp: %v", beginTxnResp)
if beginTxnResp.GetStatus().GetStatusCode() != tstatus.TStatusCode_OK {
if isTableNotFound(beginTxnResp.GetStatus()) {
// The IngestBinlog and CommitTxn will rollback and retry, so the "table not found"
// error will be triggered at the BeginTxn stage, now force a new snapshot progress.
return xerror.Errorf(xerror.Meta, "begin txn failed, table is not found, status: %v", beginTxnResp.GetStatus())
} else {
return xerror.Errorf(xerror.Normal, "begin txn failed, status: %v", beginTxnResp.GetStatus())
if isTableNotFound(beginTxnResp.GetStatus()) && j.SyncType == DBSync {
// It might caused by the staled TableMapping entries.
for _, tableRecord := range inMemoryData.TableRecords {
delete(j.progress.TableMapping, tableRecord.Id)
}
}
return xerror.Errorf(xerror.Normal, "begin txn failed, status: %v", beginTxnResp.GetStatus())
}
txnId := beginTxnResp.GetTxnId()
log.Debugf("TxnId: %d, DbId: %d", txnId, beginTxnResp.GetDbId())
Expand Down

0 comments on commit 4707749

Please sign in to comment.