Skip to content
This repository has been archived by the owner on Mar 8, 2024. It is now read-only.

Commit

Permalink
Implement stream cache, fix the peer pruning process
Browse files Browse the repository at this point in the history
  • Loading branch information
Djadih committed Feb 27, 2024
1 parent b9f3b42 commit 9bc9a05
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 14 deletions.
5 changes: 2 additions & 3 deletions p2p/node/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (

"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"

"github.com/dominant-strategies/go-quai/common"
)
Expand Down Expand Up @@ -230,8 +229,8 @@ func (p *P2PNode) GetBootPeers() []peer.AddrInfo {
}

// Opens a new stream to the given peer using the given protocol ID
func (p *P2PNode) NewStream(peerID peer.ID, protocolID protocol.ID) (network.Stream, error) {
return p.Host.NewStream(p.ctx, peerID, protocolID)
func (p *P2PNode) NewStream(peerID peer.ID) (network.Stream, error) {
return p.peerManager.GetStream(peerID)
}

// Connects to the given peer
Expand Down
80 changes: 73 additions & 7 deletions p2p/peerManager/peerManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,22 @@ import (
"strings"
"sync"

lru "github.com/hashicorp/golang-lru"
"github.com/pkg/errors"

"github.com/dominant-strategies/go-quai/log"
"github.com/dominant-strategies/go-quai/p2p"
quaiprotocol "github.com/dominant-strategies/go-quai/p2p/protocol"

"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/query"
basicConnGater "github.com/libp2p/go-libp2p/p2p/net/conngater"
basicConnMgr "github.com/libp2p/go-libp2p/p2p/net/connmgr"

"github.com/dominant-strategies/go-quai/p2p/peerManager/peerdb"
"github.com/libp2p/go-libp2p/core/connmgr"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
basicConnGater "github.com/libp2p/go-libp2p/p2p/net/conngater"
basicConnMgr "github.com/libp2p/go-libp2p/p2p/net/connmgr"
)

const (
Expand All @@ -26,13 +31,20 @@ const (

// The number of peers to return when querying for peers
c_peerCount = 3
// The amount of redundancy for open streams
// c_peerCount * c_streamReplicationFactor = total number of open streams
c_streamReplicationFactor = 3

// Dir names for the peerDBs
c_bestDBName = "bestPeersDB"
c_responsiveDBName = "responsivePeersDB"
c_lastResortDBName = "lastResortPeersDB"
)

var (
errStreamNotFound = errors.New("stream not found")
)

// PeerManager is an interface that extends libp2p Connection Manager and Gater
type PeerManager interface {
connmgr.ConnManager
Expand All @@ -52,8 +64,10 @@ type PeerManager interface {
AddPeer(p2p.PeerID) error
// Removes a peer from all the quality buckets
RemovePeer(p2p.PeerID) error
// Returns an existing stream with that peer or opens a new one
GetStream(p peer.ID) (network.Stream, error)

// Returns c_recipientCount of the highest quality peers: lively & resposnive
// Returns c_recipientCount of the highest quality peers: lively & responsive
GetBestPeersWithFallback() []p2p.PeerID
// Returns c_recipientCount responsive, but less lively peers
GetResponsivePeersWithFallback() []p2p.PeerID
Expand Down Expand Up @@ -86,6 +100,7 @@ type BasicPeerManager struct {
*basicConnMgr.BasicConnMgr

p2pBackend quaiprotocol.QuaiP2PNode
streamCache *lru.Cache

selfID p2p.PeerID

Expand Down Expand Up @@ -122,22 +137,39 @@ func NewManager(ctx context.Context, low int, high int, datastore datastore.Data
return nil, err
}

lruCache, err := lru.NewWithEvict(
c_peerCount*c_streamReplicationFactor,
severStream,
)
if err != nil {
return nil, err
}

return &BasicPeerManager{
ctx: ctx,
streamCache: lruCache,
BasicConnMgr: mgr,
BasicConnectionGater: gater,
bestPeersDB: bestPeersDB,
responsivePeersDB: responsivePeersDB,
lastResortPeers: lastResortPeers,
ctx: ctx,
}, nil
}

func (pm *BasicPeerManager) AddPeer(peerID p2p.PeerID) error {
return pm.recategorizePeer(peerID)
}

// Removes peer from the bucket it is in. Does not return an error if the peer is not found
func (pm *BasicPeerManager) RemovePeer(peerID p2p.PeerID) error {
err := pm.removePeerFromDBs(peerID)
if err != nil {
return err
}
return pm.prunePeerConnection(peerID)
}

// Removes peer from the bucket it is in. Does not return an error if the peer is not found
func (pm *BasicPeerManager) removePeerFromDBs(peerID p2p.PeerID) error {
key := datastore.NewKey(peerID.String())

dbs := []*peerdb.PeerDB{pm.bestPeersDB, pm.responsivePeersDB, pm.lastResortPeers}
Expand All @@ -147,13 +179,47 @@ func (pm *BasicPeerManager) RemovePeer(peerID p2p.PeerID) error {
return db.Delete(pm.ctx, key)
}
}

return nil
}

func (pm *BasicPeerManager) prunePeerConnection(peerID p2p.PeerID) error {
stream, ok := pm.streamCache.Get(peerID)
if ok {
log.Global.WithField("peerID", peerID).Debug("Pruned connection with peer")
severStream(peerID, stream)
return nil
}
return errStreamNotFound
}

func severStream(key interface{}, value interface{}) {
stream := value.(network.Stream)
stream.Close()
}

func (pm *BasicPeerManager) SetP2PBackend(host quaiprotocol.QuaiP2PNode) {
pm.p2pBackend = host
}

func (pm *BasicPeerManager) GetStream(peerID p2p.PeerID) (network.Stream, error) {
stream, ok := pm.streamCache.Get(peerID)
var err error
if !ok {
// Create a new stream to the peer and register it in the cache
stream, err = pm.p2pBackend.GetHostBackend().NewStream(pm.ctx, peerID, quaiprotocol.ProtocolVersion)
if err != nil {
// Explicitly return nil here to avoid casting a nil later
return nil, err
}
pm.streamCache.Add(peerID, stream)
go quaiprotocol.QuaiProtocolHandler(stream.(network.Stream), pm.p2pBackend)
log.Global.Debug("Had to create new stream")
} else {
log.Global.Debug("Requested stream was found in cache")
}

return stream.(network.Stream), err
}

func (pm *BasicPeerManager) SetSelfID(selfID p2p.PeerID) {
pm.selfID = selfID
Expand Down Expand Up @@ -267,7 +333,7 @@ func (pm *BasicPeerManager) recategorizePeer(peer p2p.PeerID) error {
responsiveness := pm.calculatePeerResponsiveness(peer)

// remove peer from DB first
err := pm.RemovePeer(peer)
err := pm.removePeerFromDBs(peer)
if err != nil {
return err
}
Expand Down
9 changes: 5 additions & 4 deletions p2p/protocol/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,19 @@ package protocol
import (
"math/big"

"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"

"github.com/libp2p/go-libp2p/core/network"

"github.com/dominant-strategies/go-quai/common"
"github.com/dominant-strategies/go-quai/core/types"
"github.com/dominant-strategies/go-quai/p2p/requestManager"
"github.com/dominant-strategies/go-quai/trie"
)

// interface required to join the quai protocol network
type QuaiP2PNode interface {
GetBootPeers() []peer.AddrInfo
Connect(pi peer.AddrInfo) error
NewStream(peerID peer.ID) (network.Stream, error)
Network() network.Network
// Search for a block in the node's cache, or query the consensus backend if it's not found in cache.
// Returns nil if the block is not found.
GetBlock(hash common.Hash, location common.Location) *types.Block
Expand All @@ -27,4 +25,7 @@ type QuaiP2PNode interface {
GetRequestManager() requestManager.RequestManager
GetHostBackend() host.Host

Connect(peer.AddrInfo) error
NewStream(peer.ID) (network.Stream, error)
Network() network.Network
}

0 comments on commit 9bc9a05

Please sign in to comment.