sync2: multipeer: fix edge cases (#6447)

## Motivation

Split sync could become blocked when there were slow peers. Their
subranges get reassigned to other peers, and there were bugs causing
indefinite blocking and panics in these cases. Moreover, after other
peers manage to sync the slow peers' subranges ahead of them, syncing
against the slow peers needs to be interrupted, as it is no longer
needed.

In multipeer sync, when every peer has failed to sync, e.g. due to a
temporary connection interruption, there is no need to wait for the
full sync interval; a shorter wait time is used between retries.
ivan4th committed Dec 5, 2024
1 parent 9714654 commit 91afd85
Showing 7 changed files with 285 additions and 91 deletions.
3 changes: 2 additions & 1 deletion sql/database.go
@@ -227,7 +227,8 @@ func OpenInMemory(opts ...Opt) (*sqliteDatabase, error) {
opts = append(opts, withForceFresh())
// A unique URI is needed to avoid sharing the same in-memory database,
// while still allowing multiple connections to that database.
uri := fmt.Sprintf("file:mem-%d?mode=memory&cache=shared", rand.Uint64())
uri := fmt.Sprintf("file:mem-%d-%d?mode=memory&cache=shared",
rand.Uint64(), rand.Uint64())
return Open(uri, opts...)
}
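As context for this fix, here is a minimal standalone sketch of why the mem-<n> name matters under cache=shared: opening the same URI twice yields one shared in-memory database, so a name collision between concurrently running tests would leak state between them. The mattn/go-sqlite3 driver below is an assumption made purely for illustration; go-spacemesh uses its own sql package.

```go
package main

import (
	"database/sql"
	"fmt"

	_ "github.com/mattn/go-sqlite3" // assumed driver, for illustration only
)

func main() {
	// Same URI => same shared-cache in-memory database.
	a, _ := sql.Open("sqlite3", "file:mem-1?mode=memory&cache=shared")
	b, _ := sql.Open("sqlite3", "file:mem-1?mode=memory&cache=shared")
	defer a.Close()
	defer b.Close()
	a.Exec(`CREATE TABLE t (x INT)`)
	var n int
	b.QueryRow(`SELECT count(*) FROM sqlite_master WHERE name = 't'`).Scan(&n)
	fmt.Println(n) // 1: b sees the table created through a
}
```

Using two independent rand.Uint64() values widens the name space, making accidental reuse of the same database name far less likely.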

30 changes: 26 additions & 4 deletions sync2/multipeer/multipeer.go
@@ -5,6 +5,8 @@ import (
"errors"
"fmt"
"math"
"math/rand/v2"
"sync/atomic"
"time"

"github.com/jonboulle/clockwork"
@@ -69,6 +71,9 @@ type MultiPeerReconcilerConfig struct {
MinCompleteFraction float64 `mapstructure:"min-complete-fraction"`
// Interval between syncs.
SyncInterval time.Duration `mapstructure:"sync-interval"`
// Spread factor for the sync interval.
// The actual interval is SyncInterval * (1 + SyncIntervalSpread*(random[0..2]-1)),
// that is, a random duration in [SyncInterval*(1-spread), SyncInterval*(1+spread)).
SyncIntervalSpread float64 `mapstructure:"sync-interval-spread"`
// Interval between retries after a failed sync.
RetryInterval time.Duration `mapstructure:"retry-interval"`
// Interval between rechecking for peers after no synchronization peers were
@@ -96,6 +101,7 @@ func DefaultConfig() MultiPeerReconcilerConfig {
MaxFullDiff: 10000,
MaxSyncDiff: 100,
SyncInterval: 5 * time.Minute,
SyncIntervalSpread: 0.5,
RetryInterval: 1 * time.Minute,
NoPeersRecheckInterval: 30 * time.Second,
SplitSyncGracePeriod: time.Minute,
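The effect of the new SyncIntervalSpread option can be seen in isolation; the sketch below mirrors the interval computation added to Run further down in this diff (the helper name is made up):

```go
package main

import (
	"fmt"
	"math/rand/v2"
	"time"
)

// jitteredInterval applies a random factor in [1-spread, 1+spread)
// to the base interval, matching the computation in the Run loop.
func jitteredInterval(base time.Duration, spread float64) time.Duration {
	return time.Duration(float64(base) * (1 + spread*(rand.Float64()*2-1)))
}

func main() {
	// With the defaults above (5m base, 0.5 spread), every printed
	// value falls in [2.5m, 7.5m).
	for i := 0; i < 3; i++ {
		fmt.Println(jitteredInterval(5*time.Minute, 0.5))
	}
}
```

The spread presumably de-synchronizes nodes so that their sync rounds don't all start at the same moment.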
@@ -264,20 +270,27 @@ func (mpr *MultiPeerReconciler) needSplitSync(s syncability) bool {
}

func (mpr *MultiPeerReconciler) fullSync(ctx context.Context, syncPeers []p2p.Peer) error {
if len(syncPeers) == 0 {
return errors.New("no peers to sync against")
}
var eg errgroup.Group
var someSucceeded atomic.Bool
for _, p := range syncPeers {
eg.Go(func() error {
if err := mpr.syncBase.WithPeerSyncer(ctx, p, func(ps PeerSyncer) error {
err := ps.Sync(ctx, nil, nil)
switch {
case err == nil:
someSucceeded.Store(true)
mpr.sl.NoteSync()
case errors.Is(err, context.Canceled):
return err
default:
// failing to sync against a particular peer is not considered
// a fatal sync failure, so we just log the error
mpr.logger.Error("error syncing peer", zap.Stringer("peer", p), zap.Error(err))
mpr.logger.Error("error syncing peer",
zap.Stringer("peer", p),
zap.Error(err))
}
return nil
}); err != nil {
@@ -286,7 +299,13 @@ func (mpr *MultiPeerReconciler) fullSync(ctx context.Context, syncPeers []p2p.Pe
return nil
})
}
return eg.Wait()
if err := eg.Wait(); err != nil {
return err
}
if !someSucceeded.Load() {
return errors.New("all syncs failed")
}
return nil
}
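The new someSucceeded / "all syncs failed" logic is an instance of a general pattern: run all tasks concurrently, tolerate individual failures, but report an error when nothing succeeded. A simplified, self-contained sketch (the task type is hypothetical, Go 1.22+ per-iteration loop variables are assumed, and the real code additionally propagates context cancellation):

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"

	"golang.org/x/sync/errgroup"
)

// runAll treats individual task failures as non-fatal, but reports an
// error if no task succeeded -- the same shape as fullSync above.
func runAll(tasks []func() error) error {
	var eg errgroup.Group
	var someSucceeded atomic.Bool
	for _, task := range tasks {
		eg.Go(func() error {
			if err := task(); err == nil {
				someSucceeded.Store(true)
			}
			return nil // an individual failure is not fatal
		})
	}
	if err := eg.Wait(); err != nil {
		return err
	}
	if !someSucceeded.Load() {
		return errors.New("all tasks failed")
	}
	return nil
}

func main() {
	err := runAll([]func() error{
		func() error { return errors.New("boom") },
		func() error { return nil },
	})
	fmt.Println(err) // <nil>: at least one task succeeded
}
```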

func (mpr *MultiPeerReconciler) syncOnce(ctx context.Context, lastWasSplit bool) (full bool, err error) {
@@ -346,7 +365,7 @@ func (mpr *MultiPeerReconciler) syncOnce(ctx context.Context, lastWasSplit bool)
}

// Run runs the MultiPeerReconciler.
func (mpr *MultiPeerReconciler) Run(ctx context.Context) error {
func (mpr *MultiPeerReconciler) Run(ctx context.Context, kickCh chan struct{}) error {
// The point of using split sync, which syncs different key ranges against
// different peers, vs full sync which syncs the full key range against different
// peers, is:
@@ -384,7 +403,9 @@ func (mpr *MultiPeerReconciler) Run(ctx context.Context) error {
lastWasSplit := false
LOOP:
for {
interval := mpr.cfg.SyncInterval
interval := time.Duration(
float64(mpr.cfg.SyncInterval) *
(1 + mpr.cfg.SyncIntervalSpread*(rand.Float64()*2-1)))
full, err = mpr.syncOnce(ctx, lastWasSplit)
if err != nil {
if errors.Is(err, context.Canceled) {
@@ -407,6 +428,7 @@
err = ctx.Err()
break LOOP
case <-mpr.clock.After(interval):
case <-kickCh:
}
}
// The loop is only exited upon context cancellation.
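The kickCh parameter added to Run is the usual "poke channel" idiom: the waiter wakes on whichever comes first, the (jittered) interval timer or a kick. The test below kicks with a plain blocking send into a 1-buffered channel; a non-blocking variant that collapses repeated kicks is sketched here (names are hypothetical):

```go
package main

import (
	"fmt"
	"time"
)

// kick performs a non-blocking send into a 1-buffered channel, so
// repeated kicks while a sync is in flight collapse into one pending
// kick. (The test below simply does kickCh <- struct{}{}.)
func kick(kickCh chan struct{}) {
	select {
	case kickCh <- struct{}{}:
	default: // a kick is already pending
	}
}

func main() {
	kickCh := make(chan struct{}, 1)
	kick(kickCh)
	kick(kickCh) // collapsed: the channel already holds a pending kick

	// The waiter wakes on whichever comes first: timer or kick.
	select {
	case <-time.After(time.Minute):
		fmt.Println("interval elapsed")
	case <-kickCh:
		fmt.Println("kicked")
	}
}
```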
64 changes: 61 additions & 3 deletions sync2/multipeer/multipeer_test.go
@@ -67,6 +67,7 @@ type multiPeerSyncTester struct {
reconciler *multipeer.MultiPeerReconciler
cancel context.CancelFunc
eg errgroup.Group
kickCh chan struct{}
// EXPECT() calls should not be done concurrently
// https://github.com/golang/mock/issues/533#issuecomment-821537840
mtx sync.Mutex
@@ -81,10 +82,13 @@ func newMultiPeerSyncTester(t *testing.T, addPeers int) *multiPeerSyncTester {
syncRunner: NewMocksyncRunner(ctrl),
peers: peers.New(),
clock: clockwork.NewFakeClock().(fakeClock),
kickCh: make(chan struct{}, 1),
}
cfg := multipeer.DefaultConfig()
cfg.SyncInterval = time.Minute
cfg.SyncInterval = 40 * time.Second
cfg.SyncIntervalSpread = 0.1
cfg.SyncPeerCount = numSyncPeers
cfg.RetryInterval = 5 * time.Second
cfg.MinSplitSyncPeers = 2
cfg.MinSplitSyncCount = 90
cfg.MaxFullDiff = 20
@@ -111,7 +115,7 @@ func (mt *multiPeerSyncTester) addPeers(n int) []p2p.Peer {
func (mt *multiPeerSyncTester) start() context.Context {
var ctx context.Context
ctx, mt.cancel = context.WithTimeout(context.Background(), 10*time.Second)
mt.eg.Go(func() error { return mt.reconciler.Run(ctx) })
mt.eg.Go(func() error { return mt.reconciler.Run(ctx, mt.kickCh) })
mt.Cleanup(func() {
mt.cancel()
if err := mt.eg.Wait(); err != nil {
@@ -121,6 +125,10 @@
return ctx
}

func (mt *multiPeerSyncTester) kick() {
mt.kickCh <- struct{}{}
}

func (mt *multiPeerSyncTester) expectProbe(times int, pr rangesync.ProbeResult) *peerList {
var pl peerList
mt.syncBase.EXPECT().Probe(gomock.Any(), gomock.Any()).DoAndReturn(
@@ -183,7 +191,7 @@ func TestMultiPeerSync(t *testing.T) {
mt := newMultiPeerSyncTester(t, 0)
ctx := mt.start()
mt.clock.BlockUntilContext(ctx, 1)
// Advance by sync interval. No peers yet
// Advance by sync interval (incl. spread). No peers yet
mt.clock.Advance(time.Minute)
mt.clock.BlockUntilContext(ctx, 1)
// It is safe to do EXPECT() calls while the MultiPeerReconciler is blocked
@@ -249,6 +257,34 @@
mt.syncBase.EXPECT().Wait()
})

t.Run("sync after kick", func(t *testing.T) {
mt := newMultiPeerSyncTester(t, 10)
mt.syncBase.EXPECT().Count().Return(100, nil).AnyTimes()
require.False(t, mt.reconciler.Synced())
expect := func() {
pl := mt.expectProbe(numSyncPeers, rangesync.ProbeResult{
FP: "foo",
Count: 100,
Sim: 0.99, // high enough for full sync
})
mt.expectFullSync(pl, numSyncPeers, 0)
mt.syncBase.EXPECT().Wait()
}
expect()
// first full sync happens immediately
ctx := mt.start()
mt.clock.BlockUntilContext(ctx, 1)
mt.satisfy()
for i := 0; i < numSyncs; i++ {
expect()
mt.kick()
mt.clock.BlockUntilContext(ctx, 1)
mt.satisfy()
}
require.True(t, mt.reconciler.Synced())
mt.syncBase.EXPECT().Wait()
})

t.Run("full sync, peers with low count ignored", func(t *testing.T) {
mt := newMultiPeerSyncTester(t, 0)
addedPeers := mt.addPeers(numSyncPeers)
@@ -349,6 +385,28 @@ func TestMultiPeerSync(t *testing.T) {
mt.syncBase.EXPECT().Wait()
})

t.Run("all peers failed during full sync", func(t *testing.T) {
mt := newMultiPeerSyncTester(t, 10)
mt.syncBase.EXPECT().Count().Return(100, nil).AnyTimes()

pl := mt.expectProbe(numSyncPeers, rangesync.ProbeResult{FP: "foo", Count: 100, Sim: 0.99})
mt.expectFullSync(pl, numSyncPeers, numSyncPeers)
mt.syncBase.EXPECT().Wait().AnyTimes()

ctx := mt.start()
mt.clock.BlockUntilContext(ctx, 1)
mt.satisfy()

pl = mt.expectProbe(numSyncPeers, rangesync.ProbeResult{FP: "foo", Count: 100, Sim: 0.99})
mt.expectFullSync(pl, numSyncPeers, 0)
// Retry should happen after a mere 5 seconds as no peers have succeeded;
// there's no need to wait for the full sync interval.
mt.clock.Advance(5 * time.Second)
mt.satisfy()

require.True(t, mt.reconciler.Synced())
})
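The wait-selection behavior this test exercises can be sketched as follows; the relevant code in Run is not part of the visible diff, so the names and shape below are assumptions based on the test's config (RetryInterval = 5s, SyncInterval = 40s):

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// chooseWait sketches the assumed wait selection: after a failed sync
// round (e.g. "all syncs failed"), wait only RetryInterval instead of
// the full jittered sync interval. Names here are hypothetical.
func chooseWait(syncErr error, interval, retryInterval time.Duration) time.Duration {
	if syncErr != nil {
		return retryInterval
	}
	return interval
}

func main() {
	fmt.Println(chooseWait(errors.New("all syncs failed"), 40*time.Second, 5*time.Second)) // 5s
	fmt.Println(chooseWait(nil, 40*time.Second, 5*time.Second))                            // 40s
}
```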

t.Run("failed synced key handling during full sync", func(t *testing.T) {
mt := newMultiPeerSyncTester(t, 10)
mt.syncBase.EXPECT().Count().Return(100, nil).AnyTimes()
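These tests drive time with jonboulle/clockwork's fake clock: BlockUntilContext waits until the code under test is parked on a timer, and Advance releases it without any real waiting. A minimal sketch of that choreography (assuming the clockwork API as used in these tests):

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/jonboulle/clockwork"
)

func main() {
	clock := clockwork.NewFakeClock()
	done := make(chan struct{})
	go func() {
		<-clock.After(40 * time.Second) // code under test waits on the fake clock
		close(done)
	}()
	// Wait until the goroutine is parked on the clock, then jump past
	// the interval (jitter included) in a single step.
	clock.BlockUntilContext(context.Background(), 1)
	clock.Advance(time.Minute)
	<-done
	fmt.Println("timer fired with no real waiting")
}
```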
30 changes: 27 additions & 3 deletions sync2/multipeer/split_sync.go
@@ -12,6 +12,7 @@ import (
"golang.org/x/sync/errgroup"

"github.com/spacemeshos/go-spacemesh/fetch/peers"
"github.com/spacemeshos/go-spacemesh/log"
"github.com/spacemeshos/go-spacemesh/p2p"
)

Expand Down Expand Up @@ -150,6 +151,8 @@ func (s *splitSync) handleSyncResult(r syncResult) error {
s.syncPeers = append(s.syncPeers, r.ps.Peer())
s.numRemaining--
s.logger.Debug("peer synced successfully",
log.ZShortStringer("x", sr.X),
log.ZShortStringer("y", sr.Y),
zap.Stringer("peer", r.ps.Peer()),
zap.Int("numPeers", s.numPeers),
zap.Int("numRemaining", s.numRemaining),
@@ -199,8 +202,18 @@ func (s *splitSync) Sync(ctx context.Context) error {
}
select {
case sr = <-s.slowRangeCh:
// push this syncRange to the back of the queue
s.sq.Update(sr, s.clock.Now())
// Push this syncRange to the back of the queue.
// There's some chance that the peer managed to complete
// the sync while the range was still sitting in the
// channel, so we double-check if it's done.
if !sr.Done {
s.logger.Debug("slow peer, reassigning the range",
log.ZShortStringer("x", sr.X), log.ZShortStringer("y", sr.Y))
s.sq.Update(sr, s.clock.Now())
} else {
s.logger.Debug("slow peer, NOT reassigning the range: DONE",
log.ZShortStringer("x", sr.X), log.ZShortStringer("y", sr.Y))
}
case <-syncCtx.Done():
return syncCtx.Err()
case r := <-s.resCh:
@@ -210,5 +223,16 @@
}
}
}
return s.eg.Wait()
// Stop late peers that didn't manage to sync their ranges in time.
// The ranges were already reassigned to other peers and successfully
// synced by this point.
cancel()
err := s.eg.Wait()
if s.numRemaining == 0 {
// If all the ranges are synced, the split sync is considered successful
// even if some peers failed to sync their ranges, as those ranges
// were synced by other peers.
return nil
}
return err
}
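The new tail of Sync is a pattern worth calling out: cancel the stragglers whose ranges were already reassigned, wait for the errgroup, and judge success by the remaining work rather than by individual peer errors. A compact, self-contained sketch with hypothetical names:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

// waitStragglers mirrors the shutdown above: once no ranges remain,
// slow workers are cancelled and their errors are ignored.
func waitStragglers(numRemaining int) error {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	var eg errgroup.Group
	eg.Go(func() error {
		select { // a slow worker whose range was reassigned elsewhere
		case <-time.After(time.Hour):
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	})
	cancel() // all ranges already synced by other peers
	err := eg.Wait()
	if numRemaining == 0 {
		return nil // success despite the cancelled straggler
	}
	return err
}

func main() {
	fmt.Println(waitStragglers(0)) // <nil>
}
```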
