From dac089faefeb5d5e0ab7f64a0179262716cff154 Mon Sep 17 00:00:00 2001 From: walter Date: Fri, 24 May 2024 17:06:10 +0800 Subject: [PATCH] Fix binlog lost (#86) --- pkg/ccr/job.go | 4 +++- pkg/ccr/job_progress.go | 6 +++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/pkg/ccr/job.go b/pkg/ccr/job.go index 0281376e..eb8ff3b9 100644 --- a/pkg/ccr/job.go +++ b/pkg/ccr/job.go @@ -1186,7 +1186,8 @@ func (j *Job) recoverIncrementalSync() error { func (j *Job) incrementalSync() error { if !j.progress.IsDone() { - log.Infof("job progress is not done, state is (%s), need recover", j.progress.SubSyncState) + log.Infof("job progress is not done, need recover. state: %s, prevCommitSeq: %d, commitSeq: %d", + j.progress.SubSyncState, j.progress.PrevCommitSeq, j.progress.CommitSeq) return j.recoverIncrementalSync() } @@ -1202,6 +1203,7 @@ func (j *Job) incrementalSync() error { // Step 2: handle all binlog for { + // The CommitSeq is equals to PrevCommitSeq in here. commitSeq := j.progress.CommitSeq log.Debugf("src: %s, commitSeq: %v", src, commitSeq) diff --git a/pkg/ccr/job_progress.go b/pkg/ccr/job_progress.go index 773192d7..5c9351e8 100644 --- a/pkg/ccr/job_progress.go +++ b/pkg/ccr/job_progress.go @@ -210,7 +210,6 @@ func NewJobProgressFromJson(jobName string, db storage.DB) (*JobProgress, error) } func (j *JobProgress) StartHandle(commitSeq int64) { - j.PrevCommitSeq = j.CommitSeq j.CommitSeq = commitSeq j.Persist() @@ -284,7 +283,7 @@ func (j *JobProgress) NextWithPersist(commitSeq int64, syncState SyncState, subS j.Persist() } -func (j *JobProgress) IsDone() bool { return j.SubSyncState == Done } +func (j *JobProgress) IsDone() bool { return j.SubSyncState == Done && j.PrevCommitSeq == j.CommitSeq } // TODO(Drogon): check reset some fields func (j *JobProgress) Done() { @@ -338,5 +337,6 @@ func (j *JobProgress) Persist() { break } - log.Trace("update job progress done") + log.Tracef("update job progress done, state: %s, subState: %s, commitSeq: %d, prevCommitSeq: %d", + j.SyncState, j.SubSyncState, j.CommitSeq, j.PrevCommitSeq) }