From 10b66afd0870f2dd047bea9c4904ad68493bd0ba Mon Sep 17 00:00:00 2001 From: w41ter Date: Thu, 26 Sep 2024 15:51:26 +0800 Subject: [PATCH] Filter shadow indexes upsert --- pkg/ccr/ingest_binlog_job.go | 11 +++++++++++ pkg/ccr/job.go | 28 ++++++++++++++++++++++++++++ pkg/ccr/job_progress.go | 4 ++++ pkg/ccr/record/alter_job_v2.go | 23 +++++++++++++++-------- 4 files changed, 58 insertions(+), 8 deletions(-) diff --git a/pkg/ccr/ingest_binlog_job.go b/pkg/ccr/ingest_binlog_job.go index 3206b710..b6fec939 100644 --- a/pkg/ccr/ingest_binlog_job.go +++ b/pkg/ccr/ingest_binlog_job.go @@ -381,6 +381,11 @@ func (j *IngestBinlogJob) preparePartition(srcTableId, destTableId int64, partit if j.srcMeta.IsIndexDropped(indexId) { continue } + if featureFilterShadowIndexesUpsert { + if _, ok := j.ccrJob.progress.ShadowIndexes[indexId]; ok { + continue + } + } srcIndexMeta, ok := srcIndexIdMap[indexId] if !ok { j.setError(xerror.Errorf(xerror.Meta, "index id %v not found in src meta", indexId)) @@ -407,6 +412,12 @@ func (j *IngestBinlogJob) preparePartition(srcTableId, destTableId int64, partit log.Infof("skip the dropped index %d", indexId) continue } + if featureFilterShadowIndexesUpsert { + if _, ok := j.ccrJob.progress.ShadowIndexes[indexId]; ok { + log.Infof("skip the shadow index %d", indexId) + continue + } + } srcIndexMeta := srcIndexIdMap[indexId] destIndexMeta := destIndexNameMap[getSrcIndexName(job, srcIndexMeta)] diff --git a/pkg/ccr/job.go b/pkg/ccr/job.go index 7e64c4be..6b7bb990 100644 --- a/pkg/ccr/job.go +++ b/pkg/ccr/job.go @@ -41,6 +41,7 @@ var ( featureAtomicRestore bool featureCreateViewDropExists bool featureReplaceNotMatchedWithAlias bool + featureFilterShadowIndexesUpsert bool ) func init() { @@ -56,6 +57,8 @@ func init() { "drop the exists view if exists, when sync the creating view binlog") flag.BoolVar(&featureReplaceNotMatchedWithAlias, "feature_replace_not_matched_with_alias", false, "replace signature not matched tables with table alias during the full sync") + flag.BoolVar(&featureFilterShadowIndexesUpsert, "feature_filter_shadow_indexes_upsert", false, + "filter the upsert to the shadow indexes") } type SyncType int @@ -887,6 +890,7 @@ func (j *Job) fullSync() error { } j.progress.TableMapping = tableMapping + j.progress.ShadowIndexes = nil j.progress.NextWithPersist(j.progress.CommitSeq, DBTablesIncrementalSync, Done, "") case TableSync: if destTable, err := j.destMeta.UpdateTable(j.Dest.Table, 0); err != nil { @@ -901,6 +905,7 @@ func (j *Job) fullSync() error { j.progress.TableCommitSeqMap = nil j.progress.TableMapping = nil + j.progress.ShadowIndexes = nil j.progress.NextWithPersist(j.progress.CommitSeq, TableIncrementalSync, Done, "") default: return xerror.Errorf(xerror.Normal, "invalid sync type %d", j.SyncType) @@ -1459,6 +1464,24 @@ func (j *Job) handleAlterJob(binlog *festruct.TBinlog) error { } if !alterJob.IsFinished() { + switch alterJob.JobState { + case record.ALTER_JOB_STATE_PENDING: + // Once the schema change step to WAITING_TXN, the upsert to the shadow indexes is allowed, + // but the dest indexes of the downstream cluster hasn't been created. + // + // To filter the upsert to the shadow indexes, save the shadow index ids here. + if j.progress.ShadowIndexes == nil { + j.progress.ShadowIndexes = make(map[int64]int64) + } + for shadowIndexId, originIndexId := range alterJob.ShadowIndexes { + j.progress.ShadowIndexes[shadowIndexId] = originIndexId + } + case record.ALTER_JOB_STATE_CANCELLED: + // clear the shadow indexes + for shadowIndexId := range alterJob.ShadowIndexes { + delete(j.progress.ShadowIndexes, shadowIndexId) + } + } return nil } @@ -1471,6 +1494,11 @@ func (j *Job) handleAlterJob(binlog *festruct.TBinlog) error { } if featureSchemaChangePartialSync && alterJob.Type == record.ALTER_JOB_SCHEMA_CHANGE { + // Once partial snapshot finished, the shadow indexes will be convert to normal indexes. + for shadowIndexId := range alterJob.ShadowIndexes { + delete(j.progress.ShadowIndexes, shadowIndexId) + } + replaceTable := true return j.newPartialSnapshot(alterJob.TableName, nil, replaceTable) } diff --git a/pkg/ccr/job_progress.go b/pkg/ccr/job_progress.go index 91599bdc..4c64ebbc 100644 --- a/pkg/ccr/job_progress.go +++ b/pkg/ccr/job_progress.go @@ -161,6 +161,9 @@ type JobProgress struct { // The tables need to be replaced rather than dropped during sync. TableAliases map[string]string `json:"table_aliases,omitempty"` + // The shadow indexes of the pending schema changes + ShadowIndexes map[int64]int64 `json:"shadow_index_map"` + // Some fields to save the unix epoch time of the key timepoint. CreatedAt int64 `json:"created_at,omitempty"` FullSyncStartAt int64 `json:"full_sync_start_at,omitempty"` @@ -194,6 +197,7 @@ func NewJobProgress(jobName string, syncType SyncType, db storage.DB) *JobProgre PersistData: "", PartialSyncData: nil, TableAliases: nil, + ShadowIndexes: nil, CreatedAt: time.Now().Unix(), FullSyncStartAt: 0, diff --git a/pkg/ccr/record/alter_job_v2.go b/pkg/ccr/record/alter_job_v2.go index 5e827b68..64ff0fb5 100644 --- a/pkg/ccr/record/alter_job_v2.go +++ b/pkg/ccr/record/alter_job_v2.go @@ -10,16 +10,23 @@ import ( const ( ALTER_JOB_SCHEMA_CHANGE = "SCHEMA_CHANGE" ALTER_JOB_ROLLUP = "ROLLUP" + + ALTER_JOB_STATE_PENDING = "PENDING" + ALTER_JOB_STATE_WAITING_TXN = "WAITING_TXN" + ALTER_JOB_STATE_RUNNING = "RUNNING" + ALTER_JOB_STATE_FINISHED = "FINISHED" + ALTER_JOB_STATE_CANCELLED = "CANCELLED" ) type AlterJobV2 struct { - Type string `json:"type"` - DbId int64 `json:"dbId"` - TableId int64 `json:"tableId"` - TableName string `json:"tableName"` - JobId int64 `json:"jobId"` - JobState string `json:"jobState"` - RawSql string `json:"rawSql"` + Type string `json:"type"` + DbId int64 `json:"dbId"` + TableId int64 `json:"tableId"` + TableName string `json:"tableName"` + JobId int64 `json:"jobId"` + JobState string `json:"jobState"` + RawSql string `json:"rawSql"` + ShadowIndexes map[int64]int64 `json:"iim"` } func NewAlterJobV2FromJson(data string) (*AlterJobV2, error) { @@ -47,7 +54,7 @@ func NewAlterJobV2FromJson(data string) (*AlterJobV2, error) { } func (a *AlterJobV2) IsFinished() bool { - return a.JobState == "FINISHED" + return a.JobState == ALTER_JOB_STATE_FINISHED } // Stringer