Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid restart full sync when network interrupt or process crash #75

Merged
merged 1 commit into from
May 10, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 34 additions & 13 deletions pkg/ccr/base/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ func (s *Spec) CreateSnapshotAndWaitForDone(tables []string) (string, error) {
tableRefs = "`" + strings.Join(tables, "`,`") + "`"
}

// means source is a empty db, table numer is 0
// means source is a empty db, table number is 0
if tableRefs == "``" {
return "", xerror.Errorf(xerror.Normal, "source db is empty! you should have at least one table")
}
Expand Down Expand Up @@ -514,17 +514,21 @@ func (s *Spec) checkBackupFinished(snapshotName string) (BackupState, error) {
}

func (s *Spec) CheckBackupFinished(snapshotName string) (bool, error) {
log.Debugf("check backup state, datebase: %s, snapshot: %s", s.Database, snapshotName)
log.Debugf("check backup state, spec: %s, snapshot: %s", s.String(), snapshotName)

for i := 0; i < MAX_CHECK_RETRY_TIMES; i++ {
if backupState, err := s.checkBackupFinished(snapshotName); err != nil {
// Retry network related error to avoid full sync when the target network is interrupted, process is restarted.
if backupState, err := s.checkBackupFinished(snapshotName); err != nil && !isNetworkRelated(err) {
return false, err
} else if backupState == BackupStateFinished {
} else if err == nil && backupState == BackupStateFinished {
return true, nil
} else if backupState == BackupStateCancelled {
} else if err == nil && backupState == BackupStateCancelled {
return false, xerror.Errorf(xerror.Normal, "backup failed or canceled")
} else {
// BackupStatePending, BackupStateUnknown
// BackupStatePending, BackupStateUnknown or network related errors.
if err != nil {
log.Warnf("check backup state is failed, spec: %s, snapshot: %s, err: %v", s.String(), snapshotName, err)
}
time.Sleep(BACKUP_CHECK_DURATION)
}
}
Expand Down Expand Up @@ -575,19 +579,23 @@ func (s *Spec) checkRestoreFinished(snapshotName string) (RestoreState, string,
}

func (s *Spec) CheckRestoreFinished(snapshotName string) (bool, error) {
log.Debugf("check restore state is finished, spec: %s, datebase: %s, snapshot: %s", s.String(), s.Database, snapshotName)
log.Debugf("check restore state is finished, spec: %s, snapshot: %s", s.String(), snapshotName)

for i := 0; i < MAX_CHECK_RETRY_TIMES; i++ {
if restoreState, status, err := s.checkRestoreFinished(snapshotName); err != nil {
// Retry network related error to avoid full sync when the target network is interrupted, process is restarted.
if restoreState, status, err := s.checkRestoreFinished(snapshotName); err != nil && !isNetworkRelated(err) {
return false, err
} else if restoreState == RestoreStateFinished {
} else if err == nil && restoreState == RestoreStateFinished {
return true, nil
} else if restoreState == RestoreStateCancelled && strings.Contains(status, SIGNATURE_NOT_MATCHED) {
} else if err == nil && restoreState == RestoreStateCancelled && strings.Contains(status, SIGNATURE_NOT_MATCHED) {
return false, xerror.XWrapf(ErrRestoreSignatureNotMatched, "restore failed, spec: %s, snapshot: %s, status: %s", s.String(), snapshotName, status)
} else if restoreState == RestoreStateCancelled {
} else if err == nil && restoreState == RestoreStateCancelled {
return false, xerror.Errorf(xerror.Normal, "restore failed or canceled, spec: %s, snapshot: %s, status: %s", s.String(), snapshotName, status)
} else {
// RestoreStatePending, RestoreStateUnknown
// RestoreStatePending, RestoreStateUnknown or network error.
if err != nil {
log.Warnf("check restore state is failed, spec: %s, snapshot: %s, err: %v", s.String(), snapshotName, err)
}
time.Sleep(RESTORE_CHECK_DURATION)
}
}
Expand All @@ -597,7 +605,7 @@ func (s *Spec) CheckRestoreFinished(snapshotName string) (bool, error) {
}

func (s *Spec) GetRestoreSignatureNotMatchedTable(snapshotName string) (string, error) {
log.Debugf("get restore signature not matched table, spec: %s, datebase: %s, snapshot: %s", s.String(), s.Database, snapshotName)
log.Debugf("get restore signature not matched table, spec: %s, snapshot: %s", s.String(), snapshotName)

for i := 0; i < MAX_CHECK_RETRY_TIMES; i++ {
if restoreState, status, err := s.checkRestoreFinished(snapshotName); err != nil {
Expand Down Expand Up @@ -747,3 +755,16 @@ func (s *Spec) Update(event SpecEvent) {
break
}
}

// Determine whether the error are network related, eg connection refused, connection reset, exposed from net packages.
func isNetworkRelated(err error) bool {
msg := err.Error()

// The below errors are exposed from net packages.
// See https://github.com/golang/go/issues/23827 for details.
return strings.Contains(msg, "timeout awaiting response headers") ||
strings.Contains(msg, "connection refused") ||
strings.Contains(msg, "connection reset by peer") ||
strings.Contains(msg, "connection timeouted") ||
strings.Contains(msg, "i/o timeout")
}
Loading