Skip to content

Commit

Permalink
Merge branch 'master' into leo/adjust-metrics-buckets
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangchiqing authored Sep 12, 2023
2 parents 72a4072 + 22a9246 commit ec8e1c4
Show file tree
Hide file tree
Showing 47 changed files with 523 additions and 131 deletions.
2 changes: 1 addition & 1 deletion CodingConventions.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ happy path is either
Therefore, changing the set of specified sentinel errors is generally considered a breaking API change.


2. **All errors beyond the specified, benign sentinel errors ere considered unexpected failures, i.e. a symptom for potential state corruption.**
2. **All errors beyond the specified, benign sentinel errors are considered unexpected failures, i.e. a symptom of potential state corruption.**
* We employ a fundamental principle of [High Assurance Software Engineering](https://www.researchgate.net/publication/228563190_High_Assurance_Software_Development),
where we treat everything beyond the known benign errors as critical failures. In unexpected failure cases, we assume that the vertex's in-memory state has been
broken and proper functioning is no longer guaranteed. The only safe route of recovery is to restart the vertex from a previously persisted, safe state.
Expand Down
3 changes: 0 additions & 3 deletions cmd/execution_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -874,9 +874,6 @@ func (exeNode *ExecutionNode) LoadIngestionEngine(
node.Storage.Headers,
node.Storage.Blocks,
node.Storage.Collections,
exeNode.events,
exeNode.serviceEvents,
exeNode.txResults,
exeNode.computationManager,
exeNode.providerEngine,
exeNode.executionState,
Expand Down
3 changes: 2 additions & 1 deletion cmd/observer/node_builder/observer_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ import (
"github.com/onflow/flow-go/network/p2p/keyutils"
"github.com/onflow/flow-go/network/p2p/p2pbuilder"
p2pconfig "github.com/onflow/flow-go/network/p2p/p2pbuilder/config"
"github.com/onflow/flow-go/network/p2p/p2plogging"
"github.com/onflow/flow-go/network/p2p/p2pnet"
"github.com/onflow/flow-go/network/p2p/subscription"
"github.com/onflow/flow-go/network/p2p/tracer"
Expand Down Expand Up @@ -588,7 +589,7 @@ func (builder *ObserverServiceBuilder) InitIDProviders() {

if flowID, err := builder.IDTranslator.GetFlowID(pid); err != nil {
// TODO: this is an instance of "log error and continue with best effort" anti-pattern
builder.Logger.Err(err).Str("peer", pid.String()).Msg("failed to translate to Flow ID")
builder.Logger.Err(err).Str("peer", p2plogging.PeerId(pid)).Msg("failed to translate to Flow ID")
} else {
result = append(result, flowID)
}
Expand Down
113 changes: 113 additions & 0 deletions cmd/util/cmd/checkpoint-trie-stats/cmd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package checkpoint_trie_stats

import (
"errors"
"fmt"

"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/spf13/cobra"

"github.com/onflow/flow-go/ledger/complete/mtrie/node"
"github.com/onflow/flow-go/ledger/complete/mtrie/trie"
"github.com/onflow/flow-go/ledger/complete/wal"
)

var (
flagCheckpoint string
flagTrieIndex int
)

var Cmd = &cobra.Command{
Use: "checkpoint-trie-stats",
Short: "List the trie node count by types in a checkpoint, show total payload size",
Run: run,
}

func init() {

Cmd.Flags().StringVar(&flagCheckpoint, "checkpoint", "",
"checkpoint file to read")
_ = Cmd.MarkFlagRequired("checkpoint")
Cmd.Flags().IntVar(&flagTrieIndex, "trie-index", 0, "trie index to read, 0 being the first trie, -1 is the last trie")

}

func run(*cobra.Command, []string) {

log.Info().Msgf("loading checkpoint %v, reading %v-th trie", flagCheckpoint, flagTrieIndex)
res, err := scanCheckpoint(flagCheckpoint, flagTrieIndex, log.Logger)
if err != nil {
log.Fatal().Err(err).Msg("fail to scan checkpoint")
}
log.Info().
Str("TrieRootHash", res.trieRootHash).
Int("InterimNodeCount", res.interimNodeCount).
Int("LeafNodeCount", res.leafNodeCount).
Int("TotalPayloadSize", res.totalPayloadSize).
Msgf("successfully scanned checkpoint %v", flagCheckpoint)
}

type result struct {
trieRootHash string
interimNodeCount int
leafNodeCount int
totalPayloadSize int
}

func readTrie(tries []*trie.MTrie, index int) (*trie.MTrie, error) {
if len(tries) == 0 {
return nil, errors.New("No tries available")
}

if index < -len(tries) || index >= len(tries) {
return nil, fmt.Errorf("index %d out of range", index)
}

if index < 0 {
return tries[len(tries)+index], nil
}

return tries[index], nil
}

func scanCheckpoint(checkpoint string, trieIndex int, log zerolog.Logger) (result, error) {
tries, err := wal.LoadCheckpoint(flagCheckpoint, log)
if err != nil {
return result{}, fmt.Errorf("error while loading checkpoint: %w", err)
}

log.Info().
Int("total_tries", len(tries)).
Msg("checkpoint loaded")

t, err := readTrie(tries, trieIndex)
if err != nil {
return result{}, fmt.Errorf("error while reading trie: %w", err)
}

log.Info().Msgf("trie loaded, root hash: %v", t.RootHash())

res := &result{
trieRootHash: t.RootHash().String(),
interimNodeCount: 0,
leafNodeCount: 0,
totalPayloadSize: 0,
}
processNode := func(n *node.Node) error {
if n.IsLeaf() {
res.leafNodeCount++
res.totalPayloadSize += n.Payload().Size()
} else {
res.interimNodeCount++
}
return nil
}

err = trie.TraverseNodes(t, processNode)
if err != nil {
return result{}, fmt.Errorf("fail to traverse the trie: %w", err)
}

return *res, nil
}
9 changes: 0 additions & 9 deletions engine/execution/ingestion/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,6 @@ type Engine struct {
headers storage.Headers // see comments on getHeaderByHeight for why we need it
blocks storage.Blocks
collections storage.Collections
events storage.Events
serviceEvents storage.ServiceEvents
transactionResults storage.TransactionResults
computationManager computation.ComputationManager
providerEngine provider.ProviderEngine
mempool *Mempool
Expand Down Expand Up @@ -81,9 +78,6 @@ func New(
headers storage.Headers,
blocks storage.Blocks,
collections storage.Collections,
events storage.Events,
serviceEvents storage.ServiceEvents,
transactionResults storage.TransactionResults,
executionEngine computation.ComputationManager,
providerEngine provider.ProviderEngine,
execState state.ExecutionState,
Expand All @@ -109,9 +103,6 @@ func New(
headers: headers,
blocks: blocks,
collections: collections,
events: events,
serviceEvents: serviceEvents,
transactionResults: transactionResults,
computationManager: executionEngine,
providerEngine: providerEngine,
mempool: mempool,
Expand Down
13 changes: 0 additions & 13 deletions engine/execution/ingestion/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,6 @@ func runWithEngine(t *testing.T, f func(testingContext)) {
blocks := storage.NewMockBlocks(ctrl)
payloads := storage.NewMockPayloads(ctrl)
collections := storage.NewMockCollections(ctrl)
events := storage.NewMockEvents(ctrl)
serviceEvents := storage.NewMockServiceEvents(ctrl)
txResults := storage.NewMockTransactionResults(ctrl)

computationManager := new(computation.ComputationManager)
providerEngine := new(provider.ProviderEngine)
Expand Down Expand Up @@ -188,7 +185,6 @@ func runWithEngine(t *testing.T, f func(testingContext)) {
return identity
}, nil)

txResults.EXPECT().BatchStore(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
payloads.EXPECT().Store(gomock.Any(), gomock.Any()).AnyTimes()

log := unittest.Logger()
Expand Down Expand Up @@ -229,9 +225,6 @@ func runWithEngine(t *testing.T, f func(testingContext)) {
headers,
blocks,
collections,
events,
serviceEvents,
txResults,
computationManager,
providerEngine,
executionState,
Expand Down Expand Up @@ -1509,9 +1502,6 @@ func newIngestionEngine(t *testing.T, ps *mocks.ProtocolState, es *mockExecution
headers := storage.NewMockHeaders(ctrl)
blocks := storage.NewMockBlocks(ctrl)
collections := storage.NewMockCollections(ctrl)
events := storage.NewMockEvents(ctrl)
serviceEvents := storage.NewMockServiceEvents(ctrl)
txResults := storage.NewMockTransactionResults(ctrl)

computationManager := new(computation.ComputationManager)
providerEngine := new(provider.ProviderEngine)
Expand All @@ -1531,9 +1521,6 @@ func newIngestionEngine(t *testing.T, ps *mocks.ProtocolState, es *mockExecution
headers,
blocks,
collections,
events,
serviceEvents,
txResults,
computationManager,
providerEngine,
es,
Expand Down
3 changes: 0 additions & 3 deletions engine/testutil/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -720,9 +720,6 @@ func ExecutionNode(t *testing.T, hub *stub.Hub, identity *flow.Identity, identit
node.Headers,
node.Blocks,
collectionsStorage,
eventsStorage,
serviceEventsStorage,
txResultStorage,
computationEngine,
pusherEngine,
execState,
Expand Down
6 changes: 3 additions & 3 deletions flips/network-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,9 @@ When the message is dequeued, the engine should check the `Context` to see wheth

These can be combined into a [single context](https://github.com/teivah/onecontext) which can be used by the message processing business logic, so that the processing can be cancelled either by the network or by the engine. This will allow us to deprecate [`engine.Unit`](https://github.com/onflow/flow-go/blob/master/engine/unit.go), which uses a single `Context` for the entire engine.

There are certain types of messages (e.g block proposals) which may transit between the private and public networks via relay nodes (e.g Access Nodes). Libp2p's [default message ID function](https://github.com/libp2p/go-libp2p-pubsub/blob/0c7092d1f50091ae88407ba93103ac5868da3d0a/pubsub.go#L1040-L1043) will treat a message originating from one network, relayed to the other network by `n` distinct relay nodes, as `n` distinct messages, causing unnacceptable message duplification / traffic amplification. In order to prevent this, we will need to define a [custom message ID function](https://pkg.go.dev/github.com/libp2p/go-libp2p-pubsub#WithMessageIdFn) which returns the hash of the message [`Payload`](https://github.com/onflow/flow-go/blob/698c77460bc33d1a8ee8a154f7fe4877bc518a02/network/message/message.proto#L13).
There are certain types of messages (e.g block proposals) which may transit between the private and public networks via relay nodes (e.g Access Nodes). Libp2p's [default message ID function](https://github.com/libp2p/go-libp2p-pubsub/blob/0c7092d1f50091ae88407ba93103ac5868da3d0a/pubsub.go#L1040-L1043) will treat a message originating from one network, relayed to the other network by `n` distinct relay nodes, as `n` distinct messages, causing unacceptable message duplification / traffic amplification. In order to prevent this, we will need to define a [custom message ID function](https://pkg.go.dev/github.com/libp2p/go-libp2p-pubsub#WithMessageIdFn) which returns the hash of the message [`Payload`](https://github.com/onflow/flow-go/blob/698c77460bc33d1a8ee8a154f7fe4877bc518a02/network/message/message.proto#L13).

In order to avoid making the message ID function deserialize the `Message` to access the `Payload`, we need to remove all other fields from the `Message` protobuf so that the message ID function can simply take the hash of the the pubsub [`Data`](https://github.com/libp2p/go-libp2p-pubsub/blob/0c7092d1f50091ae88407ba93103ac5868da3d0a/pb/rpc.pb.go#L145) field without needing to do any deserialization.
In order to avoid making the message ID function deserialize the `Message` to access the `Payload`, we need to remove all other fields from the `Message` protobuf so that the message ID function can simply take the hash of the pubsub [`Data`](https://github.com/libp2p/go-libp2p-pubsub/blob/0c7092d1f50091ae88407ba93103ac5868da3d0a/pb/rpc.pb.go#L145) field without needing to do any deserialization.

The `Multicast` implementation will need to be changed to make direct connections to the target peers instead of sending messages with a `TargetIDs` field via gossip.

Expand All @@ -90,4 +90,4 @@ The `Multicast` implementation will need to be changed to make direct connection
- Since existing calls to `Multicast` only target 3 peers, changing the implementation to use direct connections instead of gossip will reduce traffic on the network and make it more efficient.
- While `engine.Unit` provides some useful functionalities, it also uses the anti-pattern of [storing a `Context` inside a struct](https://github.com/onflow/flow-go/blob/b50f0ffe054103a82e4aa9e0c9e4610c2cbf2cc9/engine/unit.go#L117), something which is [specifically advised against](https://pkg.go.dev/context#:~:text=Do%20not%20store%20Contexts%20inside%20a%20struct%20type%3B%20instead%2C%20pass%20a%20Context%20explicitly%20to%20each%20function%20that%20needs%20it.%20The%20Context%20should%20be%20the%20first%20parameter%2C%20typically%20named%20ctx%3A) by [the developers of Go](https://go.dev/blog/context-and-structs#TOC_2.). Here is an [example](https://go.dev/blog/context-and-structs#:~:text=Storing%20context%20in%20structs%20leads%20to%20confusion) illustrating some of the problems with this approach.

## Implementation (TODO)
## Implementation (TODO)
3 changes: 2 additions & 1 deletion follower/follower_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
"github.com/onflow/flow-go/network/p2p/keyutils"
"github.com/onflow/flow-go/network/p2p/p2pbuilder"
p2pconfig "github.com/onflow/flow-go/network/p2p/p2pbuilder/config"
"github.com/onflow/flow-go/network/p2p/p2plogging"
"github.com/onflow/flow-go/network/p2p/p2pnet"
"github.com/onflow/flow-go/network/p2p/subscription"
"github.com/onflow/flow-go/network/p2p/tracer"
Expand Down Expand Up @@ -470,7 +471,7 @@ func (builder *FollowerServiceBuilder) InitIDProviders() {

if flowID, err := builder.IDTranslator.GetFlowID(pid); err != nil {
// TODO: this is an instance of "log error and continue with best effort" anti-pattern
builder.Logger.Err(err).Str("peer", pid.String()).Msg("failed to translate to Flow ID")
builder.Logger.Err(err).Str("peer", p2plogging.PeerId(pid)).Msg("failed to translate to Flow ID")
} else {
result = append(result, flowID)
}
Expand Down
5 changes: 3 additions & 2 deletions insecure/corruptlibp2p/pubsub_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/onflow/flow-go/module/component"
"github.com/onflow/flow-go/module/irrecoverable"
"github.com/onflow/flow-go/network/p2p"
"github.com/onflow/flow-go/network/p2p/p2plogging"
"github.com/onflow/flow-go/utils/logging"
)

Expand Down Expand Up @@ -65,7 +66,7 @@ func (c *CorruptGossipSubAdapter) RegisterTopicValidator(topic string, topicVali
c.logger.Fatal().
Bool(logging.KeySuspicious, true).
Str("topic", topic).
Str("origin_peer", from.String()).
Str("origin_peer", p2plogging.PeerId(from)).
Str("result", fmt.Sprintf("%v", result)).
Str("message_type", fmt.Sprintf("%T", message.Data)).
Msgf("invalid validation result, should be a bug in the topic validator")
Expand All @@ -74,7 +75,7 @@ func (c *CorruptGossipSubAdapter) RegisterTopicValidator(topic string, topicVali
c.logger.Warn().
Bool(logging.KeySuspicious, true).
Str("topic", topic).
Str("origin_peer", from.String()).
Str("origin_peer", p2plogging.PeerId(from)).
Str("result", fmt.Sprintf("%v", result)).
Str("message_type", fmt.Sprintf("%T", message.Data)).
Msg("invalid validation result, returning reject")
Expand Down
28 changes: 28 additions & 0 deletions ledger/complete/mtrie/trie/trie.go
Original file line number Diff line number Diff line change
Expand Up @@ -831,3 +831,31 @@ func minInt(a, b int) int {
}
return b
}

// TraverseNodes traverses all nodes of the trie in DFS order
func TraverseNodes(trie *MTrie, processNode func(*node.Node) error) error {
return traverseRecursive(trie.root, processNode)
}

func traverseRecursive(n *node.Node, processNode func(*node.Node) error) error {
if n == nil {
return nil
}

err := processNode(n)
if err != nil {
return err
}

err = traverseRecursive(n.LeftChild(), processNode)
if err != nil {
return err
}

err = traverseRecursive(n.RightChild(), processNode)
if err != nil {
return err
}

return nil
}
13 changes: 7 additions & 6 deletions module/metrics/libp2p_resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/rs/zerolog"

"github.com/onflow/flow-go/network/p2p/p2plogging"
"github.com/onflow/flow-go/utils/logging"
)

Expand Down Expand Up @@ -171,22 +172,22 @@ func (l *LibP2PResourceManagerMetrics) BlockConn(dir network.Direction, usefd bo

func (l *LibP2PResourceManagerMetrics) AllowStream(p peer.ID, dir network.Direction) {
l.allowStreamCount.WithLabelValues(dir.String()).Inc()
l.logger.Trace().Str("peer", p.String()).Str("direction", dir.String()).Msg("allowing stream")
l.logger.Trace().Str("peer", p2plogging.PeerId(p)).Str("direction", dir.String()).Msg("allowing stream")
}

func (l *LibP2PResourceManagerMetrics) BlockStream(p peer.ID, dir network.Direction) {
l.blockStreamCount.WithLabelValues(dir.String()).Inc()
l.logger.Debug().Bool(logging.KeySuspicious, true).Str("peer", p.String()).Str("direction", dir.String()).Msg("blocking stream")
l.logger.Debug().Bool(logging.KeySuspicious, true).Str("peer", p2plogging.PeerId(p)).Str("direction", dir.String()).Msg("blocking stream")
}

func (l *LibP2PResourceManagerMetrics) AllowPeer(p peer.ID) {
l.allowPeerCount.Inc()
l.logger.Trace().Str("peer", p.String()).Msg("allowing peer")
l.logger.Trace().Str("peer", p2plogging.PeerId(p)).Msg("allowing peer")
}

func (l *LibP2PResourceManagerMetrics) BlockPeer(p peer.ID) {
l.blockPeerCount.Inc()
l.logger.Debug().Bool(logging.KeySuspicious, true).Str("peer", p.String()).Msg("blocking peer")
l.logger.Debug().Bool(logging.KeySuspicious, true).Str("peer", p2plogging.PeerId(p)).Msg("blocking peer")
}

func (l *LibP2PResourceManagerMetrics) AllowProtocol(proto protocol.ID) {
Expand All @@ -201,7 +202,7 @@ func (l *LibP2PResourceManagerMetrics) BlockProtocol(proto protocol.ID) {

func (l *LibP2PResourceManagerMetrics) BlockProtocolPeer(proto protocol.ID, p peer.ID) {
l.blockProtocolPeerCount.Inc()
l.logger.Debug().Bool(logging.KeySuspicious, true).Str("protocol", string(proto)).Str("peer", p.String()).Msg("blocking protocol for peer")
l.logger.Debug().Bool(logging.KeySuspicious, true).Str("protocol", string(proto)).Str("peer", p2plogging.PeerId(p)).Msg("blocking protocol for peer")
}

func (l *LibP2PResourceManagerMetrics) AllowService(svc string) {
Expand All @@ -216,7 +217,7 @@ func (l *LibP2PResourceManagerMetrics) BlockService(svc string) {

func (l *LibP2PResourceManagerMetrics) BlockServicePeer(svc string, p peer.ID) {
l.blockServicePeerCount.Inc()
l.logger.Debug().Bool(logging.KeySuspicious, true).Str("service", svc).Str("peer", p.String()).Msg("blocking service for peer")
l.logger.Debug().Bool(logging.KeySuspicious, true).Str("service", svc).Str("peer", p2plogging.PeerId(p)).Msg("blocking service for peer")
}

func (l *LibP2PResourceManagerMetrics) AllowMemory(size int) {
Expand Down
Loading

0 comments on commit ec8e1c4

Please sign in to comment.