Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support partial snapshot to reduce fullsync overhead #125

Merged
merged 1 commit into from
Aug 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions pkg/ccr/base/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,48 @@ func (s *Spec) CreateSnapshotAndWaitForDone(tables []string) (string, error) {
return snapshotName, nil
}

// CreatePartialSnapshotAndWaitForDone backs up the given partitions of a single
// table into a local snapshot and waits for the backup to finish.
//
// The statement issued has the form:
//
//	mysql> BACKUP SNAPSHOT ccr.snapshot_20230605 TO `__keep_on_local__` ON (src_1 PARTITION (`p1`)) PROPERTIES ("type" = "full");
//
// It returns the generated snapshot name. An error is returned when table or
// partitions is empty, when connecting or executing the statement fails, or
// when CheckBackupFinished fails or reports that the backup did not finish.
func (s *Spec) CreatePartialSnapshotAndWaitForDone(table string, partitions []string) (string, error) {
	if len(table) == 0 {
		return "", xerror.Errorf(xerror.Normal, "table is empty! you should specify the table to backup")
	}

	if len(partitions) == 0 {
		return "", xerror.Errorf(xerror.Normal, "partition is empty! you should have at least one partition")
	}

	// snapshot name format "ccrp_${database}_${table}_${timestamp}"
	// The table argument is used rather than s.Table so that the name always
	// identifies the table that is actually being backed up.
	snapshotName := fmt.Sprintf("ccrp_%s_%s_%d", s.Database, table, time.Now().Unix())
	// table refs = table
	tableRef := utils.FormatKeywordName(table)
	// Every partition name is wrapped in backticks and the names are comma separated.
	partitionRefs := "`" + strings.Join(partitions, "`,`") + "`"

	log.Infof("create partial snapshot %s.%s", s.Database, snapshotName)

	db, err := s.Connect()
	if err != nil {
		return "", err
	}

	backupSnapshotSql := fmt.Sprintf("BACKUP SNAPSHOT %s.%s TO `__keep_on_local__` ON ( %s PARTITION (%s) ) PROPERTIES (\"type\" = \"full\")", utils.FormatKeywordName(s.Database), snapshotName, tableRef, partitionRefs)
	log.Debugf("backup partial snapshot sql: %s", backupSnapshotSql)
	if _, err = db.Exec(backupSnapshotSql); err != nil {
		return "", xerror.Wrapf(err, xerror.Normal, "backup partial snapshot %s failed, sql: %s", snapshotName, backupSnapshotSql)
	}

	backupFinished, err := s.CheckBackupFinished(snapshotName)
	if err != nil {
		return "", err
	}
	if !backupFinished {
		return "", xerror.Errorf(xerror.Normal, "check backup state timeout, max try times: %d, sql: %s", MAX_CHECK_RETRY_TIMES, backupSnapshotSql)
	}

	return snapshotName, nil
}

