From d0246be0d618942d25fe487be91d0bb0045b6539 Mon Sep 17 00:00:00 2001 From: w41ter Date: Tue, 24 Sep 2024 11:50:52 +0800 Subject: [PATCH 1/2] Support table alias instead of drop during fullsync The table has different signature between upstream and downstream will be dropped in former implementation, might causes downstream read requests failed. This PR will alias the conflict tables and replace them after restore finished. --- pkg/ccr/job.go | 66 +++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 62 insertions(+), 4 deletions(-) diff --git a/pkg/ccr/job.go b/pkg/ccr/job.go index 9faf13d9..4909f2de 100644 --- a/pkg/ccr/job.go +++ b/pkg/ccr/job.go @@ -36,10 +36,11 @@ const ( ) var ( - featureSchemaChangePartialSync bool - featureCleanTableAndPartitions bool - featureAtomicRestore bool - featureCreateViewDropExists bool + featureSchemaChangePartialSync bool + featureCleanTableAndPartitions bool + featureAtomicRestore bool + featureCreateViewDropExists bool + featureReplaceNotMatchedWithAlias bool ) func init() { @@ -53,6 +54,8 @@ func init() { "replace tables in atomic during fullsync (otherwise the dest table will not be able to read).") flag.BoolVar(&featureCreateViewDropExists, "feature_create_view_drop_exists", true, "drop the exists view if exists, when sync the creating view binlog") + flag.BoolVar(&featureReplaceNotMatchedWithAlias, "feature_replace_not_matched_with_alias", false, + "replace signature not matched tables with table alias during the full sync") } type SyncType int @@ -701,6 +704,17 @@ func (j *Job) fullSync() error { } tableRefs = append(tableRefs, tableRef) } + if len(j.progress.TableAliases) > 0 { + tableRefs = make([]*festruct.TTableRef, 0) + for table, alias := range j.progress.TableAliases { + log.Debugf("fullsync alias table from %s to %s", table, alias) + tableRef := &festruct.TTableRef{ + Table: &table, + AliasName: &alias, + } + tableRefs = append(tableRefs, tableRef) + } + } restoreReq := rpc.RestoreSnapshotRequest{ TableRefs: tableRefs, @@ -749,6 +763,14 @@ func (j *Job) fullSync() error { resource = "view" } log.Infof("the signature of %s %s is not matched with the target table in snapshot", resource, tableName) + if tableOrView && featureReplaceNotMatchedWithAlias { + if j.progress.TableAliases == nil { + j.progress.TableAliases = make(map[string]string) + } + j.progress.TableAliases[tableName] = tableAlias(tableName) + j.progress.CommitNextSubWithPersist(j.progress.CommitSeq, RestoreSnapshot, inMemoryData) + break + } for { if tableOrView { if err := j.IDest.DropTable(tableName, false); err == nil { @@ -776,6 +798,42 @@ func (j *Job) fullSync() error { case PersistRestoreInfo: // Step 5: Update job progress && dest table id // update job info, only for dest table id + + if len(j.progress.TableAliases) > 0 { + log.Infof("fullsync swap %d tables with aliases", len(j.progress.TableAliases)) + + var tables []string + for table, _ := range j.progress.TableAliases { + tables = append(tables, table) + } + for _, table := range tables { + alias := j.progress.TableAliases[table] + targetName := table + if j.isTableSyncWithAlias() { + targetName = j.Dest.Table + } + + // check table exists to ensure the idempotent + if exist, err := j.IDest.CheckTableExistsByName(alias); err != nil { + return err + } else if exist { + log.Infof("fullsync swap table with alias, table: %s, alias: %s", targetName, alias) + swap := false // drop the old table + if err := j.IDest.ReplaceTable(alias, targetName, swap); err != nil { + return err + } + } else { + log.Infof("fullsync the table alias has been swapped, table: %s, alias: %s", targetName, alias) + } + } + // Since the meta of dest table has been changed, refresh it. + j.destMeta.ClearTablesCache() + + // Save the replace result + j.progress.TableAliases = nil + j.progress.NextSubCheckpoint(PersistRestoreInfo, j.progress.PersistData) + } + log.Infof("fullsync status: persist restore info") switch j.SyncType { From dfdfe75d12b93b2cfee3fd231ff8f66dfbfb43ac Mon Sep 17 00:00:00 2001 From: w41ter Date: Tue, 24 Sep 2024 11:57:16 +0800 Subject: [PATCH 2/2] fixup --- pkg/ccr/job.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/ccr/job.go b/pkg/ccr/job.go index 4909f2de..1c39ce96 100644 --- a/pkg/ccr/job.go +++ b/pkg/ccr/job.go @@ -803,7 +803,7 @@ func (j *Job) fullSync() error { log.Infof("fullsync swap %d tables with aliases", len(j.progress.TableAliases)) var tables []string - for table, _ := range j.progress.TableAliases { + for table := range j.progress.TableAliases { tables = append(tables, table) } for _, table := range tables {