From 4707749e9e276d689c7807fe1451b3d35d947d56 Mon Sep 17 00:00:00 2001 From: walter Date: Wed, 11 Sep 2024 14:51:26 +0800 Subject: [PATCH] Clear the staled table mapping if dest table is not found (#163) --- pkg/ccr/job.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/ccr/job.go b/pkg/ccr/job.go index b5c1c975..127e6357 100644 --- a/pkg/ccr/job.go +++ b/pkg/ccr/job.go @@ -1034,13 +1034,13 @@ func (j *Job) handleUpsert(binlog *festruct.TBinlog) error { } log.Debugf("resp: %v", beginTxnResp) if beginTxnResp.GetStatus().GetStatusCode() != tstatus.TStatusCode_OK { - if isTableNotFound(beginTxnResp.GetStatus()) { - // The IngestBinlog and CommitTxn will rollback and retry, so the "table not found" - // error will be triggered at the BeginTxn stage, now force a new snapshot progress. - return xerror.Errorf(xerror.Meta, "begin txn failed, table is not found, status: %v", beginTxnResp.GetStatus()) - } else { - return xerror.Errorf(xerror.Normal, "begin txn failed, status: %v", beginTxnResp.GetStatus()) + if isTableNotFound(beginTxnResp.GetStatus()) && j.SyncType == DBSync { + // It might caused by the staled TableMapping entries. + for _, tableRecord := range inMemoryData.TableRecords { + delete(j.progress.TableMapping, tableRecord.Id) + } } + return xerror.Errorf(xerror.Normal, "begin txn failed, status: %v", beginTxnResp.GetStatus()) } txnId := beginTxnResp.GetTxnId() log.Debugf("TxnId: %d, DbId: %d", txnId, beginTxnResp.GetDbId())