Skip to content

Commit

Permalink
Restore snapshot with clean_restore during fullsync (#128)
Browse files Browse the repository at this point in the history
  • Loading branch information
w41ter committed Aug 28, 2024
1 parent 1113dd3 commit f7c2783
Show file tree
Hide file tree
Showing 19 changed files with 4,277 additions and 233 deletions.
11 changes: 9 additions & 2 deletions pkg/ccr/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,8 @@ func (j *Job) partialSync() error {
}
tableRefs = append(tableRefs, tableRef)
}
restoreResp, err := destRpc.RestoreSnapshot(dest, tableRefs, restoreSnapshotName, snapshotResp)
cleanPartitions, cleanTables := false, false // DO NOT drop exists tables and partitions
restoreResp, err := destRpc.RestoreSnapshot(dest, tableRefs, restoreSnapshotName, snapshotResp, cleanTables, cleanPartitions)
if err != nil {
return err
}
Expand Down Expand Up @@ -609,7 +610,13 @@ func (j *Job) fullSync() error {
}
tableRefs = append(tableRefs, tableRef)
}
restoreResp, err := destRpc.RestoreSnapshot(dest, tableRefs, restoreSnapshotName, snapshotResp)

// drop exists partitions, and drop tables if in db sync.
cleanTables, cleanPartitions := false, true
if j.SyncType == DBSync {
cleanTables = true
}
restoreResp, err := destRpc.RestoreSnapshot(dest, tableRefs, restoreSnapshotName, snapshotResp, cleanTables, cleanPartitions)
if err != nil {
return err
}
Expand Down
28 changes: 15 additions & 13 deletions pkg/rpc/fe.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ type IFeRpc interface {
GetBinlog(*base.Spec, int64) (*festruct.TGetBinlogResult_, error)
GetBinlogLag(*base.Spec, int64) (*festruct.TGetBinlogLagResult_, error)
GetSnapshot(*base.Spec, string) (*festruct.TGetSnapshotResult_, error)
RestoreSnapshot(*base.Spec, []*festruct.TTableRef, string, *festruct.TGetSnapshotResult_) (*festruct.TRestoreSnapshotResult_, error)
RestoreSnapshot(*base.Spec, []*festruct.TTableRef, string, *festruct.TGetSnapshotResult_, bool, bool) (*festruct.TRestoreSnapshotResult_, error)
GetMasterToken(*base.Spec) (*festruct.TGetMasterTokenResult_, error)
GetDbMeta(spec *base.Spec) (*festruct.TGetMetaResult_, error)
GetTableMeta(spec *base.Spec, tableIds []int64) (*festruct.TGetMetaResult_, error)
Expand Down Expand Up @@ -384,10 +384,10 @@ func (rpc *FeRpc) GetSnapshot(spec *base.Spec, labelName string) (*festruct.TGet
return convertResult[festruct.TGetSnapshotResult_](result, err)
}

func (rpc *FeRpc) RestoreSnapshot(spec *base.Spec, tableRefs []*festruct.TTableRef, label string, snapshotResult *festruct.TGetSnapshotResult_) (*festruct.TRestoreSnapshotResult_, error) {
func (rpc *FeRpc) RestoreSnapshot(spec *base.Spec, tableRefs []*festruct.TTableRef, label string, snapshotResult *festruct.TGetSnapshotResult_, cleanTables bool, cleanPartitions bool) (*festruct.TRestoreSnapshotResult_, error) {
// return rpc.masterClient.RestoreSnapshot(spec, tableRefs, label, snapshotResult)
caller := func(client IFeRpc) (resultType, error) {
return client.RestoreSnapshot(spec, tableRefs, label, snapshotResult)
return client.RestoreSnapshot(spec, tableRefs, label, snapshotResult, cleanTables, cleanPartitions)
}
result, err := rpc.callWithMasterRedirect(caller)
return convertResult[festruct.TRestoreSnapshotResult_](result, err)
Expand Down Expand Up @@ -664,7 +664,7 @@ func (rpc *singleFeClient) GetSnapshot(spec *base.Spec, labelName string) (*fest
// }
//
// Restore Snapshot rpc
func (rpc *singleFeClient) RestoreSnapshot(spec *base.Spec, tableRefs []*festruct.TTableRef, label string, snapshotResult *festruct.TGetSnapshotResult_) (*festruct.TRestoreSnapshotResult_, error) {
func (rpc *singleFeClient) RestoreSnapshot(spec *base.Spec, tableRefs []*festruct.TTableRef, label string, snapshotResult *festruct.TGetSnapshotResult_, cleanTables bool, cleanPartitions bool) (*festruct.TRestoreSnapshotResult_, error) {
// NOTE: ignore meta, because it's too large
log.Debugf("Call RestoreSnapshot, addr: %s, spec: %s", rpc.Address(), spec)

Expand All @@ -673,19 +673,21 @@ func (rpc *singleFeClient) RestoreSnapshot(spec *base.Spec, tableRefs []*festruc
properties := make(map[string]string)
properties["reserve_replica"] = "true"
req := &festruct.TRestoreSnapshotRequest{
Table: &spec.Table,
LabelName: &label,
RepoName: &repoName,
TableRefs: tableRefs,
Properties: properties,
Meta: snapshotResult.GetMeta(),
JobInfo: snapshotResult.GetJobInfo(),
Table: &spec.Table,
LabelName: &label,
RepoName: &repoName,
TableRefs: tableRefs,
Properties: properties,
Meta: snapshotResult.GetMeta(),
JobInfo: snapshotResult.GetJobInfo(),
CleanTables: &cleanTables,
CleanPartitions: &cleanPartitions,
}
setAuthInfo(req, spec)

// NOTE: ignore meta, because it's too large
log.Debugf("RestoreSnapshotRequest user %s, db %s, table %s, label name %s, properties %v",
req.GetUser(), req.GetDb(), req.GetTable(), req.GetLabelName(), properties)
log.Debugf("RestoreSnapshotRequest user %s, db %s, table %s, label name %s, properties %v, clean tables: %v, clean partitions: %v",
req.GetUser(), req.GetDb(), req.GetTable(), req.GetLabelName(), properties, cleanTables, cleanPartitions)
if resp, err := client.RestoreSnapshot(context.Background(), req); err != nil {
return nil, xerror.Wrapf(err, xerror.RPC, "RestoreSnapshot failed")
} else {
Expand Down
225 changes: 225 additions & 0 deletions pkg/rpc/kitex_gen/backendservice/BackendService.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit f7c2783

Please sign in to comment.