Skip to content
This repository has been archived by the owner on Mar 8, 2024. It is now read-only.

Commit

Permalink
Rearchitect the PeerManager and DBs to support peer bucketing by topic
Browse files Browse the repository at this point in the history
  • Loading branch information
Djadih committed Mar 8, 2024
1 parent 9cb62a8 commit 3506c49
Show file tree
Hide file tree
Showing 4 changed files with 155 additions and 99 deletions.
37 changes: 23 additions & 14 deletions p2p/node/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,11 @@ func (p *P2PNode) Stop() error {
func (p *P2PNode) requestFromPeers(location common.Location, data interface{}, datatype interface{}, resultChan chan interface{}) {
go func() {
defer close(resultChan)
peers := p.peerManager.GetBestPeersWithFallback()
peers := p.peerManager.GetBestPeersWithFallback(location)
log.Global.WithFields(log.Fields{
"peers": peers,
"location": location,
}).Debug("Requesting data from peers")

var requestWg sync.WaitGroup
for _, peerID := range peers {
Expand Down Expand Up @@ -147,27 +151,30 @@ func (p *P2PNode) queryDHT(location common.Location, data interface{}, datatype
}
}

func (p *P2PNode) requestAndWait(peerID peer.ID, location common.Location, data interface{}, datatype interface{}, resultChan chan interface{}) {
func (p *P2PNode) requestAndWait(peerID peer.ID, location common.Location, data interface{}, dataType interface{}, resultChan chan interface{}) {
// Ask peer and wait for response
if recvd, err := p.requestFromPeer(peerID, location, data, datatype); err == nil {
if recvd, err := p.requestFromPeer(peerID, location, data, dataType); err == nil {
log.Global.WithFields(log.Fields{
"data": data,
"peerId": peerID,
"data": data,
"dataType": dataType,
"peerId": peerID,
"location": location.Name(),
}).Warn("Received data from peer")
// send the block to the result channel
resultChan <- recvd

// Mark this peer as behaving well
p.peerManager.MarkResponsivePeer(peerID)
p.peerManager.MarkResponsivePeer(peerID, location)
} else {
log.Global.WithFields(log.Fields{
"peerId": peerID,
"location": location.Name(),
"data": data,
"datatype": datatype,
"dataType": dataType,
"err": err,
}).Error("Error requesting the data from peer")
// Mark this peer as not responding
p.peerManager.MarkUnresponsivePeer(peerID)
p.peerManager.MarkUnresponsivePeer(peerID, location)
}
}

Expand All @@ -183,20 +190,22 @@ func (p *P2PNode) Request(location common.Location, requestData interface{}, res
return resultChan
}

func (p *P2PNode) MarkLivelyPeer(peer p2p.PeerID) {
func (p *P2PNode) MarkLivelyPeer(peer p2p.PeerID, location common.Location) {
log.Global.WithFields(log.Fields{
"peer": peer,
"peer": peer,
"location": location,
}).Debug("Recording well-behaving peer")

p.peerManager.MarkLivelyPeer(peer)
p.peerManager.MarkLivelyPeer(peer, location)
}

func (p *P2PNode) MarkLatentPeer(peer p2p.PeerID) {
func (p *P2PNode) MarkLatentPeer(peer p2p.PeerID, location common.Location) {
log.Global.WithFields(log.Fields{
"peer": peer,
"peer": peer,
"location": location,
}).Debug("Recording misbehaving peer")

p.peerManager.MarkLatentPeer(peer)
p.peerManager.MarkLatentPeer(peer, location)
}

func (p *P2PNode) ProtectPeer(peer p2p.PeerID) {
Expand Down
Loading

0 comments on commit 3506c49

Please sign in to comment.