Skip to content

Commit

Permalink
Save key time point for ccr job progress (#64)
Browse files Browse the repository at this point in the history
  • Loading branch information
w41ter authored Apr 26, 2024
1 parent 7599f37 commit 0de343d
Showing 1 changed file with 24 additions and 0 deletions.
24 changes: 24 additions & 0 deletions pkg/ccr/job_progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,12 @@ type JobProgress struct {
TableCommitSeqMap map[int64]int64 `json:"table_commit_seq_map"` // only for DBTablesIncrementalSync
InMemoryData any `json:"-"`
PersistData string `json:"data"` // this often for binlog or snapshot info

// Some fields to save the unix epoch time of the key timepoint.
CreatedAt int64 `json:"created_at,omitempty"`
FullSyncStartAt int64 `json:"full_sync_start_at,omitempty"`
IncrementalSyncStartAt int64 `json:"incremental_sync_start_at,omitempty"`
IngestBinlogAt int64 `json:"ingest_binlog_at,omitempty"`
}

func (j *JobProgress) String() string {
Expand All @@ -168,6 +174,11 @@ func NewJobProgress(jobName string, syncType SyncType, db storage.DB) *JobProgre
TableCommitSeqMap: nil,
InMemoryData: nil,
PersistData: "",

CreatedAt: time.Now().Unix(),
FullSyncStartAt: 0,
IncrementalSyncStartAt: 0,
IngestBinlogAt: 0,
}
}

Expand Down Expand Up @@ -232,6 +243,10 @@ func _convertToPersistData(persistData any) string {

// Persist is checkpint, next state only get it from persistData
func (j *JobProgress) NextSubCheckpoint(subSyncState SubSyncState, persistData any) {
if subSyncState == IngestBinlog {
j.IngestBinlogAt = time.Now().Unix()
}

j.SubSyncState = subSyncState

j.PersistData = _convertToPersistData(persistData)
Expand All @@ -251,6 +266,15 @@ func (j *JobProgress) CommitNextSubWithPersist(commitSeq int64, subSyncState Sub
}

func (j *JobProgress) NextWithPersist(commitSeq int64, syncState SyncState, subSyncState SubSyncState, persistData string) {
if subSyncState == BeginCreateSnapshot && (syncState == TableFullSync || syncState == DBFullSync) {
j.FullSyncStartAt = time.Now().Unix()
j.IncrementalSyncStartAt = 0
j.IngestBinlogAt = 0
} else if subSyncState == Done && (syncState == TableIncrementalSync || syncState == TableIncrementalSync) {
j.IncrementalSyncStartAt = time.Now().Unix()
j.IngestBinlogAt = 0
}

j.CommitSeq = commitSeq
j.SyncState = syncState
j.SubSyncState = subSyncState
Expand Down

0 comments on commit 0de343d

Please sign in to comment.