Skip to content

Commit

Permalink
Avoid restart full sync when network interrupt or process crash (#75)
Browse files Browse the repository at this point in the history
  • Loading branch information
w41ter authored May 10, 2024
1 parent 9c37d92 commit 5090287
Showing 1 changed file with 34 additions and 13 deletions.
47 changes: 34 additions & 13 deletions pkg/ccr/base/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ func (s *Spec) CreateSnapshotAndWaitForDone(tables []string) (string, error) {
tableRefs = "`" + strings.Join(tables, "`,`") + "`"
}

// means source is a empty db, table numer is 0
// means source is a empty db, table number is 0
if tableRefs == "``" {
return "", xerror.Errorf(xerror.Normal, "source db is empty! you should have at least one table")
}
Expand Down Expand Up @@ -514,17 +514,21 @@ func (s *Spec) checkBackupFinished(snapshotName string) (BackupState, error) {
}

func (s *Spec) CheckBackupFinished(snapshotName string) (bool, error) {
log.Debugf("check backup state, datebase: %s, snapshot: %s", s.Database, snapshotName)
log.Debugf("check backup state, spec: %s, snapshot: %s", s.String(), snapshotName)

for i := 0; i < MAX_CHECK_RETRY_TIMES; i++ {
if backupState, err := s.checkBackupFinished(snapshotName); err != nil {
// Retry network related error to avoid full sync when the target network is interrupted, process is restarted.
if backupState, err := s.checkBackupFinished(snapshotName); err != nil && !isNetworkRelated(err) {
return false, err
} else if backupState == BackupStateFinished {
} else if err == nil && backupState == BackupStateFinished {
return true, nil
} else if backupState == BackupStateCancelled {
} else if err == nil && backupState == BackupStateCancelled {
return false, xerror.Errorf(xerror.Normal, "backup failed or canceled")
} else {
// BackupStatePending, BackupStateUnknown
// BackupStatePending, BackupStateUnknown or network related errors.
if err != nil {
log.Warnf("check backup state is failed, spec: %s, snapshot: %s, err: %v", s.String(), snapshotName, err)
}
time.Sleep(BACKUP_CHECK_DURATION)
}
}
Expand Down Expand Up @@ -575,19 +579,23 @@ func (s *Spec) checkRestoreFinished(snapshotName string) (RestoreState, string,
}

func (s *Spec) CheckRestoreFinished(snapshotName string) (bool, error) {
log.Debugf("check restore state is finished, spec: %s, datebase: %s, snapshot: %s", s.String(), s.Database, snapshotName)
log.Debugf("check restore state is finished, spec: %s, snapshot: %s", s.String(), snapshotName)

for i := 0; i < MAX_CHECK_RETRY_TIMES; i++ {
if restoreState, status, err := s.checkRestoreFinished(snapshotName); err != nil {
// Retry network related error to avoid full sync when the target network is interrupted, process is restarted.
if restoreState, status, err := s.checkRestoreFinished(snapshotName); err != nil && !isNetworkRelated(err) {
return false, err
} else if restoreState == RestoreStateFinished {
} else if err == nil && restoreState == RestoreStateFinished {
return true, nil
} else if restoreState == RestoreStateCancelled && strings.Contains(status, SIGNATURE_NOT_MATCHED) {
} else if err == nil && restoreState == RestoreStateCancelled && strings.Contains(status, SIGNATURE_NOT_MATCHED) {
return false, xerror.XWrapf(ErrRestoreSignatureNotMatched, "restore failed, spec: %s, snapshot: %s, status: %s", s.String(), snapshotName, status)
} else if restoreState == RestoreStateCancelled {
} else if err == nil && restoreState == RestoreStateCancelled {
return false, xerror.Errorf(xerror.Normal, "restore failed or canceled, spec: %s, snapshot: %s, status: %s", s.String(), snapshotName, status)
} else {
// RestoreStatePending, RestoreStateUnknown
// RestoreStatePending, RestoreStateUnknown or network error.
if err != nil {
log.Warnf("check restore state is failed, spec: %s, snapshot: %s, err: %v", s.String(), snapshotName, err)
}
time.Sleep(RESTORE_CHECK_DURATION)
}
}
Expand All @@ -597,7 +605,7 @@ func (s *Spec) CheckRestoreFinished(snapshotName string) (bool, error) {
}

func (s *Spec) GetRestoreSignatureNotMatchedTable(snapshotName string) (string, error) {
log.Debugf("get restore signature not matched table, spec: %s, datebase: %s, snapshot: %s", s.String(), s.Database, snapshotName)
log.Debugf("get restore signature not matched table, spec: %s, snapshot: %s", s.String(), snapshotName)

for i := 0; i < MAX_CHECK_RETRY_TIMES; i++ {
if restoreState, status, err := s.checkRestoreFinished(snapshotName); err != nil {
Expand Down Expand Up @@ -747,3 +755,16 @@ func (s *Spec) Update(event SpecEvent) {
break
}
}

// Determine whether the error are network related, eg connection refused, connection reset, exposed from net packages.
func isNetworkRelated(err error) bool {
msg := err.Error()

// The below errors are exposed from net packages.
// See https://github.com/golang/go/issues/23827 for details.
return strings.Contains(msg, "timeout awaiting response headers") ||
strings.Contains(msg, "connection refused") ||
strings.Contains(msg, "connection reset by peer") ||
strings.Contains(msg, "connection timeouted") ||
strings.Contains(msg, "i/o timeout")
}

0 comments on commit 5090287

Please sign in to comment.