From 07377f09b71ae5fcc98198d65c13f681ac2bb9ac Mon Sep 17 00:00:00 2001 From: Ivan Shvedunov Date: Thu, 21 Nov 2024 10:57:20 +0000 Subject: [PATCH] fetch: smarter peer selection (#6477) ## Motivation Most of the time, there are some nodes that already joined the P2P network but didn't complete their initialization yet, and thus attempts to e.g. fetch blobs to them end up in stream setup errors (protocol not supported). Additionally, during initial phases of `syncv2` testing, we're going to have a limited number of nodes supporting `sync/2` protocol, and when choosing peers for `syncv2`, we must only include these peers. Co-authored-by: Ivan Shvedunov --- fetch/fetch.go | 16 ++++++- fetch/fetch_test.go | 10 +++-- fetch/mesh_data_test.go | 9 +++- fetch/peers/peers.go | 34 +++++++++++++- fetch/peers/peers_test.go | 71 +++++++++++++++++++++++++++++- sync2/multipeer/multipeer.go | 7 ++- sync2/multipeer/multipeer_test.go | 3 +- sync2/multipeer/split_sync_test.go | 3 +- sync2/p2p_test.go | 6 ++- 9 files changed, 145 insertions(+), 14 deletions(-) diff --git a/fetch/fetch.go b/fetch/fetch.go index 7517dcc01a..de5afcebc5 100644 --- a/fetch/fetch.go +++ b/fetch/fetch.go @@ -12,6 +12,7 @@ import ( "time" "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/protocol" "go.uber.org/zap" "golang.org/x/sync/errgroup" "golang.org/x/sync/semaphore" @@ -297,7 +298,16 @@ func NewFetch( // there is one test that covers this part. if host != nil { connectedf := func(peer p2p.Peer) { - if f.peers.Add(peer) { + protocols := func() []protocol.ID { + ps, err := host.Peerstore().GetProtocols(peer) + if err != nil { + f.logger.Debug("failed to get protocols for peer", + zap.Stringer("id", peer), zap.Error(err)) + return nil + } + return ps + } + if f.peers.Add(peer, protocols) { f.logger.Debug("adding peer", zap.Stringer("id", peer)) } } @@ -703,7 +713,9 @@ func (f *Fetch) organizeRequests(requests []RequestMessage) map[p2p.Peer][]*batc rng := rand.New(rand.NewChaCha8(seed)) peer2requests := make(map[p2p.Peer][]RequestMessage) - best := f.peers.SelectBest(RedundantPeers) + // When selecting peers, provide protocol IDs so that peers that aren't yet fully + // initialized are not picked for the request, avoiding unnecessary errors. + best := f.peers.SelectBestWithProtocols(RedundantPeers, []protocol.ID{hashProtocol, activeSetProtocol}) if len(best) == 0 { f.logger.Warn("cannot send batch: no peers found") f.mu.Lock() diff --git a/fetch/fetch_test.go b/fetch/fetch_test.go index 50e81d9fe7..84cf24125b 100644 --- a/fetch/fetch_test.go +++ b/fetch/fetch_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/protocol" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" @@ -187,7 +188,9 @@ func TestFetch_RequestHashBatchFromPeers(t *testing.T) { f := createFetch(t) f.cfg.MaxRetriesForRequest = 0 peer := p2p.Peer("buddy") - f.peers.Add(peer) + f.peers.Add(peer, func() []protocol.ID { + return []protocol.ID{hashProtocol, activeSetProtocol} + }) hsh0 := types.RandomHash() res0 := ResponseMessage{ @@ -259,8 +262,9 @@ func TestFetch_Loop_BatchRequestMax(t *testing.T) { f.cfg.BatchTimeout = 1 f.cfg.BatchSize = 2 peer := p2p.Peer("buddy") - f.peers.Add(peer) - + f.peers.Add(peer, func() []protocol.ID { + return []protocol.ID{hashProtocol, activeSetProtocol} + }) h1 := types.RandomHash() h2 := types.RandomHash() h3 := types.RandomHash() diff --git a/fetch/mesh_data_test.go b/fetch/mesh_data_test.go index 713c96a7b7..918503a5c6 100644 --- a/fetch/mesh_data_test.go +++ b/fetch/mesh_data_test.go @@ -8,6 +8,7 @@ import ( "testing" p2phost "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/protocol" mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -137,7 +138,9 @@ func TestFetch_getHashes(t *testing.T) { f.Start() tb.Cleanup(f.Stop) for _, peer := range peers { - f.peers.Add(peer) + f.peers.Add(peer, func() []protocol.ID { + return []protocol.ID{hashProtocol, activeSetProtocol} + }) } f.mh.EXPECT().ID().Return("self").AnyTimes() f.RegisterPeerHashes(peers[0], hashes[:2]) @@ -249,7 +252,9 @@ func TestFetch_getHashesStreaming(t *testing.T) { f.Start() tb.Cleanup(f.Stop) for _, peer := range peers { - f.peers.Add(peer) + f.peers.Add(peer, func() []protocol.ID { + return []protocol.ID{hashProtocol, activeSetProtocol} + }) } f.mh.EXPECT().ID().Return("self").AnyTimes() f.RegisterPeerHashes(peers[0], hashes[:2]) diff --git a/fetch/peers/peers.go b/fetch/peers/peers.go index 91d581c474..4d25f1d9e9 100644 --- a/fetch/peers/peers.go +++ b/fetch/peers/peers.go @@ -1,11 +1,13 @@ package peers import ( + "slices" "strings" "sync" "time" "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/protocol" "go.uber.org/zap/zapcore" "github.com/spacemeshos/go-spacemesh/p2p" @@ -16,6 +18,7 @@ type data struct { success, failures int failRate float64 averageLatency float64 + protocols func() []protocol.ID } func (d *data) latency(global float64) float64 { @@ -61,14 +64,14 @@ func (p *Peers) Contains(id peer.ID) bool { return exist } -func (p *Peers) Add(id peer.ID) bool { +func (p *Peers) Add(id peer.ID, protocols func() []protocol.ID) bool { p.mu.Lock() defer p.mu.Unlock() _, exist := p.peers[id] if exist { return false } - p.peers[id] = &data{id: id} + p.peers[id] = &data{id: id, protocols: protocols} return true } @@ -151,12 +154,39 @@ func (p *Peers) SelectBestFrom(peers []peer.ID) peer.ID { func (p *Peers) SelectBest(n int) []peer.ID { p.mu.Lock() defer p.mu.Unlock() + return p.selectBest(n, nil) +} + +// SelectBestWithProtocols is similar to SelectBest but filters peers by supported protocols. +// If protocols is empty, it returns the best peers regardless of the protocol. +// If protocols is not empty, it returns the best peers that support at least one of the protocols. +func (p *Peers) SelectBestWithProtocols(n int, protocols []protocol.ID) []peer.ID { + p.mu.Lock() + defer p.mu.Unlock() + return p.selectBest(n, protocols) +} + +func (p *Peers) selectBest(n int, protocols []protocol.ID) []peer.ID { + slices.Sort(protocols) + protocols = slices.Compact(protocols) lth := min(len(p.peers), n) if lth == 0 { return nil } best := make([]*data, 0, lth) for _, peer := range p.peers { + if len(protocols) > 0 { + found := false + for _, proto := range peer.protocols() { + if slices.Contains(protocols, proto) { + found = true + break + } + } + if !found { + continue + } + } for i := range best { if peer.less(best[i], p.globalLatency) { best[i], peer = peer, best[i] diff --git a/fetch/peers/peers_test.go b/fetch/peers/peers_test.go index 611a79106f..db77721432 100644 --- a/fetch/peers/peers_test.go +++ b/fetch/peers/peers_test.go @@ -8,6 +8,7 @@ import ( "time" "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/protocol" "github.com/stretchr/testify/require" ) @@ -22,6 +23,7 @@ type event struct { success int failure int latency time.Duration + protocols []protocol.ID } func withEvents(events []event) *Peers { @@ -30,7 +32,7 @@ func withEvents(events []event) *Peers { if ev.delete { tracker.Delete(ev.id) } else if ev.add { - tracker.Add(ev.id) + tracker.Add(ev.id, func() []protocol.ID { return ev.protocols }) } for i := 0; i < ev.failure; i++ { tracker.OnFailure(ev.id, 0, ev.latency) @@ -210,6 +212,73 @@ func TestSelect(t *testing.T) { } } +func TestSelectBestWithProtocols(t *testing.T) { + for _, tc := range []struct { + desc string + events []event + + n int + protocols []protocol.ID + expect []peer.ID + }{ + { + desc: "no protocols required and no peer protocols", + events: []event{ + {id: "a", success: 1, latency: 8, add: true}, + {id: "b", success: 1, latency: 9, add: true}, + {id: "c", success: 3, latency: 14, add: true}, + }, + n: 2, + expect: []peer.ID{"a", "b"}, + protocols: nil, + }, + { + desc: "no protocols required, peers have protocols", + events: []event{ + {id: "a", success: 1, latency: 8, add: true, protocols: []protocol.ID{"a", "b"}}, + {id: "b", success: 1, latency: 9, add: true, protocols: []protocol.ID{"b", "c"}}, + {id: "c", success: 3, latency: 14, add: true, protocols: []protocol.ID{"c", "d"}}, + }, + n: 2, + expect: []peer.ID{"a", "b"}, + protocols: nil, + }, + { + desc: "single protocol required, peers have protocols", + events: []event{ + {id: "a", success: 1, latency: 8, add: true, protocols: []protocol.ID{"a", "b"}}, + {id: "b", success: 1, latency: 9, add: true, protocols: []protocol.ID{"b", "c"}}, + {id: "c", success: 3, latency: 14, add: true, protocols: []protocol.ID{"c", "d"}}, + }, + n: 2, + expect: []peer.ID{"b", "c"}, + protocols: []protocol.ID{"c"}, + }, + { + desc: "multiple protocols required, peers have protocols", + events: []event{ + {id: "a", success: 1, latency: 8, add: true, protocols: []protocol.ID{"a", "b"}}, + {id: "b", success: 1, latency: 9, add: true, protocols: []protocol.ID{"b", "c"}}, + {id: "c", success: 3, latency: 14, add: true, protocols: []protocol.ID{"c", "d"}}, + {id: "d", success: 3, latency: 12, add: true, protocols: []protocol.ID{"a", "e"}}, + }, + n: 3, + expect: []peer.ID{"a", "b", "c"}, + protocols: []protocol.ID{"b", "c"}, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + require.Equal( + t, + tc.expect, + withEvents(tc.events).SelectBestWithProtocols(tc.n, tc.protocols), + "select best %d", + tc.n, + ) + }) + } +} + func TestTotal(t *testing.T) { const total = 100 events := []event{} diff --git a/sync2/multipeer/multipeer.go b/sync2/multipeer/multipeer.go index 3b35a057ee..8f5fd33adc 100644 --- a/sync2/multipeer/multipeer.go +++ b/sync2/multipeer/multipeer.go @@ -7,6 +7,7 @@ import ( "time" "github.com/jonboulle/clockwork" + "github.com/libp2p/go-libp2p/core/protocol" "go.uber.org/zap" "golang.org/x/sync/errgroup" @@ -15,6 +16,10 @@ import ( "github.com/spacemeshos/go-spacemesh/sync2/rangesync" ) +const ( + Protocol = "sync/2" +) + type syncability struct { // peers that were probed successfully syncable []p2p.Peer @@ -283,7 +288,7 @@ func (mpr *MultiPeerReconciler) fullSync(ctx context.Context, syncPeers []p2p.Pe func (mpr *MultiPeerReconciler) syncOnce(ctx context.Context, lastWasSplit bool) (full bool, err error) { var s syncability for { - syncPeers := mpr.peers.SelectBest(mpr.cfg.SyncPeerCount) + syncPeers := mpr.peers.SelectBestWithProtocols(mpr.cfg.SyncPeerCount, []protocol.ID{Protocol}) mpr.logger.Debug("selected best peers for sync", zap.Int("syncPeerCount", mpr.cfg.SyncPeerCount), zap.Int("totalPeers", mpr.peers.Total()), diff --git a/sync2/multipeer/multipeer_test.go b/sync2/multipeer/multipeer_test.go index ab0a516839..d74ff95807 100644 --- a/sync2/multipeer/multipeer_test.go +++ b/sync2/multipeer/multipeer_test.go @@ -10,6 +10,7 @@ import ( "time" "github.com/jonboulle/clockwork" + "github.com/libp2p/go-libp2p/core/protocol" "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" "go.uber.org/zap/zaptest" @@ -101,7 +102,7 @@ func (mt *multiPeerSyncTester) addPeers(n int) []p2p.Peer { r := make([]p2p.Peer, n) for i := 0; i < n; i++ { p := p2p.Peer(fmt.Sprintf("peer%d", i+1)) - mt.peers.Add(p) + mt.peers.Add(p, func() []protocol.ID { return []protocol.ID{multipeer.Protocol} }) r[i] = p } return r diff --git a/sync2/multipeer/split_sync_test.go b/sync2/multipeer/split_sync_test.go index 7a8e584009..24ba0a9cbd 100644 --- a/sync2/multipeer/split_sync_test.go +++ b/sync2/multipeer/split_sync_test.go @@ -8,6 +8,7 @@ import ( "time" "github.com/jonboulle/clockwork" + "github.com/libp2p/go-libp2p/core/protocol" "github.com/stretchr/testify/require" gomock "go.uber.org/mock/gomock" "go.uber.org/zap/zaptest" @@ -108,7 +109,7 @@ func newTestSplitSync(t testing.TB) *splitSyncTester { AnyTimes() } for _, p := range tst.syncPeers { - tst.peers.Add(p) + tst.peers.Add(p, func() []protocol.ID { return []protocol.ID{multipeer.Protocol} }) } tst.splitSync = multipeer.NewSplitSync( zaptest.NewLogger(t), diff --git a/sync2/p2p_test.go b/sync2/p2p_test.go index a36e4ebf27..d3f0c56be8 100644 --- a/sync2/p2p_test.go +++ b/sync2/p2p_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/libp2p/go-libp2p/core/protocol" mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" "github.com/stretchr/testify/require" "go.uber.org/zap/zaptest" @@ -16,6 +17,7 @@ import ( "github.com/spacemeshos/go-spacemesh/p2p" "github.com/spacemeshos/go-spacemesh/p2p/server" "github.com/spacemeshos/go-spacemesh/sync2" + "github.com/spacemeshos/go-spacemesh/sync2/multipeer" "github.com/spacemeshos/go-spacemesh/sync2/rangesync" ) @@ -88,7 +90,9 @@ func TestP2P(t *testing.T) { ps := peers.New() for m := 0; m < numNodes; m++ { if m != n { - ps.Add(mesh.Hosts()[m].ID()) + ps.Add(mesh.Hosts()[m].ID(), func() []protocol.ID { + return []protocol.ID{multipeer.Protocol} + }) } } cfg := sync2.DefaultConfig()