From 91afd852036d5a7088efe9e5aa080be282610e64 Mon Sep 17 00:00:00 2001
From: Ivan Shvedunov
Date: Thu, 5 Dec 2024 00:49:31 +0000
Subject: [PATCH] sync2: multipeer: fix edge cases (#6447)

-------

## Motivation

Split sync could become blocked when there were slow peers: their subranges
get reassigned to other peers, and there were bugs causing indefinite blocking
and panics in these cases. Moreover, once other peers have synced the slow
peers' subranges ahead of them, syncing against the slow peers needs to be
interrupted, as it is no longer needed. In multipeer sync, when every peer has
failed to sync, e.g. due to a temporary connection interruption, there's no
need to wait for the full sync interval; a shorter wait time between retries
is used instead.

---
 sql/database.go                    |   3 +-
 sync2/multipeer/multipeer.go       |  30 ++++++-
 sync2/multipeer/multipeer_test.go  |  64 ++++++++++++-
 sync2/multipeer/split_sync.go      |  30 ++++++-
 sync2/multipeer/split_sync_test.go | 139 ++++++++++++++++++++---------
 sync2/p2p.go                       | 106 ++++++++++++++--------
 sync2/p2p_test.go                  |   4 +-
 7 files changed, 285 insertions(+), 91 deletions(-)

diff --git a/sql/database.go b/sql/database.go
index 0f593ab9c9..90931ff611 100644
--- a/sql/database.go
+++ b/sql/database.go
@@ -227,7 +227,8 @@ func OpenInMemory(opts ...Opt) (*sqliteDatabase, error) {
 	opts = append(opts, withForceFresh())
 	// Unique uri is needed to avoid sharing the same in-memory database,
 	// while allowing multiple connections to the same database.
-	uri := fmt.Sprintf("file:mem-%d?mode=memory&cache=shared", rand.Uint64())
+	uri := fmt.Sprintf("file:mem-%d-%d?mode=memory&cache=shared",
+		rand.Uint64(), rand.Uint64())
 	return Open(uri, opts...)
 }

diff --git a/sync2/multipeer/multipeer.go b/sync2/multipeer/multipeer.go
index 3b88bcf6ec..00832edc38 100644
--- a/sync2/multipeer/multipeer.go
+++ b/sync2/multipeer/multipeer.go
@@ -5,6 +5,8 @@ import (
 	"errors"
 	"fmt"
 	"math"
+	"math/rand/v2"
+	"sync/atomic"
 	"time"

 	"github.com/jonboulle/clockwork"
@@ -69,6 +71,9 @@ type MultiPeerReconcilerConfig struct {
 	MinCompleteFraction float64 `mapstructure:"min-complete-fraction"`
 	// Interval between syncs.
 	SyncInterval time.Duration `mapstructure:"sync-interval"`
+	// Spread factor for the sync interval.
+	// The actual interval will be SyncInterval * (1 + SyncIntervalSpread*(random[0..2]-1)).
+	SyncIntervalSpread float64 `mapstructure:"sync-interval-spread"`
 	// Interval between retries after a failed sync.
RetryInterval time.Duration `mapstructure:"retry-interval"` // Interval between rechecking for peers after no synchronization peers were @@ -96,6 +101,7 @@ func DefaultConfig() MultiPeerReconcilerConfig { MaxFullDiff: 10000, MaxSyncDiff: 100, SyncInterval: 5 * time.Minute, + SyncIntervalSpread: 0.5, RetryInterval: 1 * time.Minute, NoPeersRecheckInterval: 30 * time.Second, SplitSyncGracePeriod: time.Minute, @@ -264,20 +270,27 @@ func (mpr *MultiPeerReconciler) needSplitSync(s syncability) bool { } func (mpr *MultiPeerReconciler) fullSync(ctx context.Context, syncPeers []p2p.Peer) error { + if len(syncPeers) == 0 { + return errors.New("no peers to sync against") + } var eg errgroup.Group + var someSucceeded atomic.Bool for _, p := range syncPeers { eg.Go(func() error { if err := mpr.syncBase.WithPeerSyncer(ctx, p, func(ps PeerSyncer) error { err := ps.Sync(ctx, nil, nil) switch { case err == nil: + someSucceeded.Store(true) mpr.sl.NoteSync() case errors.Is(err, context.Canceled): return err default: // failing to sync against a particular peer is not considered // a fatal sync failure, so we just log the error - mpr.logger.Error("error syncing peer", zap.Stringer("peer", p), zap.Error(err)) + mpr.logger.Error("error syncing peer", + zap.Stringer("peer", p), + zap.Error(err)) } return nil }); err != nil { @@ -286,7 +299,13 @@ func (mpr *MultiPeerReconciler) fullSync(ctx context.Context, syncPeers []p2p.Pe return nil }) } - return eg.Wait() + if err := eg.Wait(); err != nil { + return err + } + if !someSucceeded.Load() { + return errors.New("all syncs failed") + } + return nil } func (mpr *MultiPeerReconciler) syncOnce(ctx context.Context, lastWasSplit bool) (full bool, err error) { @@ -346,7 +365,7 @@ func (mpr *MultiPeerReconciler) syncOnce(ctx context.Context, lastWasSplit bool) } // Run runs the MultiPeerReconciler. -func (mpr *MultiPeerReconciler) Run(ctx context.Context) error { +func (mpr *MultiPeerReconciler) Run(ctx context.Context, kickCh chan struct{}) error { // The point of using split sync, which syncs different key ranges against // different peers, vs full sync which syncs the full key range against different // peers, is: @@ -384,7 +403,9 @@ func (mpr *MultiPeerReconciler) Run(ctx context.Context) error { lastWasSplit := false LOOP: for { - interval := mpr.cfg.SyncInterval + interval := time.Duration( + float64(mpr.cfg.SyncInterval) * + (1 + mpr.cfg.SyncIntervalSpread*(rand.Float64()*2-1))) full, err = mpr.syncOnce(ctx, lastWasSplit) if err != nil { if errors.Is(err, context.Canceled) { @@ -407,6 +428,7 @@ LOOP: err = ctx.Err() break LOOP case <-mpr.clock.After(interval): + case <-kickCh: } } // The loop is only exited upon context cancellation. 
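For reference, the jittered wait introduced in `Run` above draws each interval uniformly from `SyncInterval * [1 - SyncIntervalSpread, 1 + SyncIntervalSpread)`, so peers don't all start their sync cycles at the same moment. A minimal standalone sketch of that computation (illustrative only, not part of the patch; `jitteredInterval` is a made-up name):

```go
package main

import (
	"fmt"
	"math/rand/v2"
	"time"
)

// jitteredInterval mirrors the interval computation in Run: the result is
// uniformly distributed over [base*(1-spread), base*(1+spread)).
func jitteredInterval(base time.Duration, spread float64) time.Duration {
	return time.Duration(float64(base) * (1 + spread*(rand.Float64()*2-1)))
}

func main() {
	// With the defaults above (SyncInterval = 5m, SyncIntervalSpread = 0.5),
	// each wait falls somewhere between 2.5m and 7.5m.
	for i := 0; i < 3; i++ {
		fmt.Println(jitteredInterval(5*time.Minute, 0.5))
	}
}
```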
diff --git a/sync2/multipeer/multipeer_test.go b/sync2/multipeer/multipeer_test.go index dc3487424d..4bd05227fe 100644 --- a/sync2/multipeer/multipeer_test.go +++ b/sync2/multipeer/multipeer_test.go @@ -67,6 +67,7 @@ type multiPeerSyncTester struct { reconciler *multipeer.MultiPeerReconciler cancel context.CancelFunc eg errgroup.Group + kickCh chan struct{} // EXPECT() calls should not be done concurrently // https://github.com/golang/mock/issues/533#issuecomment-821537840 mtx sync.Mutex @@ -81,10 +82,13 @@ func newMultiPeerSyncTester(t *testing.T, addPeers int) *multiPeerSyncTester { syncRunner: NewMocksyncRunner(ctrl), peers: peers.New(), clock: clockwork.NewFakeClock().(fakeClock), + kickCh: make(chan struct{}, 1), } cfg := multipeer.DefaultConfig() - cfg.SyncInterval = time.Minute + cfg.SyncInterval = 40 * time.Second + cfg.SyncIntervalSpread = 0.1 cfg.SyncPeerCount = numSyncPeers + cfg.RetryInterval = 5 * time.Second cfg.MinSplitSyncPeers = 2 cfg.MinSplitSyncCount = 90 cfg.MaxFullDiff = 20 @@ -111,7 +115,7 @@ func (mt *multiPeerSyncTester) addPeers(n int) []p2p.Peer { func (mt *multiPeerSyncTester) start() context.Context { var ctx context.Context ctx, mt.cancel = context.WithTimeout(context.Background(), 10*time.Second) - mt.eg.Go(func() error { return mt.reconciler.Run(ctx) }) + mt.eg.Go(func() error { return mt.reconciler.Run(ctx, mt.kickCh) }) mt.Cleanup(func() { mt.cancel() if err := mt.eg.Wait(); err != nil { @@ -121,6 +125,10 @@ func (mt *multiPeerSyncTester) start() context.Context { return ctx } +func (mt *multiPeerSyncTester) kick() { + mt.kickCh <- struct{}{} +} + func (mt *multiPeerSyncTester) expectProbe(times int, pr rangesync.ProbeResult) *peerList { var pl peerList mt.syncBase.EXPECT().Probe(gomock.Any(), gomock.Any()).DoAndReturn( @@ -183,7 +191,7 @@ func TestMultiPeerSync(t *testing.T) { mt := newMultiPeerSyncTester(t, 0) ctx := mt.start() mt.clock.BlockUntilContext(ctx, 1) - // Advance by sync interval. No peers yet + // Advance by sync interval (incl. spread). 
No peers yet
 	mt.clock.Advance(time.Minute)
 	mt.clock.BlockUntilContext(ctx, 1)
 	// It is safe to do EXPECT() calls while the MultiPeerReconciler is blocked
@@ -249,6 +257,34 @@ func TestMultiPeerSync(t *testing.T) {
 		mt.syncBase.EXPECT().Wait()
 	})

+	t.Run("sync after kick", func(t *testing.T) {
+		mt := newMultiPeerSyncTester(t, 10)
+		mt.syncBase.EXPECT().Count().Return(100, nil).AnyTimes()
+		require.False(t, mt.reconciler.Synced())
+		expect := func() {
+			pl := mt.expectProbe(numSyncPeers, rangesync.ProbeResult{
+				FP:    "foo",
+				Count: 100,
+				Sim:   0.99, // high enough for full sync
+			})
+			mt.expectFullSync(pl, numSyncPeers, 0)
+			mt.syncBase.EXPECT().Wait()
+		}
+		expect()
+		// first full sync happens immediately
+		ctx := mt.start()
+		mt.clock.BlockUntilContext(ctx, 1)
+		mt.satisfy()
+		for i := 0; i < numSyncs; i++ {
+			expect()
+			mt.kick()
+			mt.clock.BlockUntilContext(ctx, 1)
+			mt.satisfy()
+		}
+		require.True(t, mt.reconciler.Synced())
+		mt.syncBase.EXPECT().Wait()
+	})
+
 	t.Run("full sync, peers with low count ignored", func(t *testing.T) {
 		mt := newMultiPeerSyncTester(t, 0)
 		addedPeers := mt.addPeers(numSyncPeers)
@@ -349,6 +385,28 @@ func TestMultiPeerSync(t *testing.T) {
 		mt.syncBase.EXPECT().Wait()
 	})

+	t.Run("all peers failed during full sync", func(t *testing.T) {
+		mt := newMultiPeerSyncTester(t, 10)
+		mt.syncBase.EXPECT().Count().Return(100, nil).AnyTimes()
+
+		pl := mt.expectProbe(numSyncPeers, rangesync.ProbeResult{FP: "foo", Count: 100, Sim: 0.99})
+		mt.expectFullSync(pl, numSyncPeers, numSyncPeers)
+		mt.syncBase.EXPECT().Wait().AnyTimes()
+
+		ctx := mt.start()
+		mt.clock.BlockUntilContext(ctx, 1)
+		mt.satisfy()
+
+		pl = mt.expectProbe(numSyncPeers, rangesync.ProbeResult{FP: "foo", Count: 100, Sim: 0.99})
+		mt.expectFullSync(pl, numSyncPeers, 0)
+		// Retry should happen after a mere 5 seconds: as no peers succeeded,
+		// there's no need to wait for the full sync interval.
+		mt.clock.Advance(5 * time.Second)
+		mt.satisfy()
+
+		require.True(t, mt.reconciler.Synced())
+	})
+
 	t.Run("failed synced key handling during full sync", func(t *testing.T) {
 		mt := newMultiPeerSyncTester(t, 10)
 		mt.syncBase.EXPECT().Count().Return(100, nil).AnyTimes()
diff --git a/sync2/multipeer/split_sync.go b/sync2/multipeer/split_sync.go
index 6d69293451..9ea1bd437f 100644
--- a/sync2/multipeer/split_sync.go
+++ b/sync2/multipeer/split_sync.go
@@ -12,6 +12,7 @@ import (
 	"golang.org/x/sync/errgroup"

 	"github.com/spacemeshos/go-spacemesh/fetch/peers"
+	"github.com/spacemeshos/go-spacemesh/log"
 	"github.com/spacemeshos/go-spacemesh/p2p"
 )

@@ -150,6 +151,8 @@ func (s *splitSync) handleSyncResult(r syncResult) error {
 		s.syncPeers = append(s.syncPeers, r.ps.Peer())
 		s.numRemaining--
 		s.logger.Debug("peer synced successfully",
+			log.ZShortStringer("x", sr.X),
+			log.ZShortStringer("y", sr.Y),
 			zap.Stringer("peer", r.ps.Peer()),
 			zap.Int("numPeers", s.numPeers),
 			zap.Int("numRemaining", s.numRemaining),
@@ -199,8 +202,18 @@ func (s *splitSync) Sync(ctx context.Context) error {
 		}
 		select {
 		case sr = <-s.slowRangeCh:
-			// push this syncRange to the back of the queue
-			s.sq.Update(sr, s.clock.Now())
+			// Push this syncRange to the back of the queue.
+			// There's some chance that the peer managed to complete
+			// the sync while the range was still sitting in the
+			// channel, so we double-check if it's done.
+			if !sr.Done {
+				s.logger.Debug("slow peer, reassigning the range",
+					log.ZShortStringer("x", sr.X), log.ZShortStringer("y", sr.Y))
+				s.sq.Update(sr, s.clock.Now())
+			} else {
+				s.logger.Debug("slow peer, NOT reassigning the range: DONE",
+					log.ZShortStringer("x", sr.X), log.ZShortStringer("y", sr.Y))
+			}
 		case <-syncCtx.Done():
 			return syncCtx.Err()
 		case r := <-s.resCh:
@@ -210,5 +223,16 @@ func (s *splitSync) Sync(ctx context.Context) error {
 		}
 	}
-	return s.eg.Wait()
+	// Stop late peers that didn't manage to sync their ranges in time.
+	// The ranges were already reassigned to other peers and successfully
+	// synced by this point.
+	cancel()
+	err := s.eg.Wait()
+	if s.numRemaining == 0 {
+		// If all the ranges are synced, the split sync is considered successful
+		// even if some peers failed to sync their ranges, because those ranges
+		// were synced by other peers.
+		return nil
+	}
+	return err
 }
diff --git a/sync2/multipeer/split_sync_test.go b/sync2/multipeer/split_sync_test.go
index d81e64c424..4dbfdb8980 100644
--- a/sync2/multipeer/split_sync_test.go
+++ b/sync2/multipeer/split_sync_test.go
@@ -3,6 +3,7 @@ package multipeer_test
 import (
 	"context"
 	"errors"
+	"fmt"
 	"sync"
 	"testing"
 	"time"
@@ -14,7 +15,6 @@ import (
 	"go.uber.org/zap/zaptest"
 	"golang.org/x/sync/errgroup"

-	"github.com/spacemeshos/go-spacemesh/common/types"
 	"github.com/spacemeshos/go-spacemesh/fetch/peers"
 	"github.com/spacemeshos/go-spacemesh/p2p"
 	"github.com/spacemeshos/go-spacemesh/sync2/multipeer"
@@ -23,9 +23,9 @@ import (

 type splitSyncTester struct {
 	testing.TB
-
+	ctrl          *gomock.Controller
 	syncPeers     []p2p.Peer
-	clock         clockwork.Clock
+	clock         clockwork.FakeClock
 	mtx           sync.Mutex
 	fail          map[hexRange]bool
 	expPeerRanges map[hexRange]int
@@ -57,6 +57,8 @@ var tstRanges = []hexRange{
 func newTestSplitSync(t testing.TB) *splitSyncTester {
 	ctrl := gomock.NewController(t)
 	tst := &splitSyncTester{
+		TB:            t,
+		ctrl:          ctrl,
 		syncPeers:     make([]p2p.Peer, 4),
 		clock:         clockwork.NewFakeClock(),
 		fail:          make(map[hexRange]bool),
@@ -71,45 +73,7 @@ func newTestSplitSync(t testing.TB) *splitSyncTester {
 		peers:         peers.New(),
 	}
 	for n := range tst.syncPeers {
-		tst.syncPeers[n] = p2p.Peer(types.RandomBytes(20))
-	}
-	for index, p := range tst.syncPeers {
-		tst.syncBase.EXPECT().
-			WithPeerSyncer(gomock.Any(), p, gomock.Any()).
-			DoAndReturn(func(
-				_ context.Context,
-				peer p2p.Peer,
-				toCall func(multipeer.PeerSyncer) error,
-			) error {
-				s := NewMockPeerSyncer(ctrl)
-				s.EXPECT().Peer().Return(p).AnyTimes()
-				// TODO: do better job at tracking Release() calls
-				s.EXPECT().
-					Sync(gomock.Any(), gomock.Any(), gomock.Any()).
-					DoAndReturn(func(_ context.Context, x, y rangesync.KeyBytes) error {
-						tst.mtx.Lock()
-						defer tst.mtx.Unlock()
-						require.NotNil(t, x)
-						require.NotNil(t, y)
-						k := hexRange{x.String(), y.String()}
-						tst.peerRanges[k] = append(tst.peerRanges[k], peer)
-						count, found := tst.expPeerRanges[k]
-						require.True(t, found, "peer range not found: x %s y %s", x, y)
-						if tst.fail[k] {
-							t.Logf("ERR: peer %d x %s y %s",
-								index, x.String(), y.String())
-							tst.fail[k] = false
-							return errors.New("injected fault")
-						} else {
-							t.Logf("OK: peer %d x %s y %s",
-								index, x.String(), y.String())
-							tst.expPeerRanges[k] = count + 1
-						}
-						return nil
-					})
-				return toCall(s)
-			}).
- AnyTimes() + tst.syncPeers[n] = p2p.Peer(fmt.Sprintf("peer%d", n)) } for _, p := range tst.syncPeers { tst.peers.Add(p, func() []protocol.ID { return []protocol.ID{multipeer.Protocol} }) @@ -126,8 +90,48 @@ func newTestSplitSync(t testing.TB) *splitSyncTester { return tst } +func (tst *splitSyncTester) expectPeerSync(p p2p.Peer) { + tst.syncBase.EXPECT(). + WithPeerSyncer(gomock.Any(), p, gomock.Any()). + DoAndReturn(func( + _ context.Context, + peer p2p.Peer, + toCall func(multipeer.PeerSyncer) error, + ) error { + s := NewMockPeerSyncer(tst.ctrl) + s.EXPECT().Peer().Return(p).AnyTimes() + s.EXPECT(). + Sync(gomock.Any(), gomock.Any(), gomock.Any()). + DoAndReturn(func(_ context.Context, x, y rangesync.KeyBytes) error { + tst.mtx.Lock() + defer tst.mtx.Unlock() + require.NotNil(tst, x) + require.NotNil(tst, y) + k := hexRange{x.String(), y.String()} + tst.peerRanges[k] = append(tst.peerRanges[k], peer) + count, found := tst.expPeerRanges[k] + require.True(tst, found, "peer range not found: x %s y %s", x, y) + if tst.fail[k] { + tst.Logf("ERR: peer %s x %s y %s", + string(p), x.String(), y.String()) + tst.fail[k] = false + return errors.New("injected fault") + } else { + tst.Logf("OK: peer %s x %s y %s", + string(p), x.String(), y.String()) + tst.expPeerRanges[k] = count + 1 + } + return nil + }) + return toCall(s) + }).AnyTimes() +} + func TestSplitSync(t *testing.T) { tst := newTestSplitSync(t) + for _, p := range tst.syncPeers { + tst.expectPeerSync(p) + } var eg errgroup.Group eg.Go(func() error { return tst.splitSync.Sync(context.Background()) @@ -138,8 +142,11 @@ func TestSplitSync(t *testing.T) { } } -func TestSplitSyncRetry(t *testing.T) { +func TestSplitSync_Retry(t *testing.T) { tst := newTestSplitSync(t) + for _, p := range tst.syncPeers { + tst.expectPeerSync(p) + } tst.fail[tstRanges[1]] = true tst.fail[tstRanges[2]] = true var eg errgroup.Group @@ -152,3 +159,49 @@ func TestSplitSyncRetry(t *testing.T) { require.Equal(t, 1, count, "peer range not synced: x %s y %s", pr[0], pr[1]) } } + +func TestSplitSync_SlowPeers(t *testing.T) { + tst := newTestSplitSync(t) + + for _, p := range tst.syncPeers[:2] { + tst.expectPeerSync(p) + } + + for _, p := range tst.syncPeers[2:] { + tst.syncBase.EXPECT(). + WithPeerSyncer(gomock.Any(), p, gomock.Any()). + DoAndReturn(func( + _ context.Context, + peer p2p.Peer, + toCall func(multipeer.PeerSyncer) error, + ) error { + s := NewMockPeerSyncer(tst.ctrl) + s.EXPECT().Peer().Return(p).AnyTimes() + s.EXPECT(). + Sync(gomock.Any(), gomock.Any(), gomock.Any()). + DoAndReturn(func(ctx context.Context, x, y rangesync.KeyBytes) error { + <-ctx.Done() + return nil + }) + return toCall(s) + }) + } + + var eg errgroup.Group + eg.Go(func() error { + return tst.splitSync.Sync(context.Background()) + }) + + require.Eventually(t, func() bool { + tst.mtx.Lock() + defer tst.mtx.Unlock() + return len(tst.peerRanges) == 2 + }, 10*time.Millisecond, time.Millisecond) + // Make sure all 4 grace period timers are started. 
+	tst.clock.BlockUntil(4)
+	tst.clock.Advance(time.Minute)
+	require.NoError(t, eg.Wait())
+	for pr, count := range tst.expPeerRanges {
+		require.Equal(t, 1, count, "bad sync count: x %s y %s", pr[0], pr[1])
+	}
+}
diff --git a/sync2/p2p.go b/sync2/p2p.go
index ba3d098eef..e43155155b 100644
--- a/sync2/p2p.go
+++ b/sync2/p2p.go
@@ -22,9 +22,13 @@ import (
 type Config struct {
 	rangesync.RangeSetReconcilerConfig  `mapstructure:",squash"`
 	multipeer.MultiPeerReconcilerConfig `mapstructure:",squash"`
-	EnableActiveSync bool `mapstructure:"enable-active-sync"`
-	TrafficLimit     int  `mapstructure:"traffic-limit"`
-	MessageLimit     int  `mapstructure:"message-limit"`
+	TrafficLimit     int           `mapstructure:"traffic-limit"`
+	MessageLimit     int           `mapstructure:"message-limit"`
+	MaxDepth         int           `mapstructure:"max-depth"`
+	BatchSize        int           `mapstructure:"batch-size"`
+	MaxAttempts      int           `mapstructure:"max-attempts"`
+	MaxBatchRetries  int           `mapstructure:"max-batch-retries"`
+	FailedBatchDelay time.Duration `mapstructure:"failed-batch-delay"`
 }

 // DefaultConfig returns the default configuration for the P2PHashSync.
@@ -34,20 +38,27 @@ func DefaultConfig() Config {
 		MultiPeerReconcilerConfig: multipeer.DefaultConfig(),
 		TrafficLimit:              200_000_000,
 		MessageLimit:              20_000_000,
+		MaxDepth:                  24,
+		BatchSize:                 1000,
+		MaxAttempts:               3,
+		MaxBatchRetries:           3,
+		FailedBatchDelay:          10 * time.Second,
 	}
 }

 // P2PHashSync handles the synchronization of a local OrderedSet against other peers.
 type P2PHashSync struct {
-	logger     *zap.Logger
-	cfg        Config
-	os         rangesync.OrderedSet
-	syncBase   multipeer.SyncBase
-	reconciler *multipeer.MultiPeerReconciler
-	cancel     context.CancelFunc
-	eg         errgroup.Group
-	start      sync.Once
-	running    atomic.Bool
+	logger           *zap.Logger
+	cfg              Config
+	enableActiveSync bool
+	os               rangesync.OrderedSet
+	syncBase         multipeer.SyncBase
+	reconciler       *multipeer.MultiPeerReconciler
+	cancel           context.CancelFunc
+	eg               errgroup.Group
+	startOnce        sync.Once
+	running          atomic.Bool
+	kickCh           chan struct{}
 }

 // NewP2PHashSync creates a new P2PHashSync.
@@ -56,23 +67,24 @@ func NewP2PHashSync(
 	d *rangesync.Dispatcher,
 	name string,
 	os rangesync.OrderedSet,
-	keyLen, maxDepth int,
+	keyLen int,
 	peers *peers.Peers,
 	handler multipeer.SyncKeyHandler,
 	cfg Config,
-	requester rangesync.Requester,
+	enableActiveSync bool,
 ) *P2PHashSync {
 	s := &P2PHashSync{
-		logger: logger,
-		os:     os,
-		cfg:    cfg,
+		logger:           logger,
+		os:               os,
+		cfg:              cfg,
+		kickCh:           make(chan struct{}, 1),
+		enableActiveSync: enableActiveSync,
 	}
-	// var ps multipeer.PairwiseSyncer
-	ps := rangesync.NewPairwiseSetSyncer(logger, requester, name, cfg.RangeSetReconcilerConfig)
+	ps := rangesync.NewPairwiseSetSyncer(logger, d, name, cfg.RangeSetReconcilerConfig)
 	s.syncBase = multipeer.NewSetSyncBase(logger, ps, s.os, handler)
 	s.reconciler = multipeer.NewMultiPeerReconciler(
 		logger, cfg.MultiPeerReconcilerConfig,
-		s.syncBase, peers, keyLen, maxDepth)
+		s.syncBase, peers, keyLen, cfg.MaxDepth)
 	d.Register(name, s.serve)
 	return s
 }
@@ -92,6 +104,9 @@ func (s *P2PHashSync) Set() rangesync.OrderedSet {

 // Load loads the OrderedSet from the underlying storage.
func (s *P2PHashSync) Load() error { + if s.os.Loaded() { + return nil + } s.logger.Info("loading the set") start := time.Now() // We pre-load the set to avoid waiting for it to load during a @@ -106,30 +121,51 @@ func (s *P2PHashSync) Load() error { s.logger.Info("done loading the set", zap.Duration("elapsed", time.Since(start)), zap.Int("count", info.Count), - zap.Stringer("fingerprint", info.Fingerprint)) + zap.Stringer("fingerprint", info.Fingerprint), + zap.Int("maxDepth", s.cfg.MaxDepth)) return nil } -// Start starts the multi-peer reconciler. -func (s *P2PHashSync) Start() { - if !s.cfg.EnableActiveSync { - s.logger.Info("active sync is disabled") - return - } +func (s *P2PHashSync) start() (isWaiting bool) { s.running.Store(true) - s.start.Do(func() { - s.eg.Go(func() error { - defer s.running.Store(false) - var ctx context.Context - ctx, s.cancel = context.WithCancel(context.Background()) - return s.reconciler.Run(ctx) - }) + isWaiting = true + s.startOnce.Do(func() { + isWaiting = false + if s.enableActiveSync { + s.eg.Go(func() error { + defer s.running.Store(false) + var ctx context.Context + ctx, s.cancel = context.WithCancel(context.Background()) + return s.reconciler.Run(ctx, s.kickCh) + }) + return + } else { + s.logger.Info("active syncv2 is disabled") + return + } }) + return isWaiting +} + +// Start starts the multi-peer reconciler if it is not already running. +func (s *P2PHashSync) Start() { + s.start() +} + +// StartAndSync starts the multi-peer reconciler if it is not already running, and waits +// until the local OrderedSet is in sync with the peers. +func (s *P2PHashSync) StartAndSync(ctx context.Context) error { + if s.start() { + // If the multipeer reconciler is waiting for sync, we kick it to start + // the sync so as not to wait for the next scheduled sync interval. + s.kickCh <- struct{}{} + } + return s.WaitForSync(ctx) } // Stop stops the multi-peer reconciler. func (s *P2PHashSync) Stop() { - if !s.cfg.EnableActiveSync || !s.running.Load() { + if !s.enableActiveSync || !s.running.Load() { return } if s.cancel != nil { diff --git a/sync2/p2p_test.go b/sync2/p2p_test.go index 7d9eb222eb..dd2d1ba7d2 100644 --- a/sync2/p2p_test.go +++ b/sync2/p2p_test.go @@ -96,8 +96,8 @@ func TestP2P(t *testing.T) { } } cfg := sync2.DefaultConfig() - cfg.EnableActiveSync = true cfg.SyncInterval = 100 * time.Millisecond + cfg.MaxDepth = maxDepth host := mesh.Hosts()[n] handlers[n] = &fakeHandler{ mtx: &mtx, @@ -113,7 +113,7 @@ func TestP2P(t *testing.T) { eg.Go(func() error { return srv.Run(ctx) }) hs[n] = sync2.NewP2PHashSync( logger.Named(fmt.Sprintf("node%d", n)), - d, "test", &os, keyLen, maxDepth, ps, handlers[n], cfg, srv) + d, "test", &os, keyLen, ps, handlers[n], cfg, true) require.NoError(t, hs[n].Load()) is := hs[n].Set().(*rangesync.DumbSet) is.SetAllowMultiReceive(true)
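As a usage note, the new `StartAndSync` entry point both launches the reconciler (if it isn't running yet) and kicks it via the kick channel, so the first sync starts immediately instead of waiting for the next jittered sync interval. A hypothetical caller-side sketch (not from this patch; `syncNow` and the timeout are illustrative, and construction of the `P2PHashSync` is elided):

```go
package example

import (
	"context"
	"time"

	"github.com/spacemeshos/go-spacemesh/sync2"
)

// syncNow is a hypothetical helper: unlike Start, StartAndSync forces an
// immediate sync attempt and then blocks until the local set is in sync
// with the peers or the context expires.
func syncNow(ctx context.Context, hs *sync2.P2PHashSync) error {
	ctx, cancel := context.WithTimeout(ctx, time.Minute)
	defer cancel()
	return hs.StartAndSync(ctx)
}
```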