Skip to content

Commit

Permalink
Query the DHT for peers if not enough found in DB
Browse files Browse the repository at this point in the history
  • Loading branch information
Djadih committed Apr 23, 2024
1 parent 92a3710 commit c07c1e7
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 54 deletions.
22 changes: 0 additions & 22 deletions p2p/node/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ func (p *P2PNode) Stop() error {
// define a list of functions to stop the services the node is running
stopFuncs := []stopFunc{
p.Host.Close,
p.dht.Close,
p.peerManager.Stop,
}
// create a channel to collect errors
Expand Down Expand Up @@ -131,27 +130,6 @@ func (p *P2PNode) requestFromPeers(location common.Location, data interface{}, d
}()
}

// queryDHT repeatedly queries the DHT for providers of the given slice
// location and asynchronously requests the data from each provider found.
// Responses are delivered on resultChan by requestAndWait.
//
// location:   slice location used to derive the DHT key (Cid)
// data:       identifier of the datum being requested (e.g. a block hash)
// datatype:   expected type of the response payload
// resultChan: channel on which requestAndWait reports responses
func (p *P2PNode) queryDHT(location common.Location, data interface{}, datatype interface{}, resultChan chan interface{}) {
	const (
		maxDHTQueryRetries    = 3  // Maximum number of retries for DHT queries
		peersPerDHTQuery      = 10 // Number of peers to query per DHT attempt
		dhtQueryRetryInterval = 5  // Seconds to wait between DHT query retries
	)
	// create a Cid from the slice location to use as the DHT key
	shardCid := locationToCid(location)
	for retries := 0; retries < maxDHTQueryRetries; retries++ {
		log.Global.Infof("Querying DHT for slice Cid %s (retry %d)", shardCid, retries)
		// query the DHT for peers in the slice
		// TODO: need to find providers of a topic, not a shard
		// "provider" avoids shadowing the imported libp2p "peer" package.
		for provider := range p.dht.FindProvidersAsync(p.ctx, shardCid, peersPerDHTQuery) {
			go p.requestAndWait(provider.ID, location, data, datatype, resultChan)
		}
		// Nothing follows the final attempt, so don't burn an extra
		// retry interval sleeping after it.
		if retries == maxDHTQueryRetries-1 {
			break
		}
		// if the data is not found, wait for a bit and try again
		log.Global.Infof("Block %s not found in slice %s. Retrying...", data, location)
		time.Sleep(dhtQueryRetryInterval * time.Second)
	}
}

