Skip to content
This repository has been archived by the owner on Mar 8, 2024. It is now read-only.

Commit

Permalink
Confirm receipt of nil
Browse files Browse the repository at this point in the history
  • Loading branch information
Djadih committed Mar 1, 2024
1 parent f1a885c commit 80282ba
Showing 1 changed file with 18 additions and 18 deletions.
36 changes: 18 additions & 18 deletions p2p/node/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,20 +109,18 @@ func (p *P2PNode) Stop() error {
}

func (p *P2PNode) requestFromPeers(location common.Location, data interface{}, datatype interface{}, resultChan chan interface{}) {
go func() {
defer close(resultChan)
peers := p.peerManager.GetBestPeersWithFallback()

var requestWg sync.WaitGroup
for _, peerID := range peers {
requestWg.Add(1)
go func(peerID peer.ID) {
defer requestWg.Done()
p.requestAndWait(peerID, location, data, datatype, resultChan)
}(peerID)
}
requestWg.Wait()
}()
defer close(resultChan)
peers := p.peerManager.GetBestPeersWithFallback()

var requestWg sync.WaitGroup
for _, peerID := range peers {
requestWg.Add(1)
go func(peerID peer.ID) {
defer requestWg.Done()
p.requestAndWait(peerID, location, data, datatype, resultChan)
}(peerID)
}
requestWg.Wait()
}

func (p *P2PNode) queryDHT(location common.Location, data interface{}, datatype interface{}, resultChan chan interface{}) {
Expand All @@ -148,13 +146,13 @@ func (p *P2PNode) queryDHT(location common.Location, data interface{}, datatype

func (p *P2PNode) requestAndWait(peerID peer.ID, location common.Location, data interface{}, datatype interface{}, resultChan chan interface{}) {
// Ask peer and wait for response
if recvd, err := p.requestFromPeer(peerID, location, data, datatype); err == nil {
recvd, err := p.requestFromPeer(peerID, location, data, datatype)

if err == nil {
log.Global.WithFields(log.Fields{
"data": data,
"peerId": peerID,
}).Info("Received data from peer")
// send the block to the result channel
resultChan <- recvd

// Mark this peer as behaving well
p.peerManager.MarkResponsivePeer(peerID)
Expand All @@ -169,13 +167,15 @@ func (p *P2PNode) requestAndWait(peerID peer.ID, location common.Location, data
// Mark this peer as not responding
p.peerManager.MarkUnresponsivePeer(peerID)
}
// send the block to the result channel
resultChan <- recvd
}

// Request a data from the network for the specified slice
func (p *P2PNode) Request(location common.Location, requestData interface{}, responseDataType interface{}) chan interface{} {
resultChan := make(chan interface{}, 1)

p.requestFromPeers(location, requestData, responseDataType, resultChan)
go p.requestFromPeers(location, requestData, responseDataType, resultChan)
// TODO: optimize with waitgroups or a doneChan to only query if no peers responded
// Right now this creates too many streams, so don't call this until we have a better solution
// p.queryDHT(location, requestData, responseDataType, resultChan)
Expand Down

0 comments on commit 80282ba

Please sign in to comment.