From 86a7a45e260f8404ba598b539c4c4e27fc31c46a Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Wed, 23 Aug 2023 15:25:01 -0700 Subject: [PATCH] feat(session): do not record erroneous session want sends --- .../internal/peermanager/peermanager.go | 9 ++++--- bitswap/client/internal/session/session.go | 2 +- .../client/internal/session/session_test.go | 8 +++--- .../internal/session/sessionwantsender.go | 27 +++++++++++++------ .../session/sessionwantsender_test.go | 3 ++- .../sessionmanager/sessionmanager_test.go | 8 +++--- 6 files changed, 37 insertions(+), 20 deletions(-) diff --git a/bitswap/client/internal/peermanager/peermanager.go b/bitswap/client/internal/peermanager/peermanager.go index f26b8fbec..020836b1d 100644 --- a/bitswap/client/internal/peermanager/peermanager.go +++ b/bitswap/client/internal/peermanager/peermanager.go @@ -2,6 +2,7 @@ package peermanager import ( "context" + "fmt" "sync" logging "github.com/ipfs/go-log/v2" @@ -143,13 +144,15 @@ func (pm *PeerManager) BroadcastWantHaves(ctx context.Context, wantHaves []cid.C // SendWants sends the given want-blocks and want-haves to the given peer. // It filters out wants that have previously been sent to the peer. -func (pm *PeerManager) SendWants(ctx context.Context, p peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid) { +func (pm *PeerManager) SendWants(ctx context.Context, p peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid) error { pm.pqLk.Lock() defer pm.pqLk.Unlock() - if _, ok := pm.peerQueues[p]; ok { - pm.pwm.sendWants(p, wantBlocks, wantHaves) + if _, ok := pm.peerQueues[p]; !ok { + return fmt.Errorf("No peer queue for %s", p) } + pm.pwm.sendWants(p, wantBlocks, wantHaves) + return nil } // SendCancels sends cancels for the given keys to all peers who had previously diff --git a/bitswap/client/internal/session/session.go b/bitswap/client/internal/session/session.go index 39266a5e6..58ed2d6d9 100644 --- a/bitswap/client/internal/session/session.go +++ b/bitswap/client/internal/session/session.go @@ -37,7 +37,7 @@ type PeerManager interface { // interested in a peer's connection state UnregisterSession(uint64) // SendWants tells the PeerManager to send wants to the given peer - SendWants(ctx context.Context, peerId peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid) + SendWants(ctx context.Context, peerId peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid) error // BroadcastWantHaves sends want-haves to all connected peers (used for // session discovery) BroadcastWantHaves(context.Context, []cid.Cid) diff --git a/bitswap/client/internal/session/session_test.go b/bitswap/client/internal/session/session_test.go index cf6de1e5a..bdfe14c1d 100644 --- a/bitswap/client/internal/session/session_test.go +++ b/bitswap/client/internal/session/session_test.go @@ -137,9 +137,11 @@ func newFakePeerManager() *fakePeerManager { } } -func (pm *fakePeerManager) RegisterSession(peer.ID, bspm.Session) {} -func (pm *fakePeerManager) UnregisterSession(uint64) {} -func (pm *fakePeerManager) SendWants(context.Context, peer.ID, []cid.Cid, []cid.Cid) {} +func (pm *fakePeerManager) RegisterSession(peer.ID, bspm.Session) {} +func (pm *fakePeerManager) UnregisterSession(uint64) {} +func (pm *fakePeerManager) SendWants(context.Context, peer.ID, []cid.Cid, []cid.Cid) error { + return nil +} func (pm *fakePeerManager) BroadcastWantHaves(ctx context.Context, cids []cid.Cid) { select { case pm.wantReqs <- wantReq{cids}: diff --git a/bitswap/client/internal/session/sessionwantsender.go b/bitswap/client/internal/session/sessionwantsender.go index 390fdf29d..c06a86fc9 100644 --- a/bitswap/client/internal/session/sessionwantsender.go +++ b/bitswap/client/internal/session/sessionwantsender.go @@ -514,6 +514,7 @@ func (sws *sessionWantSender) processExhaustedWants(exhausted []cid.Cid) { type wantSets struct { wantBlocks *cid.Set wantHaves *cid.Set + result error } type allWants map[peer.ID]*wantSets @@ -552,9 +553,6 @@ func (sws *sessionWantSender) sendNextWants(newlyAvailable []peer.ID) { continue } - // Record that we are sending a want-block for this want to the peer - sws.setWantSentTo(c, wi.bestPeer) - // Send a want-block to the chosen peer toSend.forPeer(wi.bestPeer).wantBlocks.Add(c) @@ -568,6 +566,16 @@ func (sws *sessionWantSender) sendNextWants(newlyAvailable []peer.ID) { // Send any wants we've collected sws.sendWants(toSend) + + for c, wi := range sws.wants { + if wi.bestPeer != "" && wi.sentTo == "" { + // check if a want block was successfully sent to the best peer + if toSend.forPeer(wi.bestPeer).result == nil { + // Record that we are sending a want-block for this want to the peer + sws.setWantSentTo(c, wi.bestPeer) + } + } + } } // sendWants sends want-have and want-blocks to the appropriate peers @@ -585,13 +593,16 @@ func (sws *sessionWantSender) sendWants(sends allWants) { // precedence over want-haves. wblks := snd.wantBlocks.Keys() whaves := snd.wantHaves.Keys() - sws.pm.SendWants(sws.ctx, p, wblks, whaves) + snd.result = sws.pm.SendWants(sws.ctx, p, wblks, whaves) - // Inform the session that we've sent the wants - sws.onSend(p, wblks, whaves) + // only update state if the wants really sent + if snd.result == nil { + // Inform the session that we've sent the wants + sws.onSend(p, wblks, whaves) - // Record which peers we send want-block to - sws.swbt.addSentWantBlocksTo(p, wblks) + // Record which peers we send want-block to + sws.swbt.addSentWantBlocksTo(p, wblks) + } } } diff --git a/bitswap/client/internal/session/sessionwantsender_test.go b/bitswap/client/internal/session/sessionwantsender_test.go index 476b13991..1a929d333 100644 --- a/bitswap/client/internal/session/sessionwantsender_test.go +++ b/bitswap/client/internal/session/sessionwantsender_test.go @@ -82,7 +82,7 @@ func (*mockPeerManager) UnregisterSession(uint64) {} func (*mockPeerManager) BroadcastWantHaves(context.Context, []cid.Cid) {} func (*mockPeerManager) SendCancels(context.Context, []cid.Cid) {} -func (pm *mockPeerManager) SendWants(ctx context.Context, p peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid) { +func (pm *mockPeerManager) SendWants(ctx context.Context, p peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid) error { pm.lk.Lock() defer pm.lk.Unlock() @@ -92,6 +92,7 @@ func (pm *mockPeerManager) SendWants(ctx context.Context, p peer.ID, wantBlocks pm.peerSends[p] = sw } sw.add(wantBlocks, wantHaves) + return nil } func (pm *mockPeerManager) waitNextWants() map[peer.ID]*sentWants { diff --git a/bitswap/client/internal/sessionmanager/sessionmanager_test.go b/bitswap/client/internal/sessionmanager/sessionmanager_test.go index 5ecabfdb3..8dfaefae3 100644 --- a/bitswap/client/internal/sessionmanager/sessionmanager_test.go +++ b/bitswap/client/internal/sessionmanager/sessionmanager_test.go @@ -67,10 +67,10 @@ type fakePeerManager struct { cancels []cid.Cid } -func (*fakePeerManager) RegisterSession(peer.ID, bspm.Session) {} -func (*fakePeerManager) UnregisterSession(uint64) {} -func (*fakePeerManager) SendWants(context.Context, peer.ID, []cid.Cid, []cid.Cid) {} -func (*fakePeerManager) BroadcastWantHaves(context.Context, []cid.Cid) {} +func (*fakePeerManager) RegisterSession(peer.ID, bspm.Session) {} +func (*fakePeerManager) UnregisterSession(uint64) {} +func (*fakePeerManager) SendWants(context.Context, peer.ID, []cid.Cid, []cid.Cid) error { return nil } +func (*fakePeerManager) BroadcastWantHaves(context.Context, []cid.Cid) {} func (fpm *fakePeerManager) SendCancels(ctx context.Context, cancels []cid.Cid) { fpm.lk.Lock() defer fpm.lk.Unlock()