Skip to content

Commit

Permalink
feat: add peer list subcommand
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanchriswhite committed Jul 7, 2023
1 parent ff67b88 commit d1e811e
Show file tree
Hide file tree
Showing 6 changed files with 333 additions and 6 deletions.
95 changes: 95 additions & 0 deletions app/client/cli/peer/list.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package peer

import (
"fmt"
"log"

"github.com/spf13/cobra"
"google.golang.org/protobuf/types/known/anypb"

"github.com/pokt-network/pocket/app/client/cli/helpers"
"github.com/pokt-network/pocket/logger"
"github.com/pokt-network/pocket/p2p/debug"
"github.com/pokt-network/pocket/shared/messaging"
"github.com/pokt-network/pocket/shared/modules"
)

var (
listCmd = &cobra.Command{
Use: "list",
Short: "Print addresses and service URLs of known peers",
RunE: listRunE,
}
)

func init() {
PeerCmd.AddCommand(listCmd)
}

func listRunE(cmd *cobra.Command, _ []string) error {
// TECHDEBT: factor out to a helper function `GetBusFromCmd()`.
bus, ok := helpers.GetValueFromCLIContext[modules.Bus](cmd, helpers.BusCLICtxKey)
if !ok {
log.Fatal("unable to get bus from context")
}

var routerType debug.RouterType

switch {
case stakedFlag:
routerType = debug.StakedRouterType
case unstakedFlag:
routerType = debug.UnstakedRouterType
case allFlag:
fallthrough
default:
routerType = debug.AllRouterTypes
}

debugMsg := &messaging.DebugMessage{
Action: messaging.DebugMessageAction_DEBUG_P2P_PEER_LIST,
Type: messaging.DebugMessageRoutingType_DEBUG_MESSAGE_TYPE_BROADCAST,
Message: &anypb.Any{
Value: []byte(routerType),
},
}
debugMsgAny, err := anypb.New(debugMsg)
if err != nil {
return fmt.Errorf("creating anypb from debug message: %w", err)
}

if localFlag {
// call common behavior -- debug message handler
panic("not implemented")
}

// // TECHDEBT: will need to wait for DHT bootstrapping to complete before
// // this command can be used with unstaked actors.
//
// if err := bus.GetP2PModule().Broadcast(debugMsgAny); err != nil {
// return fmt.Errorf("broadcasting debug message: %w", err)
// }

// TECHDEBT: use broadcast instead once the above TECHDEBT is resolved.
pstore, err := helpers.FetchPeerstore(cmd)
if err != nil {
logger.Global.Fatal().Err(err).Msg("Unable to retrieve the pstore")
}

if pstore.Size() == 0 {
logger.Global.Fatal().Msg("No validators found")
}

peers := pstore.GetPeerList()
if err != nil {
logger.Global.Fatal().Err(err).Msg("Failed to convert validator address into pocketCrypto.Address")
}

for _, peer := range peers {
if err := bus.GetP2PModule().Send(peer.GetAddress(), debugMsgAny); err != nil {
logger.Global.Error().Err(err).Msg("Failed to send debug message")
}
}

return nil
}
104 changes: 104 additions & 0 deletions p2p/debug.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package p2p

import (
"fmt"

"github.com/pokt-network/pocket/p2p/debug"
typesP2P "github.com/pokt-network/pocket/p2p/types"
"github.com/pokt-network/pocket/shared/messaging"
)

func (m *p2pModule) handleDebugMessage(msg *messaging.DebugMessage) error {
switch msg.Action {
case messaging.DebugMessageAction_DEBUG_P2P_PEER_LIST:
if !m.cfg.EnablePeerDiscoveryDebugRpc {
return typesP2P.ErrPeerDiscoveryDebugRPCDisabled
}
}

// TODO_THIS_COMMIT: add & react to `--all`, `--staked`, `--unstaked`
// persistent flags

switch msg.Action {
case messaging.DebugMessageAction_DEBUG_P2P_PEER_LIST:
routerType := debug.RouterType(msg.Message.Value)
return m.listPeers(routerType)
default:
return fmt.Errorf("unsupported P2P debug message action: %s", msg.Action)
}
}

