From 0de343d860b6ab0c7ea5bd52e5f484f39fad11d6 Mon Sep 17 00:00:00 2001 From: walter Date: Fri, 26 Apr 2024 14:30:09 +0800 Subject: [PATCH] Save key time point for ccr job progress (#64) --- pkg/ccr/job_progress.go | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/pkg/ccr/job_progress.go b/pkg/ccr/job_progress.go index 37d4ce99..d58468c3 100644 --- a/pkg/ccr/job_progress.go +++ b/pkg/ccr/job_progress.go @@ -142,6 +142,12 @@ type JobProgress struct { TableCommitSeqMap map[int64]int64 `json:"table_commit_seq_map"` // only for DBTablesIncrementalSync InMemoryData any `json:"-"` PersistData string `json:"data"` // this often for binlog or snapshot info + + // Some fields to save the unix epoch time of the key timepoint. + CreatedAt int64 `json:"created_at,omitempty"` + FullSyncStartAt int64 `json:"full_sync_start_at,omitempty"` + IncrementalSyncStartAt int64 `json:"incremental_sync_start_at,omitempty"` + IngestBinlogAt int64 `json:"ingest_binlog_at,omitempty"` } func (j *JobProgress) String() string { @@ -168,6 +174,11 @@ func NewJobProgress(jobName string, syncType SyncType, db storage.DB) *JobProgre TableCommitSeqMap: nil, InMemoryData: nil, PersistData: "", + + CreatedAt: time.Now().Unix(), + FullSyncStartAt: 0, + IncrementalSyncStartAt: 0, + IngestBinlogAt: 0, } } @@ -232,6 +243,10 @@ func _convertToPersistData(persistData any) string { // Persist is checkpint, next state only get it from persistData func (j *JobProgress) NextSubCheckpoint(subSyncState SubSyncState, persistData any) { + if subSyncState == IngestBinlog { + j.IngestBinlogAt = time.Now().Unix() + } + j.SubSyncState = subSyncState j.PersistData = _convertToPersistData(persistData) @@ -251,6 +266,15 @@ func (j *JobProgress) CommitNextSubWithPersist(commitSeq int64, subSyncState Sub } func (j *JobProgress) NextWithPersist(commitSeq int64, syncState SyncState, subSyncState SubSyncState, persistData string) { + if subSyncState == BeginCreateSnapshot && (syncState == TableFullSync || syncState == DBFullSync) { + j.FullSyncStartAt = time.Now().Unix() + j.IncrementalSyncStartAt = 0 + j.IngestBinlogAt = 0 + } else if subSyncState == Done && (syncState == TableIncrementalSync || syncState == TableIncrementalSync) { + j.IncrementalSyncStartAt = time.Now().Unix() + j.IngestBinlogAt = 0 + } + j.CommitSeq = commitSeq j.SyncState = syncState j.SubSyncState = subSyncState