Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Save key time point for ccr job progress #64

Merged
merged 1 commit into from
Apr 26, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions pkg/ccr/job_progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,12 @@ type JobProgress struct {
TableCommitSeqMap map[int64]int64 `json:"table_commit_seq_map"` // only for DBTablesIncrementalSync
InMemoryData any `json:"-"`
PersistData string `json:"data"` // this often for binlog or snapshot info

// Some fields to save the unix epoch time of the key timepoint.
CreatedAt int64 `json:"created_at,omitempty"`
FullSyncStartAt int64 `json:"full_sync_start_at,omitempty"`
IncrementalSyncStartAt int64 `json:"incremental_sync_start_at,omitempty"`
IngestBinlogAt int64 `json:"ingest_binlog_at,omitempty"`
}

func (j *JobProgress) String() string {
Expand All @@ -168,6 +174,11 @@ func NewJobProgress(jobName string, syncType SyncType, db storage.DB) *JobProgre
TableCommitSeqMap: nil,
InMemoryData: nil,
PersistData: "",

CreatedAt: time.Now().Unix(),
FullSyncStartAt: 0,
IncrementalSyncStartAt: 0,
IngestBinlogAt: 0,
}
}

Expand Down Expand Up @@ -232,6 +243,10 @@ func _convertToPersistData(persistData any) string {

// Persist is checkpint, next state only get it from persistData
func (j *JobProgress) NextSubCheckpoint(subSyncState SubSyncState, persistData any) {
if subSyncState == IngestBinlog {
j.IngestBinlogAt = time.Now().Unix()
}

j.SubSyncState = subSyncState

j.PersistData = _convertToPersistData(persistData)
Expand All @@ -251,6 +266,15 @@ func (j *JobProgress) CommitNextSubWithPersist(commitSeq int64, subSyncState Sub
}

func (j *JobProgress) NextWithPersist(commitSeq int64, syncState SyncState, subSyncState SubSyncState, persistData string) {
if subSyncState == BeginCreateSnapshot && (syncState == TableFullSync || syncState == DBFullSync) {
j.FullSyncStartAt = time.Now().Unix()
j.IncrementalSyncStartAt = 0
j.IngestBinlogAt = 0
} else if subSyncState == Done && (syncState == TableIncrementalSync || syncState == TableIncrementalSync) {
j.IncrementalSyncStartAt = time.Now().Unix()
j.IngestBinlogAt = 0
}

j.CommitSeq = commitSeq
j.SyncState = syncState
j.SubSyncState = subSyncState
Expand Down
Loading