func (m *p2pModule) listPeers(routerType debug.RouterType) error {
var (
peers typesP2P.PeerList
pstorePlurality = ""
)

switch routerType {
case debug.StakedRouterType:
// TECHDEBT: add `PeerstoreProvider#GetStakedPeerstoreAtCurrentHeight()`
// interface method.
currentHeight := m.currentHeightProvider.CurrentHeight()
pstore, err := m.pstoreProvider.GetStakedPeerstoreAtHeight(currentHeight)
if err != nil {
return fmt.Errorf("getting unstaked peerstore: %v", err)
}

peers = pstore.GetPeerList()
case debug.UnstakedRouterType:
pstore, err := m.pstoreProvider.GetUnstakedPeerstore()
if err != nil {
return fmt.Errorf("getting unstaked peerstore: %v", err)
}

peers = pstore.GetPeerList()
case debug.AllRouterTypes:
pstorePlurality = "s"

// TECHDEBT: add `PeerstoreProvider#GetStakedPeerstoreAtCurrentHeight()`
currentHeight := m.currentHeightProvider.CurrentHeight()
stakedPStore, err := m.pstoreProvider.GetStakedPeerstoreAtHeight(currentHeight)
if err != nil {
return fmt.Errorf("getting unstaked peerstore: %v", err)
}
unstakedPStore, err := m.pstoreProvider.GetUnstakedPeerstore()
if err != nil {
return fmt.Errorf("getting unstaked peerstore: %v", err)
}

unstakedPeers := unstakedPStore.GetPeerList()
stakedPeers := stakedPStore.GetPeerList()
additionalPeers, _ := unstakedPeers.Delta(stakedPeers)

// NB: there should never be any "additional" peers. This would represent
// a staked actor who is not participating in background gossip for some
// reason. It's possible that a staked actor node which has restarted
// recently and hasn't yet completed background router bootstrapping may
// result in peers experiencing this state.
if len(additionalPeers) == 0 {
return debug.PrintPeerList(unstakedPeers)
}

var allPeers typesP2P.PeerList
for _, peer := range additionalPeers {
allPeers = append(unstakedPeers, peer)
}
peers = allPeers
default:
return fmt.Errorf("unsupported router type: %s", routerType)
}

if err := debug.LogSelfAddress(m.logger, m.GetBus()); err != nil {
return fmt.Errorf("printing self address: %w", err)
}

// NB: Intentionally printing with `fmt` instead of the logger to match
// `utils.PrintPeerList` which does not use the logger due to
// incompatibilities with the tabwriter.
fmt.Printf("%s router peerstore%s:\n", routerType, pstorePlurality)

if err := debug.PrintPeerList(peers); err != nil {
return fmt.Errorf("printing peer list: %w", err)
}
return nil
}
61 changes: 61 additions & 0 deletions p2p/debug/peers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package debug

import (
"fmt"

"github.com/pokt-network/pocket/p2p/types"
"github.com/pokt-network/pocket/p2p/utils"
"github.com/pokt-network/pocket/shared/modules"
)

type RouterType string

const (
StakedRouterType RouterType = "staked"
UnstakedRouterType RouterType = "unstaked"
AllRouterTypes RouterType = "all"
)

func LogSelfAddress(logger *modules.Logger, bus modules.Bus) error {
p2pModule := bus.GetP2PModule()
if p2pModule == nil {
return fmt.Errorf("no p2p module found on the bus")
}

selfAddr, err := p2pModule.GetAddress()
if err != nil {
return fmt.Errorf("getting self address: %w", err)
}

logger.Debug().Str("self_address", selfAddr.String()).Msg("")
return nil
}

// PrintPeerList prints a table of the passed peers to stdout. Header row is defined
// by `peerListTableHeader`. Row printing behavior is defined by `peerListRowConsumerFactory`.
func PrintPeerList(peers types.PeerList) error {
return utils.PrintTable(peerListTableHeader, peerListRowConsumerFactory(peers))
}

