Skip to content

Commit

Permalink
Handle table not found error during begin txn (#162)
Browse files Browse the repository at this point in the history
  • Loading branch information
w41ter committed Sep 11, 2024
1 parent 2076890 commit d9dc73d
Showing 1 changed file with 23 additions and 10 deletions.
33 changes: 23 additions & 10 deletions pkg/ccr/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -1034,7 +1034,13 @@ func (j *Job) handleUpsert(binlog *festruct.TBinlog) error {
}
log.Debugf("resp: %v", beginTxnResp)
if beginTxnResp.GetStatus().GetStatusCode() != tstatus.TStatusCode_OK {
return xerror.Errorf(xerror.Normal, "begin txn failed, status: %v", beginTxnResp.GetStatus())
if isTableNotFound(beginTxnResp.GetStatus()) {
// The IngestBinlog and CommitTxn will rollback and retry, so the "table not found"
// error will be triggered at the BeginTxn stage, now force a new snapshot progress.
return xerror.Errorf(xerror.Meta, "begin txn failed, table is not found, status: %v", beginTxnResp.GetStatus())
} else {
return xerror.Errorf(xerror.Normal, "begin txn failed, status: %v", beginTxnResp.GetStatus())
}
}
txnId := beginTxnResp.GetTxnId()
log.Debugf("TxnId: %d, DbId: %d", txnId, beginTxnResp.GetDbId())
Expand Down Expand Up @@ -2142,13 +2148,7 @@ func (j *Job) Status() *JobStatus {
}

func isTxnCommitted(status *tstatus.TStatus) bool {
errMessages := status.GetErrorMsgs()
for _, errMessage := range errMessages {
if strings.Contains(errMessage, "is already COMMITTED") {
return true
}
}
return false
return isStatusContainsAny(status, "is already COMMITTED")
}

func isTxnNotFound(status *tstatus.TStatus) bool {
Expand All @@ -2164,10 +2164,23 @@ func isTxnNotFound(status *tstatus.TStatus) bool {
}

func isTxnAborted(status *tstatus.TStatus) bool {
return isStatusContainsAny(status, "is already aborted")
}

func isTableNotFound(status *tstatus.TStatus) bool {
// 1. FE FrontendServiceImpl.beginTxnImpl
// 2. FE FrontendServiceImpl.commitTxnImpl
// 3. FE Table.tryWriteLockOrMetaException
return isStatusContainsAny(status, "can't find table id:", "table not found", "unknown table")
}

func isStatusContainsAny(status *tstatus.TStatus, patterns ...string) bool {
errMessages := status.GetErrorMsgs()
for _, errMessage := range errMessages {
if strings.Contains(errMessage, "is already aborted") {
return true
for _, substr := range patterns {
if strings.Contains(errMessage, substr) {
return true
}
}
}
return false
Expand Down

0 comments on commit d9dc73d

Please sign in to comment.