From 50902877cb7caeed76f639d5a52ff6366319783c Mon Sep 17 00:00:00 2001 From: walter Date: Fri, 10 May 2024 14:36:26 +0800 Subject: [PATCH] Avoid restart full sync when network interrupt or process crash (#75) --- pkg/ccr/base/spec.go | 47 ++++++++++++++++++++++++++++++++------------ 1 file changed, 34 insertions(+), 13 deletions(-) diff --git a/pkg/ccr/base/spec.go b/pkg/ccr/base/spec.go index 152ec30b..38fe16e9 100644 --- a/pkg/ccr/base/spec.go +++ b/pkg/ccr/base/spec.go @@ -448,7 +448,7 @@ func (s *Spec) CreateSnapshotAndWaitForDone(tables []string) (string, error) { tableRefs = "`" + strings.Join(tables, "`,`") + "`" } - // means source is a empty db, table numer is 0 + // means source is a empty db, table number is 0 if tableRefs == "``" { return "", xerror.Errorf(xerror.Normal, "source db is empty! you should have at least one table") } @@ -514,17 +514,21 @@ func (s *Spec) checkBackupFinished(snapshotName string) (BackupState, error) { } func (s *Spec) CheckBackupFinished(snapshotName string) (bool, error) { - log.Debugf("check backup state, datebase: %s, snapshot: %s", s.Database, snapshotName) + log.Debugf("check backup state, spec: %s, snapshot: %s", s.String(), snapshotName) for i := 0; i < MAX_CHECK_RETRY_TIMES; i++ { - if backupState, err := s.checkBackupFinished(snapshotName); err != nil { + // Retry network related error to avoid full sync when the target network is interrupted, process is restarted. + if backupState, err := s.checkBackupFinished(snapshotName); err != nil && !isNetworkRelated(err) { return false, err - } else if backupState == BackupStateFinished { + } else if err == nil && backupState == BackupStateFinished { return true, nil - } else if backupState == BackupStateCancelled { + } else if err == nil && backupState == BackupStateCancelled { return false, xerror.Errorf(xerror.Normal, "backup failed or canceled") } else { - // BackupStatePending, BackupStateUnknown + // BackupStatePending, BackupStateUnknown or network related errors. + if err != nil { + log.Warnf("check backup state is failed, spec: %s, snapshot: %s, err: %v", s.String(), snapshotName, err) + } time.Sleep(BACKUP_CHECK_DURATION) } } @@ -575,19 +579,23 @@ func (s *Spec) checkRestoreFinished(snapshotName string) (RestoreState, string, } func (s *Spec) CheckRestoreFinished(snapshotName string) (bool, error) { - log.Debugf("check restore state is finished, spec: %s, datebase: %s, snapshot: %s", s.String(), s.Database, snapshotName) + log.Debugf("check restore state is finished, spec: %s, snapshot: %s", s.String(), snapshotName) for i := 0; i < MAX_CHECK_RETRY_TIMES; i++ { - if restoreState, status, err := s.checkRestoreFinished(snapshotName); err != nil { + // Retry network related error to avoid full sync when the target network is interrupted, process is restarted. + if restoreState, status, err := s.checkRestoreFinished(snapshotName); err != nil && !isNetworkRelated(err) { return false, err - } else if restoreState == RestoreStateFinished { + } else if err == nil && restoreState == RestoreStateFinished { return true, nil - } else if restoreState == RestoreStateCancelled && strings.Contains(status, SIGNATURE_NOT_MATCHED) { + } else if err == nil && restoreState == RestoreStateCancelled && strings.Contains(status, SIGNATURE_NOT_MATCHED) { return false, xerror.XWrapf(ErrRestoreSignatureNotMatched, "restore failed, spec: %s, snapshot: %s, status: %s", s.String(), snapshotName, status) - } else if restoreState == RestoreStateCancelled { + } else if err == nil && restoreState == RestoreStateCancelled { return false, xerror.Errorf(xerror.Normal, "restore failed or canceled, spec: %s, snapshot: %s, status: %s", s.String(), snapshotName, status) } else { - // RestoreStatePending, RestoreStateUnknown + // RestoreStatePending, RestoreStateUnknown or network error. + if err != nil { + log.Warnf("check restore state is failed, spec: %s, snapshot: %s, err: %v", s.String(), snapshotName, err) + } time.Sleep(RESTORE_CHECK_DURATION) } } @@ -597,7 +605,7 @@ func (s *Spec) CheckRestoreFinished(snapshotName string) (bool, error) { } func (s *Spec) GetRestoreSignatureNotMatchedTable(snapshotName string) (string, error) { - log.Debugf("get restore signature not matched table, spec: %s, datebase: %s, snapshot: %s", s.String(), s.Database, snapshotName) + log.Debugf("get restore signature not matched table, spec: %s, snapshot: %s", s.String(), snapshotName) for i := 0; i < MAX_CHECK_RETRY_TIMES; i++ { if restoreState, status, err := s.checkRestoreFinished(snapshotName); err != nil { @@ -747,3 +755,16 @@ func (s *Spec) Update(event SpecEvent) { break } } + +// Determine whether the error are network related, eg connection refused, connection reset, exposed from net packages. +func isNetworkRelated(err error) bool { + msg := err.Error() + + // The below errors are exposed from net packages. + // See https://github.com/golang/go/issues/23827 for details. + return strings.Contains(msg, "timeout awaiting response headers") || + strings.Contains(msg, "connection refused") || + strings.Contains(msg, "connection reset by peer") || + strings.Contains(msg, "connection timeouted") || + strings.Contains(msg, "i/o timeout") +}