func peerListRowConsumerFactory(peers types.PeerList) utils.RowConsumer {
return func(provideRow utils.RowProvider) error {
for _, peer := range peers {
libp2pAddrInfo, err := utils.Libp2pAddrInfoFromPeer(peer)
if err != nil {
return fmt.Errorf("converting peer to libp2p addr info: %w", err)
}

err = provideRow(
libp2pAddrInfo.ID.String(),
peer.GetAddress().String(),
peer.GetServiceURL(),
)
if err != nil {
return err
}
}
return nil
}
}

var peerListTableHeader = []string{"Peer ID", "Pokt Address", "ServiceURL"}
5 changes: 0 additions & 5 deletions p2p/event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package p2p

import (
"fmt"

"google.golang.org/protobuf/types/known/anypb"

"github.com/pokt-network/pocket/shared/codec"
Expand Down Expand Up @@ -95,7 +94,3 @@ func (m *p2pModule) HandleEvent(event *anypb.Any) error {

return nil
}

func (m *p2pModule) handleDebugMessage(msg *messaging.DebugMessage) error {
return nil
}
73 changes: 72 additions & 1 deletion p2p/utils/logging.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,27 @@
package utils

import (
"fmt"
"net"
"os"
"text/tabwriter"

"github.com/libp2p/go-libp2p/core/network"
"github.com/rs/zerolog"

"github.com/pokt-network/pocket/p2p/types"
"github.com/pokt-network/pocket/shared/modules"
"github.com/rs/zerolog"
)

// RowProvider is a function which receives a variadic number of "column" values.
// It is intended to be passed to a `RowConsumer` so that the consumer can operate
// on the column values, row-by-row, without having to know how to produce them.
type RowProvider func(columns ...string) error

// RowConsumer is any function which receives a `RowProvider` in order to consume
// its column values, row-by-row.
type RowConsumer func(RowProvider) error

type scopeCallback func(scope network.ResourceScope) error

// LogScopeStatFactory returns a function which prints the given scope stat fields
Expand Down Expand Up @@ -41,6 +54,64 @@ func LogIncomingMsg(logger *modules.Logger, hostname string, peer types.Peer) {
logMessage(logger, msg, hostname, peer)
}

// Print table prints a table to stdout. Header row is defined by `header`. Row printing
// behavior is defined by `consumeRows`. Header length SHOULD match row length.
func PrintTable(header []string, consumeRows RowConsumer) error {
w := new(tabwriter.Writer)
w.Init(os.Stdout, 0, 0, 1, ' ', 0)

// Print header
for _, col := range header {
if _, err := fmt.Fprintf(w, "| %s\t", col); err != nil {
return err
}
}
if _, err := fmt.Fprintln(w, "|"); err != nil {
return err
}

// Print separator
for _, col := range header {
if _, err := fmt.Fprintf(w, "| "); err != nil {
return err
}
for range col {
if _, err := fmt.Fprintf(w, "-"); err != nil {
return err
}
}
if _, err := fmt.Fprintf(w, "\t"); err != nil {
return err
}
}
if _, err := fmt.Fprintln(w, "|"); err != nil {
return err
}

// Print rows -- `consumeRows` will call this function once for each row.
err := consumeRows(func(row ...string) error {
for _, col := range row {
if _, err := fmt.Fprintf(w, "| %s\t", col); err != nil {
return err
}
}
if _, err := fmt.Fprintln(w, "|"); err != nil {
return err
}
return nil
})
if err != nil {
return err
}

// Flush the buffer and print the table
if err := w.Flush(); err != nil {
return err
}

return nil
}

func logMessage(logger *modules.Logger, msg, hostname string, peer types.Peer) {
remoteHostname, _, err := net.SplitHostPort(peer.GetServiceURL())
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions shared/messaging/proto/debug_message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ enum DebugMessageAction {

DEBUG_PERSISTENCE_CLEAR_STATE = 8;
DEBUG_PERSISTENCE_RESET_TO_GENESIS = 9;
DEBUG_P2P_PEER_LIST = 10;
}

message DebugMessage {
Expand Down

0 comments on commit d1e811e

Please sign in to comment.