Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(session): do not record erroneous session want sends #452

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions bitswap/client/internal/peermanager/peermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import (
"context"
"fmt"
"sync"

logging "github.com/ipfs/go-log/v2"
Expand Down Expand Up @@ -143,13 +144,15 @@

// SendWants sends the given want-blocks and want-haves to the given peer.
// It filters out wants that have previously been sent to the peer.
func (pm *PeerManager) SendWants(ctx context.Context, p peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid) {
func (pm *PeerManager) SendWants(ctx context.Context, p peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid) error {
pm.pqLk.Lock()
defer pm.pqLk.Unlock()

if _, ok := pm.peerQueues[p]; ok {
pm.pwm.sendWants(p, wantBlocks, wantHaves)
if _, ok := pm.peerQueues[p]; !ok {
return fmt.Errorf("No peer queue for %s", p)

Check failure on line 152 in bitswap/client/internal/peermanager/peermanager.go

View workflow job for this annotation

GitHub Actions / go-check / All

error strings should not be capitalized (ST1005)

Check warning on line 152 in bitswap/client/internal/peermanager/peermanager.go

View check run for this annotation

Codecov / codecov/patch

bitswap/client/internal/peermanager/peermanager.go#L152

Added line #L152 was not covered by tests
}
pm.pwm.sendWants(p, wantBlocks, wantHaves)
return nil
}

// SendCancels sends cancels for the given keys to all peers who had previously
Expand Down
2 changes: 1 addition & 1 deletion bitswap/client/internal/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type PeerManager interface {
// interested in a peer's connection state
UnregisterSession(uint64)
// SendWants tells the PeerManager to send wants to the given peer
SendWants(ctx context.Context, peerId peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid)
SendWants(ctx context.Context, peerId peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid) error
// BroadcastWantHaves sends want-haves to all connected peers (used for
// session discovery)
BroadcastWantHaves(context.Context, []cid.Cid)
Expand Down
8 changes: 5 additions & 3 deletions bitswap/client/internal/session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,11 @@ func newFakePeerManager() *fakePeerManager {
}
}

func (pm *fakePeerManager) RegisterSession(peer.ID, bspm.Session) {}
func (pm *fakePeerManager) UnregisterSession(uint64) {}
func (pm *fakePeerManager) SendWants(context.Context, peer.ID, []cid.Cid, []cid.Cid) {}
func (pm *fakePeerManager) RegisterSession(peer.ID, bspm.Session) {}
func (pm *fakePeerManager) UnregisterSession(uint64) {}
func (pm *fakePeerManager) SendWants(context.Context, peer.ID, []cid.Cid, []cid.Cid) error {
return nil
}
func (pm *fakePeerManager) BroadcastWantHaves(ctx context.Context, cids []cid.Cid) {
select {
case pm.wantReqs <- wantReq{cids}:
Expand Down
27 changes: 19 additions & 8 deletions bitswap/client/internal/session/sessionwantsender.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,7 @@ func (sws *sessionWantSender) processExhaustedWants(exhausted []cid.Cid) {
type wantSets struct {
wantBlocks *cid.Set
wantHaves *cid.Set
result error
}

type allWants map[peer.ID]*wantSets
Expand Down Expand Up @@ -552,9 +553,6 @@ func (sws *sessionWantSender) sendNextWants(newlyAvailable []peer.ID) {
continue
}

// Record that we are sending a want-block for this want to the peer
sws.setWantSentTo(c, wi.bestPeer)

// Send a want-block to the chosen peer
toSend.forPeer(wi.bestPeer).wantBlocks.Add(c)

Expand All @@ -568,6 +566,16 @@ func (sws *sessionWantSender) sendNextWants(newlyAvailable []peer.ID) {

// Send any wants we've collected
sws.sendWants(toSend)

for c, wi := range sws.wants {
if wi.bestPeer != "" && wi.sentTo == "" {
// check if a want block was successfully sent to the best peer
if toSend.forPeer(wi.bestPeer).result == nil {
// Record that we are sending a want-block for this want to the peer
sws.setWantSentTo(c, wi.bestPeer)
}
}
}
}

// sendWants sends want-have and want-blocks to the appropriate peers
Expand All @@ -585,13 +593,16 @@ func (sws *sessionWantSender) sendWants(sends allWants) {
// precedence over want-haves.
wblks := snd.wantBlocks.Keys()
whaves := snd.wantHaves.Keys()
sws.pm.SendWants(sws.ctx, p, wblks, whaves)
snd.result = sws.pm.SendWants(sws.ctx, p, wblks, whaves)

// Inform the session that we've sent the wants
sws.onSend(p, wblks, whaves)
// only update state if the wants really sent
if snd.result == nil {
// Inform the session that we've sent the wants
sws.onSend(p, wblks, whaves)

// Record which peers we send want-block to
sws.swbt.addSentWantBlocksTo(p, wblks)
// Record which peers we send want-block to
sws.swbt.addSentWantBlocksTo(p, wblks)
}
}
}

Expand Down
3 changes: 2 additions & 1 deletion bitswap/client/internal/session/sessionwantsender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (*mockPeerManager) UnregisterSession(uint64) {}
func (*mockPeerManager) BroadcastWantHaves(context.Context, []cid.Cid) {}
func (*mockPeerManager) SendCancels(context.Context, []cid.Cid) {}

func (pm *mockPeerManager) SendWants(ctx context.Context, p peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid) {
func (pm *mockPeerManager) SendWants(ctx context.Context, p peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid) error {
pm.lk.Lock()
defer pm.lk.Unlock()

Expand All @@ -92,6 +92,7 @@ func (pm *mockPeerManager) SendWants(ctx context.Context, p peer.ID, wantBlocks
pm.peerSends[p] = sw
}
sw.add(wantBlocks, wantHaves)
return nil
}

func (pm *mockPeerManager) waitNextWants() map[peer.ID]*sentWants {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,10 @@ type fakePeerManager struct {
cancels []cid.Cid
}

func (*fakePeerManager) RegisterSession(peer.ID, bspm.Session) {}
func (*fakePeerManager) UnregisterSession(uint64) {}
func (*fakePeerManager) SendWants(context.Context, peer.ID, []cid.Cid, []cid.Cid) {}
func (*fakePeerManager) BroadcastWantHaves(context.Context, []cid.Cid) {}
func (*fakePeerManager) RegisterSession(peer.ID, bspm.Session) {}
func (*fakePeerManager) UnregisterSession(uint64) {}
func (*fakePeerManager) SendWants(context.Context, peer.ID, []cid.Cid, []cid.Cid) error { return nil }
func (*fakePeerManager) BroadcastWantHaves(context.Context, []cid.Cid) {}
func (fpm *fakePeerManager) SendCancels(ctx context.Context, cancels []cid.Cid) {
fpm.lk.Lock()
defer fpm.lk.Unlock()
Expand Down
Loading