diff --git a/pkg/ccr/job.go b/pkg/ccr/job.go index 736fc228..b5c1c975 100644 --- a/pkg/ccr/job.go +++ b/pkg/ccr/job.go @@ -1034,7 +1034,13 @@ func (j *Job) handleUpsert(binlog *festruct.TBinlog) error { } log.Debugf("resp: %v", beginTxnResp) if beginTxnResp.GetStatus().GetStatusCode() != tstatus.TStatusCode_OK { - return xerror.Errorf(xerror.Normal, "begin txn failed, status: %v", beginTxnResp.GetStatus()) + if isTableNotFound(beginTxnResp.GetStatus()) { + // The IngestBinlog and CommitTxn will rollback and retry, so the "table not found" + // error will be triggered at the BeginTxn stage, now force a new snapshot progress. + return xerror.Errorf(xerror.Meta, "begin txn failed, table is not found, status: %v", beginTxnResp.GetStatus()) + } else { + return xerror.Errorf(xerror.Normal, "begin txn failed, status: %v", beginTxnResp.GetStatus()) + } } txnId := beginTxnResp.GetTxnId() log.Debugf("TxnId: %d, DbId: %d", txnId, beginTxnResp.GetDbId()) @@ -2142,13 +2148,7 @@ func (j *Job) Status() *JobStatus { } func isTxnCommitted(status *tstatus.TStatus) bool { - errMessages := status.GetErrorMsgs() - for _, errMessage := range errMessages { - if strings.Contains(errMessage, "is already COMMITTED") { - return true - } - } - return false + return isStatusContainsAny(status, "is already COMMITTED") } func isTxnNotFound(status *tstatus.TStatus) bool { @@ -2164,10 +2164,23 @@ func isTxnNotFound(status *tstatus.TStatus) bool { } func isTxnAborted(status *tstatus.TStatus) bool { + return isStatusContainsAny(status, "is already aborted") +} + +func isTableNotFound(status *tstatus.TStatus) bool { + // 1. FE FrontendServiceImpl.beginTxnImpl + // 2. FE FrontendServiceImpl.commitTxnImpl + // 3. FE Table.tryWriteLockOrMetaException + return isStatusContainsAny(status, "can't find table id:", "table not found", "unknown table") +} + +func isStatusContainsAny(status *tstatus.TStatus, patterns ...string) bool { errMessages := status.GetErrorMsgs() for _, errMessage := range errMessages { - if strings.Contains(errMessage, "is already aborted") { - return true + for _, substr := range patterns { + if strings.Contains(errMessage, substr) { + return true + } } } return false