diff --git a/p2p/node/api.go b/p2p/node/api.go index 004f28e44c..2fa8c0591c 100644 --- a/p2p/node/api.go +++ b/p2p/node/api.go @@ -109,20 +109,18 @@ func (p *P2PNode) Stop() error { } func (p *P2PNode) requestFromPeers(location common.Location, data interface{}, datatype interface{}, resultChan chan interface{}) { - go func() { - defer close(resultChan) - peers := p.peerManager.GetBestPeersWithFallback() - - var requestWg sync.WaitGroup - for _, peerID := range peers { - requestWg.Add(1) - go func(peerID peer.ID) { - defer requestWg.Done() - p.requestAndWait(peerID, location, data, datatype, resultChan) - }(peerID) - } - requestWg.Wait() - }() + defer close(resultChan) + peers := p.peerManager.GetBestPeersWithFallback() + + var requestWg sync.WaitGroup + for _, peerID := range peers { + requestWg.Add(1) + go func(peerID peer.ID) { + defer requestWg.Done() + p.requestAndWait(peerID, location, data, datatype, resultChan) + }(peerID) + } + requestWg.Wait() } func (p *P2PNode) queryDHT(location common.Location, data interface{}, datatype interface{}, resultChan chan interface{}) { @@ -148,13 +146,13 @@ func (p *P2PNode) queryDHT(location common.Location, data interface{}, datatype func (p *P2PNode) requestAndWait(peerID peer.ID, location common.Location, data interface{}, datatype interface{}, resultChan chan interface{}) { // Ask peer and wait for response - if recvd, err := p.requestFromPeer(peerID, location, data, datatype); err == nil { + recvd, err := p.requestFromPeer(peerID, location, data, datatype) + + if err == nil { log.Global.WithFields(log.Fields{ "data": data, "peerId": peerID, }).Info("Received data from peer") - // send the block to the result channel - resultChan <- recvd // Mark this peer as behaving well p.peerManager.MarkResponsivePeer(peerID) @@ -169,13 +167,15 @@ func (p *P2PNode) requestAndWait(peerID peer.ID, location common.Location, data // Mark this peer as not responding p.peerManager.MarkUnresponsivePeer(peerID) } + // send the block to the result channel + resultChan <- recvd } // Request a data from the network for the specified slice func (p *P2PNode) Request(location common.Location, requestData interface{}, responseDataType interface{}) chan interface{} { resultChan := make(chan interface{}, 1) - p.requestFromPeers(location, requestData, responseDataType, resultChan) + go p.requestFromPeers(location, requestData, responseDataType, resultChan) // TODO: optimize with waitgroups or a doneChan to only query if no peers responded // Right now this creates too many streams, so don't call this until we have a better solution // p.queryDHT(location, requestData, responseDataType, resultChan)