func (p *P2PNode) requestAndWait(peerID peer.ID, location common.Location, data interface{}, dataType interface{}, resultChan chan interface{}) {
var recvd interface{}
var err error
Expand Down
8 changes: 3 additions & 5 deletions p2p/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,6 @@ type P2PNode struct {
// List of peers to introduce us to the network
bootpeers []peer.AddrInfo

// TODO: Consolidate into network interface, and consensus interface
// DHT instance
dht *dual.DHT

// Gossipsub instance
pubsub *pubsubManager.PubsubManager

Expand Down Expand Up @@ -180,6 +176,9 @@ func NewNode(ctx context.Context) (*P2PNode, error) {
// Set peer manager's self ID
peerMgr.SetSelfID(nodeID)

// Set the DHT for the peer manager
peerMgr.SetDHT(dht)

// Create a gossipsub instance with helper functions
ps, err := pubsubManager.NewGossipSubManager(ctx, host)

Expand All @@ -191,7 +190,6 @@ func NewNode(ctx context.Context) (*P2PNode, error) {
ctx: ctx,
Host: host,
bootpeers: bootpeers,
dht: dht,
pubsub: ps,
peerManager: peerMgr,
requestManager: requestManager.NewManager(),
Expand Down
14 changes: 0 additions & 14 deletions p2p/node/p2p_services.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@ import (
"math/big"
"time"

"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multihash"
"github.com/pkg/errors"

"github.com/dominant-strategies/go-quai/common"
Expand Down Expand Up @@ -126,15 +124,3 @@ func (p *P2PNode) GetRequestManager() requestManager.RequestManager {
// GetHostBackend exposes the node's underlying libp2p host.
func (p *P2PNode) GetHostBackend() host.Host {
	return p.Host
}

// Creates a Cid from a location to be used as DHT key
//
// NOTE(review): multihash.Encode does not hash its input — it only wraps
// sliceBytes with the SHA2_256 code, so the result is not a real SHA2-256
// digest. Presumably multihash.Sum was intended; confirm before changing,
// since the resulting Cid is a network-visible DHT key and all nodes must
// derive it identically.
func locationToCid(location common.Location) cid.Cid {
sliceBytes := []byte(location.Name())

// create a multihash from the slice ID
// NOTE(review): the error return is silently discarded — confirm Encode
// cannot fail for this input, or handle the error.
mhash, _ := multihash.Encode(sliceBytes, multihash.SHA2_256)

// create a Cid from the multihash
return cid.NewCidV1(cid.Raw, mhash)

}
10 changes: 3 additions & 7 deletions p2p/node/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,24 @@ import (

// Returns the number of peers in the routing table, as well as how many active
// connections we currently have.
func (p *P2PNode) connectionStats() (int, int, int) {
WANPeerNum := len(p.dht.WAN.RoutingTable().ListPeers())
LANPeerNum := len(p.dht.LAN.RoutingTable().ListPeers())
func (p *P2PNode) connectionStats() (int) {
peers := p.Host.Network().Peers()
numConnected := len(peers)

return WANPeerNum, LANPeerNum, numConnected
return numConnected
}

func (p *P2PNode) statsLoop() {
ticker := time.NewTicker(30 * time.Second)
for {
select {
case <-ticker.C:
WANPeerNum, LANPeerNum, peersConnected := p.connectionStats()
peersConnected := p.connectionStats()

log.Global.Debugf("Number of peers connected: %d", peersConnected)
log.Global.Debugf("Peers in WAN Routing table: %d, Peers in LAN Routing table: %d", WANPeerNum, LANPeerNum)
case <-p.ctx.Done():
log.Global.Warnf("Context cancelled. Stopping stats loop...")
return
}

}
}
55 changes: 49 additions & 6 deletions p2p/peerManager/peerManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@ import (
"github.com/dominant-strategies/go-quai/log"
"github.com/dominant-strategies/go-quai/p2p"
quaiprotocol "github.com/dominant-strategies/go-quai/p2p/protocol"
"github.com/dominant-strategies/go-quai/p2p/pubsubManager"
"github.com/dominant-strategies/go-quai/p2p/streamManager"

"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"

"github.com/dominant-strategies/go-quai/p2p/peerManager/peerdb"
"github.com/libp2p/go-libp2p-kad-dht/dual"
"github.com/libp2p/go-libp2p/core/connmgr"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
Expand Down Expand Up @@ -66,6 +68,9 @@ type PeerManager interface {
// Sets the ID for the node running the peer manager
SetSelfID(p2p.PeerID)

// Sets the DHT provided from the Host interface
SetDHT(*dual.DHT)

// Manages stream lifecycles
streamManager.StreamManager

Expand All @@ -76,6 +81,7 @@ type PeerManager interface {

// Returns c_peerCount peers starting at the requested quality level of peers
// If there are not enough peers at the requested quality, it will return lower quality peers
// If there still aren't enough peers, it will query the DHT for more
GetPeers(common.Location, PeerQuality) []p2p.PeerID

// Increases the peer's liveliness score
Expand Down Expand Up @@ -103,9 +109,16 @@ type BasicPeerManager struct {
*basicConnGater.BasicConnectionGater
*basicConnMgr.BasicConnMgr

// Handles opening and closing streams
streamManager streamManager.StreamManager
peerDBs map[string][]*peerdb.PeerDB

// Tracks peers in different quality buckets
peerDBs map[string][]*peerdb.PeerDB

// DHT instance
dht *dual.DHT

// This peer's ID to distinguish self-broadcasts
selfID p2p.PeerID

ctx context.Context
Expand Down Expand Up @@ -228,6 +241,10 @@ func NewManager(ctx context.Context, low int, high int, datastore datastore.Data
}, nil
}

// SetDHT stores the DHT instance supplied by the host so the peer manager
// can fall back to DHT provider lookups when the peer DBs run dry.
func (pm *BasicPeerManager) SetDHT(dht *dual.DHT) {
	pm.dht = dht
}

// GetStream delegates stream acquisition for peerID to the underlying
// stream manager.
func (pm *BasicPeerManager) GetStream(peerID p2p.PeerID) (network.Stream, error) {
	return pm.streamManager.GetStream(peerID)
}
Expand Down Expand Up @@ -296,16 +313,41 @@ func (pm *BasicPeerManager) getPeersHelper(peerDB *peerdb.PeerDB, numPeers int)
}

func (pm *BasicPeerManager) GetPeers(location common.Location, quality PeerQuality) []p2p.PeerID {
var peerList []p2p.PeerID
switch quality {
case Best:
return pm.getBestPeersWithFallback(location)
peerList = pm.getBestPeersWithFallback(location)
case Responsive:
return pm.getResponsivePeersWithFallback(location)
peerList = pm.getResponsivePeersWithFallback(location)
case LastResort:
return pm.getLastResortPeers(location)
default:
return nil
peerList = pm.getLastResortPeers(location)
}

if len(peerList) == C_peerCount {
// Found sufficient number of peers
return peerList
}

// Query the DHT for more peers
return pm.queryDHT(location, peerList, C_peerCount-len(peerList))
}

// queryDHT asks the DHT for up to peerCount additional providers of the
// given slice and appends them (excluding ourselves) to peerList.
//
// location:  slice whose providers we are looking for
// peerList:  peers already found in the peer DBs
// peerCount: number of additional peers still wanted
//
// Returns peerList extended with the peers discovered via the DHT.
func (pm *BasicPeerManager) queryDHT(location common.Location, peerList []p2p.PeerID, peerCount int) []p2p.PeerID {
	// The DHT is injected via SetDHT after construction; guard against a
	// lookup racing ahead of initialization instead of dereferencing nil.
	if pm.dht == nil {
		log.Global.Warn("queryDHT called before DHT was set")
		return peerList
	}

	// create a Cid from the slice location
	shardCid := pubsubManager.LocationToCid(location)

	// Internal list of peers from the dht
	dhtPeers := make([]p2p.PeerID, 0, peerCount)
	log.Global.Infof("Querying DHT for slice Cid %s", shardCid)
	// query the DHT for peers in the slice
	// TODO: need to find providers of a topic, not a shard
	// "provider" avoids shadowing the imported libp2p "peer" package.
	for provider := range pm.dht.FindProvidersAsync(pm.ctx, shardCid, peerCount) {
		// Skip our own provider record; we only want remote peers.
		if provider.ID != pm.selfID {
			dhtPeers = append(dhtPeers, provider.ID)
		}
	}
	// Normal-path result: log at Debug, matching the Infof above, rather
	// than Warn.
	log.Global.Debug("Found the following peers from the DHT: ", dhtPeers)
	return append(peerList, dhtPeers...)
}

func (pm *BasicPeerManager) getBestPeersWithFallback(location common.Location) []p2p.PeerID {
Expand Down Expand Up @@ -464,6 +506,7 @@ func (pm *BasicPeerManager) Stop() error {

closeFuncs := []func() error{
pm.BasicConnMgr.Close,
pm.dht.Close,
}

wg.Add(len(closeFuncs))
Expand Down
13 changes: 13 additions & 0 deletions p2p/pubsubManager/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (

"github.com/dominant-strategies/go-quai/common"
"github.com/dominant-strategies/go-quai/core/types"
"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multihash"
)

const (
Expand Down Expand Up @@ -43,3 +45,14 @@ func (g *PubsubManager) PeersForTopic(location common.Location, data interface{}
}
return g.topics[topicName].ListPeers(), nil
}

// Creates a Cid from a location to be used as DHT key
//
// NOTE(review): multihash.Encode does not hash its input — it only wraps
// sliceBytes with the SHA2_256 code, so the result is not a real SHA2-256
// digest. Presumably multihash.Sum was intended; confirm before changing,
// since the resulting Cid is a network-visible DHT key and all nodes must
// derive it identically.
func LocationToCid(location common.Location) cid.Cid {
sliceBytes := []byte(location.Name())

// create a multihash from the slice ID
// NOTE(review): the error return is silently discarded — confirm Encode
// cannot fail for this input, or handle the error.
mhash, _ := multihash.Encode(sliceBytes, multihash.SHA2_256)

// create a Cid from the multihash
return cid.NewCidV1(cid.Raw, mhash)
}

0 comments on commit c07c1e7

Please sign in to comment.