Skip to content

Commit

Permalink
feat: add peer connections subcommand
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanchriswhite committed Jul 13, 2023
1 parent 64abbc0 commit b50802e
Show file tree
Hide file tree
Showing 4 changed files with 235 additions and 1 deletion.
80 changes: 80 additions & 0 deletions app/client/cli/peer/connections.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package peer

import (
"fmt"

"github.com/spf13/cobra"
"google.golang.org/protobuf/types/known/anypb"

"github.com/pokt-network/pocket/app/client/cli/helpers"
"github.com/pokt-network/pocket/p2p/debug"
"github.com/pokt-network/pocket/shared/messaging"
)

var (
connectionsCmd = &cobra.Command{
Use: "connections",
Short: "Print open peer connections",
RunE: connectionsRunE,
}
)

func init() {
PeerCmd.AddCommand(connectionsCmd)
}

func connectionsRunE(cmd *cobra.Command, _ []string) error {
var routerType debug.RouterType

bus, err := helpers.GetBusFromCmd(cmd)
if err != nil {
return err
}

switch {
case stakedFlag:
if unstakedFlag || allFlag {
return ErrRouterType
}
routerType = debug.StakedRouterType
case unstakedFlag:
if stakedFlag || allFlag {
return ErrRouterType
}
routerType = debug.UnstakedRouterType
// even if `allFlag` is false, we still want to print all connections
default:
if stakedFlag || unstakedFlag {
return ErrRouterType
}
routerType = debug.AllRouterTypes
}

debugMsg := &messaging.DebugMessage{
Action: messaging.DebugMessageAction_DEBUG_P2P_PEER_CONNECTIONS,
Type: messaging.DebugMessageRoutingType_DEBUG_MESSAGE_TYPE_BROADCAST,
Message: &anypb.Any{
Value: []byte(routerType),
},
}
debugMsgAny, err := anypb.New(debugMsg)
if err != nil {
return fmt.Errorf("creating anypb from debug message: %w", err)
}

if localFlag {
if err := debug.PrintPeerConnections(bus, routerType); err != nil {
return fmt.Errorf("printing peer list: %w", err)
}
return nil
}

// TECHDEBT(#810, #811): will need to wait for DHT bootstrapping to complete before
// p2p broadcast can be used with to reach unstaked actors.
// CONSIDERATION: add the peer commands to the interactive CLI as the P2P module
// instance could persist between commands. Other interactive CLI commands which
// rely on unstaked actor router broadcast are working as expected.

// TECHDEBT(#810, #811): use broadcast instead to reach all peers.
return sendToStakedPeers(cmd, debugMsgAny)
}
6 changes: 5 additions & 1 deletion p2p/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ import (

func (m *p2pModule) handleDebugMessage(msg *messaging.DebugMessage) error {
switch msg.Action {
case messaging.DebugMessageAction_DEBUG_P2P_PEER_LIST:
case messaging.DebugMessageAction_DEBUG_P2P_PEER_LIST,
messaging.DebugMessageAction_DEBUG_P2P_PEER_CONNECTIONS:
if !m.cfg.EnablePeerDiscoveryDebugRpc {
return typesP2P.ErrPeerDiscoveryDebugRPCDisabled
}
Expand All @@ -23,6 +24,9 @@ func (m *p2pModule) handleDebugMessage(msg *messaging.DebugMessage) error {
case messaging.DebugMessageAction_DEBUG_P2P_PEER_LIST:
routerType := debug.RouterType(msg.Message.Value)
return debug.PrintPeerList(m.GetBus(), routerType)
case messaging.DebugMessageAction_DEBUG_P2P_PEER_CONNECTIONS:
routerType := debug.RouterType(msg.Message.Value)
return debug.PrintPeerConnections(m.GetBus(), routerType)
default:
return fmt.Errorf("unsupported P2P debug message action: %s", msg.Action)
}
Expand Down
149 changes: 149 additions & 0 deletions p2p/debug/connections.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package debug

import (
"fmt"
"os"
"strconv"

libp2pNetwork "github.com/libp2p/go-libp2p/core/network"
libp2pPeer "github.com/libp2p/go-libp2p/core/peer"

"github.com/pokt-network/pocket/p2p/providers/peerstore_provider"
typesP2P "github.com/pokt-network/pocket/p2p/types"
"github.com/pokt-network/pocket/p2p/utils"
"github.com/pokt-network/pocket/shared/modules"
)

var printConnectionsHeader = []string{"Peer ID", "Multiaddr", "Opened", "Direction", "NumStreams"}

func PrintPeerConnections(bus modules.Bus, routerType RouterType) error {
var (
connections []libp2pNetwork.Conn
routerPlurality = ""
)

if routerType == AllRouterTypes {
routerPlurality = "s"
}

connections, err := getFilteredConnections(bus, routerType)
if err != nil {
return fmt.Errorf("getting connecions: %w", err)
}

if err := LogSelfAddress(bus); err != nil {
return fmt.Errorf("printing self address: %w", err)
}

// NB: Intentionally printing with `fmt` instead of the logger to match
// `utils.PrintPeerListTable` which does not use the logger due to
// incompatibilities with the tabwriter.
// (This doesn't seem to work as expected; i.e. not printing at all in tilt.)
if _, err := fmt.Fprintf(
os.Stdout,
"%s router peerstore%s:\n",
routerType,
routerPlurality,
); err != nil {
return fmt.Errorf("printing to stdout: %w", err)
}

if err := PrintConnectionsTable(connections); err != nil {
return fmt.Errorf("printing peer list: %w", err)
}
return nil
}

func PrintConnectionsTable(conns []libp2pNetwork.Conn) error {
return utils.PrintTable(printConnectionsHeader, peerConnsRowConsumerFactory(conns))
}

func getFilteredConnections(
bus modules.Bus,
routerType RouterType,
) ([]libp2pNetwork.Conn, error) {
var (
pstore typesP2P.Peerstore
idsToInclude map[libp2pPeer.ID]struct{}
p2pModule = bus.GetP2PModule()
connections = p2pModule.GetConnections()
)

// TECHDEBT(#810, #811): use `bus.GetPeerstoreProvider()` after peerstore provider
// is retrievable as a proper submodule.
pstoreProviderModule, err := bus.GetModulesRegistry().
GetModule(peerstore_provider.PeerstoreProviderSubmoduleName)
if err != nil {
return nil, fmt.Errorf("getting peerstore provider: %w", err)
}
pstoreProvider, ok := pstoreProviderModule.(peerstore_provider.PeerstoreProvider)
if !ok {
return nil, fmt.Errorf("unknown peerstore provider type: %T", pstoreProviderModule)
}
//--

switch routerType {
case AllRouterTypes:
// return early; no need to filter
return connections, nil
case StakedRouterType:
pstore, err = pstoreProvider.GetStakedPeerstoreAtCurrentHeight()
if err != nil {
return nil, fmt.Errorf("getting staked peerstore: %w", err)
}
case UnstakedRouterType:
pstore, err = pstoreProvider.GetUnstakedPeerstore()
if err != nil {
return nil, fmt.Errorf("getting unstaked peerstore: %w", err)
}
}

idsToInclude, err = getPeerIDs(pstore.GetPeerList())
if err != nil {
return nil, fmt.Errorf("getting peer IDs: %w", err)
}

for _, conn := range connections {
if _, ok := idsToInclude[conn.RemotePeer()]; !ok {
// remote peer ID not in `idsToInclude` set; filter connection out
connections = append(connections[:], connections[1:]...)
}
}
return connections, nil
}

func peerConnsRowConsumerFactory(conns []libp2pNetwork.Conn) utils.RowConsumer {
return func(provideRow utils.RowProvider) error {
for _, conn := range conns {
err := provideRow(
conn.RemotePeer().String(),
conn.RemoteMultiaddr().String(),
conn.Stat().Opened.String(),
conn.Stat().Direction.String(),
strconv.Itoa(conn.Stat().NumStreams),
)
if err != nil {
return err
}
}
return nil
}
}

func getPeerIDs(peers []typesP2P.Peer) (ids map[libp2pPeer.ID]struct{}, err error) {
for _, peer := range peers {
addrInfo, err := utils.Libp2pAddrInfoFromPeer(peer)
if err != nil {
return nil, err
}

// ID already in set; continue
if _, ok := ids[addrInfo.ID]; !ok {
continue
}

// add ID to set
ids[addrInfo.ID] = struct{}{}
}
return ids, nil
}
1 change: 1 addition & 0 deletions shared/messaging/proto/debug_message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ enum DebugMessageAction {
DEBUG_PERSISTENCE_CLEAR_STATE = 8;
DEBUG_PERSISTENCE_RESET_TO_GENESIS = 9;
DEBUG_P2P_PEER_LIST = 10;
DEBUG_P2P_PEER_CONNECTIONS = 11;
}

message DebugMessage {
Expand Down

0 comments on commit b50802e

Please sign in to comment.