Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support table alias instead of drop during fullsync #179

Merged
merged 2 commits into from
Sep 24, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 62 additions & 4 deletions pkg/ccr/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,11 @@ const (
)

var (
featureSchemaChangePartialSync bool
featureCleanTableAndPartitions bool
featureAtomicRestore bool
featureCreateViewDropExists bool
featureSchemaChangePartialSync bool
featureCleanTableAndPartitions bool
featureAtomicRestore bool
featureCreateViewDropExists bool
featureReplaceNotMatchedWithAlias bool
)

func init() {
Expand All @@ -53,6 +54,8 @@ func init() {
"replace tables in atomic during fullsync (otherwise the dest table will not be able to read).")
flag.BoolVar(&featureCreateViewDropExists, "feature_create_view_drop_exists", true,
"drop the exists view if exists, when sync the creating view binlog")
flag.BoolVar(&featureReplaceNotMatchedWithAlias, "feature_replace_not_matched_with_alias", false,
"replace signature not matched tables with table alias during the full sync")
}

type SyncType int
Expand Down Expand Up @@ -701,6 +704,17 @@ func (j *Job) fullSync() error {
}
tableRefs = append(tableRefs, tableRef)
}
if len(j.progress.TableAliases) > 0 {
tableRefs = make([]*festruct.TTableRef, 0)
for table, alias := range j.progress.TableAliases {
log.Debugf("fullsync alias table from %s to %s", table, alias)
tableRef := &festruct.TTableRef{
Table: &table,
AliasName: &alias,
}
tableRefs = append(tableRefs, tableRef)
}
}

restoreReq := rpc.RestoreSnapshotRequest{
TableRefs: tableRefs,
Expand Down Expand Up @@ -749,6 +763,14 @@ func (j *Job) fullSync() error {
resource = "view"
}
log.Infof("the signature of %s %s is not matched with the target table in snapshot", resource, tableName)
if tableOrView && featureReplaceNotMatchedWithAlias {
if j.progress.TableAliases == nil {
j.progress.TableAliases = make(map[string]string)
}
j.progress.TableAliases[tableName] = tableAlias(tableName)
j.progress.CommitNextSubWithPersist(j.progress.CommitSeq, RestoreSnapshot, inMemoryData)
break
}
for {
if tableOrView {
if err := j.IDest.DropTable(tableName, false); err == nil {
Expand Down Expand Up @@ -776,6 +798,42 @@ func (j *Job) fullSync() error {
case PersistRestoreInfo:
// Step 5: Update job progress && dest table id
// update job info, only for dest table id

if len(j.progress.TableAliases) > 0 {
log.Infof("fullsync swap %d tables with aliases", len(j.progress.TableAliases))

var tables []string
for table := range j.progress.TableAliases {
tables = append(tables, table)
}
for _, table := range tables {
alias := j.progress.TableAliases[table]
targetName := table
if j.isTableSyncWithAlias() {
targetName = j.Dest.Table
}

// check table exists to ensure the idempotent
if exist, err := j.IDest.CheckTableExistsByName(alias); err != nil {
return err
} else if exist {
log.Infof("fullsync swap table with alias, table: %s, alias: %s", targetName, alias)
swap := false // drop the old table
if err := j.IDest.ReplaceTable(alias, targetName, swap); err != nil {
return err
}
} else {
log.Infof("fullsync the table alias has been swapped, table: %s, alias: %s", targetName, alias)
}
}
// Since the meta of dest table has been changed, refresh it.
j.destMeta.ClearTablesCache()

// Save the replace result
j.progress.TableAliases = nil
j.progress.NextSubCheckpoint(PersistRestoreInfo, j.progress.PersistData)
}

log.Infof("fullsync status: persist restore info")

switch j.SyncType {
Expand Down
Loading