// TODO: Add TaskErrMsg
func (s *Spec) checkBackupFinished(snapshotName string) (BackupState, error) {
log.Debugf("check backup state of snapshot %s", snapshotName)
Expand Down
1 change: 1 addition & 0 deletions pkg/ccr/base/specer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type Specer interface {
CreateTableOrView(createTable *record.CreateTable, srcDatabase string) error
CheckDatabaseExists() (bool, error)
CheckTableExists() (bool, error)
CreatePartialSnapshotAndWaitForDone(table string, partitions []string) (string, error)
CreateSnapshotAndWaitForDone(tables []string) (string, error)
CheckRestoreFinished(snapshotName string) (bool, error)
GetRestoreSignatureNotMatchedTable(snapshotName string) (string, error)
Expand Down
265 changes: 246 additions & 19 deletions pkg/ccr/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,194 @@ func (j *Job) isIncrementalSync() bool {
}
}

// addExtraInfo decodes the snapshot job info JSON, stores the value produced
// by genExtraInfo under the "extra_info" key, and returns the re-encoded JSON.
//
// An error is returned when jobInfo is not a JSON object, when genExtraInfo
// fails, or when the updated job info can not be encoded.
func (j *Job) addExtraInfo(jobInfo []byte) ([]byte, error) {
	var jobInfoMap map[string]interface{}
	if err := json.Unmarshal(jobInfo, &jobInfoMap); err != nil {
		return nil, xerror.Wrapf(err, xerror.Normal, "unmarshal jobInfo failed, jobInfo: %s", string(jobInfo))
	}
	// A JSON `null` decodes without error but leaves the map nil; writing to a
	// nil map would panic, so reject it explicitly.
	if jobInfoMap == nil {
		return nil, xerror.Errorf(xerror.Normal, "jobInfo is not a json object, jobInfo: %s", string(jobInfo))
	}

	extraInfo, err := j.genExtraInfo()
	if err != nil {
		return nil, err
	}
	log.Debugf("extraInfo: %v", extraInfo)
	jobInfoMap["extra_info"] = extraInfo

	jobInfoBytes, err := json.Marshal(jobInfoMap)
	if err != nil {
		// Keep the underlying cause instead of discarding it.
		return nil, xerror.Wrapf(err, xerror.Normal, "marshal jobInfo failed, jobInfo: %v", jobInfoMap)
	}

	return jobInfoBytes, nil
}

// partialSync works like fullSync, but backs up and restores only some of the
// partitions of a single table. The table and partitions are taken from
// j.progress.PartialSyncData.
//
// The function is a state machine driven by j.progress.SubSyncState:
//
//	Done                -> schedule a new partial snapshot
//	BeginCreateSnapshot -> create the partial snapshot on the source
//	GetSnapshotInfo     -> fetch the snapshot from the source FE
//	AddExtraInfo        -> inject extra info into the snapshot job info
//	RestoreSnapshot     -> restore the snapshot on the dest and wait for it
//	PersistRestoreInfo  -> switch the job back to incremental sync
//
// After every step except PersistRestoreInfo the function calls itself again
// to run the next step; it returns nil once PersistRestoreInfo has been
// handled, or the first error encountered.
func (j *Job) partialSync() error {
	type inMemoryData struct {
		SnapshotName string                        `json:"snapshot_name"`
		SnapshotResp *festruct.TGetSnapshotResult_ `json:"snapshot_resp"`
	}

	if j.progress.PartialSyncData == nil {
		return xerror.Errorf(xerror.Normal, "run partial sync but data is nil")
	}

	table := j.progress.PartialSyncData.Table
	partitions := j.progress.PartialSyncData.Partitions
	switch j.progress.SubSyncState {
	case Done:
		log.Infof("partial sync status: done")
		if err := j.newPartialSnapshot(table, partitions); err != nil {
			return err
		}

	case BeginCreateSnapshot:
		// Step 1: Create snapshot
		log.Infof("partial sync status: create snapshot")
		snapshotName, err := j.ISrc.CreatePartialSnapshotAndWaitForDone(table, partitions)
		if err != nil {
			return err
		}

		j.progress.NextSubCheckpoint(GetSnapshotInfo, snapshotName)

	case GetSnapshotInfo:
		// Step 2: Get snapshot info
		log.Infof("partial sync status: get snapshot info")

		snapshotName := j.progress.PersistData
		src := &j.Src
		srcRpc, err := j.factory.NewFeRpc(src)
		if err != nil {
			return err
		}

		log.Debugf("partial sync begin get snapshot %s", snapshotName)
		snapshotResp, err := srcRpc.GetSnapshot(src, snapshotName)
		if err != nil {
			return err
		}

		if snapshotResp.Status.GetStatusCode() != tstatus.TStatusCode_OK {
			return xerror.Errorf(xerror.FE, "get snapshot failed, status: %v", snapshotResp.Status)
		}

		if !snapshotResp.IsSetJobInfo() {
			return xerror.New(xerror.Normal, "jobInfo is not set")
		}

		log.Tracef("job: %.128s", snapshotResp.GetJobInfo())
		snapshotData := &inMemoryData{
			SnapshotName: snapshotName,
			SnapshotResp: snapshotResp,
		}
		j.progress.NextSubVolatile(AddExtraInfo, snapshotData)

	case AddExtraInfo:
		// Step 3: Add extra info
		log.Infof("partial sync status: add extra info")

		// Use the checked form of the assertion so that unexpected in-memory
		// data is reported as an error instead of a panic.
		snapshotData, ok := j.progress.InMemoryData.(*inMemoryData)
		if !ok {
			return xerror.Errorf(xerror.Normal, "invalid in memory data of partial sync, type: %T", j.progress.InMemoryData)
		}
		snapshotResp := snapshotData.SnapshotResp
		jobInfo := snapshotResp.GetJobInfo()

		log.Infof("partial sync snapshot response meta size: %d, job info size: %d",
			len(snapshotResp.Meta), len(snapshotResp.JobInfo))

		jobInfoBytes, err := j.addExtraInfo(jobInfo)
		if err != nil {
			return err
		}

		log.Debugf("partial sync job info size: %d, bytes: %.128s", len(jobInfoBytes), string(jobInfoBytes))
		snapshotResp.SetJobInfo(jobInfoBytes)

		j.progress.NextSubCheckpoint(RestoreSnapshot, snapshotData)

	case RestoreSnapshot:
		// Step 4: Restore snapshot
		log.Infof("partial sync status: restore snapshot")

		// Step 4.1: rebuild the in-memory data from the persisted data if it
		// is not available.
		if j.progress.InMemoryData == nil {
			persistData := j.progress.PersistData
			restored := &inMemoryData{}
			if err := json.Unmarshal([]byte(persistData), restored); err != nil {
				return xerror.Wrapf(err, xerror.Normal, "unmarshal persistData failed, persistData: %s", persistData)
			}
			j.progress.InMemoryData = restored
		}

		snapshotData, ok := j.progress.InMemoryData.(*inMemoryData)
		if !ok {
			return xerror.Errorf(xerror.Normal, "invalid in memory data of partial sync, type: %T", j.progress.InMemoryData)
		}
		snapshotName := snapshotData.SnapshotName
		restoreSnapshotName := restoreSnapshotName(snapshotName)
		snapshotResp := snapshotData.SnapshotResp

		// Step 4.2: restore snapshot to dest
		dest := &j.Dest
		destRpc, err := j.factory.NewFeRpc(dest)
		if err != nil {
			return err
		}
		log.Debugf("partial sync begin restore snapshot %s to %s", snapshotName, restoreSnapshotName)

		// For a table sync whose dest table is named differently, restore the
		// source table under the dest table name.
		var tableRefs []*festruct.TTableRef
		if j.SyncType == TableSync && j.Src.Table != j.Dest.Table {
			log.Debugf("table sync snapshot not same name, table: %s, dest table: %s", j.Src.Table, j.Dest.Table)
			tableRefs = make([]*festruct.TTableRef, 0)
			tableRef := &festruct.TTableRef{
				Table:     &j.Src.Table,
				AliasName: &j.Dest.Table,
			}
			tableRefs = append(tableRefs, tableRef)
		}
		restoreResp, err := destRpc.RestoreSnapshot(dest, tableRefs, restoreSnapshotName, snapshotResp)
		if err != nil {
			return err
		}
		if restoreResp.Status.GetStatusCode() != tstatus.TStatusCode_OK {
			return xerror.Errorf(xerror.Normal, "restore snapshot failed, status: %v", restoreResp.Status)
		}
		log.Infof("partial sync restore snapshot resp: %v", restoreResp)

		// Keep checking until the restore is reported as finished or an
		// error is returned.
		for {
			restoreFinished, err := j.IDest.CheckRestoreFinished(restoreSnapshotName)
			if err != nil {
				return err
			}

			if restoreFinished {
				j.progress.NextSubCheckpoint(PersistRestoreInfo, restoreSnapshotName)
				break
			}
			// retry for MAX_CHECK_RETRY_TIMES, timeout, continue
		}

	case PersistRestoreInfo:
		// Step 5: Update job progress and go back to incremental sync.
		log.Infof("partial sync status: persist restore info")

		switch j.SyncType {
		case DBSync:
			j.progress.NextWithPersist(j.progress.CommitSeq, DBTablesIncrementalSync, Done, "")
		case TableSync:
			j.progress.NextWithPersist(j.progress.CommitSeq, TableIncrementalSync, Done, "")
		default:
			return xerror.Errorf(xerror.Normal, "invalid sync type %d", j.SyncType)
		}

		return nil

	default:
		return xerror.Errorf(xerror.Normal, "invalid job sub sync state %d", j.progress.SubSyncState)
	}

	// Run the next step of the state machine.
	return j.partialSync()
}

func (j *Job) fullSync() error {
type inMemoryData struct {
SnapshotName string `json:"snapshot_name"`
Expand Down Expand Up @@ -318,7 +506,7 @@ func (j *Job) fullSync() error {
return err
}

log.Debugf("begin get snapshot %s", snapshotName)
log.Debugf("fullsync begin get snapshot %s", snapshotName)
snapshotResp, err := srcRpc.GetSnapshot(src, snapshotName)
if err != nil {
return err
Expand All @@ -329,7 +517,7 @@ func (j *Job) fullSync() error {
return err
}

log.Tracef("job: %.128s", snapshotResp.GetJobInfo())
log.Tracef("fullsync snapshot job: %.128s", snapshotResp.GetJobInfo())
if !snapshotResp.IsSetJobInfo() {
return xerror.New(xerror.Normal, "jobInfo is not set")
}
Expand Down Expand Up @@ -364,24 +552,10 @@ func (j *Job) fullSync() error {
log.Infof("snapshot response meta size: %d, job info size: %d",
len(snapshotResp.Meta), len(snapshotResp.JobInfo))

var jobInfoMap map[string]interface{}
err := json.Unmarshal(jobInfo, &jobInfoMap)
if err != nil {
return xerror.Wrapf(err, xerror.Normal, "unmarshal jobInfo failed, jobInfo: %s", string(jobInfo))
}
log.Debugf("jobInfoMap: %v", jobInfoMap)

extraInfo, err := j.genExtraInfo()
jobInfoBytes, err := j.addExtraInfo(jobInfo)
if err != nil {
return err
}
log.Debugf("extraInfo: %v", extraInfo)
jobInfoMap["extra_info"] = extraInfo

jobInfoBytes, err := json.Marshal(jobInfoMap)
if err != nil {
return xerror.Errorf(xerror.Normal, "marshal jobInfo failed, jobInfo: %v", jobInfoMap)
}
log.Debugf("job info size: %d, bytes: %s", len(jobInfoBytes), string(jobInfoBytes))
snapshotResp.SetJobInfo(jobInfoBytes)

Expand Down Expand Up @@ -1131,9 +1305,33 @@ func (j *Job) handleTruncateTable(binlog *festruct.TBinlog) error {
// handleReplacePartitions handles a replace partitions binlog.
//
// The binlog data is decoded into a replace partition record. Records with a
// non strict range, or records that use temp names, are not supported yet and
// fall back to a new snapshot via newSnapshot. Otherwise a partial snapshot of
// the replaced partitions of the table is scheduled via newPartialSnapshot.
func (j *Job) handleReplacePartitions(binlog *festruct.TBinlog) error {
	log.Infof("handle replace partitions binlog, commit seq: %d", *binlog.CommitSeq)

	data := binlog.GetData()
	replacePartition, err := record.NewReplacePartitionFromJson(data)
	if err != nil {
		return err
	}

	if !replacePartition.StrictRange {
		log.Warnf("replacing partitions with non strict range is not supported yet, replace partition record: %s", string(data))
		return j.newSnapshot(j.progress.CommitSeq)
	}

	if replacePartition.UseTempName {
		log.Warnf("replacing partitions with use tmp name is not supported yet, replace partition record: %s", string(data))
		return j.newSnapshot(j.progress.CommitSeq)
	}

	oldPartitions := strings.Join(replacePartition.Partitions, ",")
	newPartitions := strings.Join(replacePartition.TempPartitions, ",")
	log.Infof("table %s replace partitions %s with temp partitions %s",
		replacePartition.TableName, oldPartitions, newPartitions)

	// UseTempName is always false at this point (that case returned above), so
	// the partitions to back up are the replaced partitions themselves.
	return j.newPartialSnapshot(replacePartition.TableName, replacePartition.Partitions)
}

// return: error && bool backToRunLoop
Expand Down Expand Up @@ -1317,6 +1515,9 @@ func (j *Job) tableSync() error {
case TableIncrementalSync:
log.Debug("table incremental sync")
return j.incrementalSync()
case TablePartialSync:
log.Debug("table partial sync")
return j.partialSync()
default:
return xerror.Errorf(xerror.Normal, "unknown sync state: %v", j.progress.SyncState)
}
Expand Down Expand Up @@ -1346,6 +1547,9 @@ func (j *Job) dbSync() error {
case DBIncrementalSync:
log.Debug("db incremental sync")
return j.incrementalSync()
case DBPartialSync:
log.Debug("db partial sync")
return j.partialSync()
default:
return xerror.Errorf(xerror.Normal, "unknown db sync state: %v", j.progress.SyncState)
}
Expand Down Expand Up @@ -1443,6 +1647,29 @@ func (j *Job) newSnapshot(commitSeq int64) error {
}
}

// newPartialSnapshot records which table and partitions have to be synced and
// moves the job progress into the partial sync state that matches the job's
// sync type, starting at the BeginCreateSnapshot sub state.
func (j *Job) newPartialSnapshot(table string, partitions []string) error {
	// The binlog of commitSeq will be skipped once the partial snapshot finished.
	seq := j.progress.CommitSeq
	log.Infof("new partial snapshot, commitSeq: %d, table: %s, partitions: %v", seq, table, partitions)

	syncData := &JobPartialSyncData{Table: table, Partitions: partitions}
	j.progress.PartialSyncData = syncData

	if j.SyncType == TableSync {
		j.progress.NextWithPersist(seq, TablePartialSync, BeginCreateSnapshot, "")
		return nil
	}
	if j.SyncType == DBSync {
		j.progress.NextWithPersist(seq, DBPartialSync, BeginCreateSnapshot, "")
		return nil
	}

	// Neither a table sync nor a db sync: report the unknown sync type.
	err := xerror.Panicf(xerror.Normal, "unknown table sync type: %v", j.SyncType)
	log.Fatalf("run %+v", err)
	return err
}

// run job
func (j *Job) Run() error {
gls.ResetGls(gls.GoID(), map[interface{}]interface{}{})
Expand Down
Loading
Loading