Skip to content

Commit

Permalink
Merge pull request #6172 from onflow/janez/expose-transaction-metrics
Browse files Browse the repository at this point in the history
Add grpc endpoint to EN for transaction execution metrics
janezpodhostnik authored Sep 11, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
2 parents 72ac1d1 + 2464067 commit 9653906
Showing 19 changed files with 984 additions and 41 deletions.
43 changes: 41 additions & 2 deletions cmd/execution_builder.go
Original file line number Diff line number Diff line change
@@ -49,6 +49,7 @@ import (
"github.com/onflow/flow-go/engine/execution/checker"
"github.com/onflow/flow-go/engine/execution/computation"
"github.com/onflow/flow-go/engine/execution/computation/committer"
txmetrics "github.com/onflow/flow-go/engine/execution/computation/metrics"
"github.com/onflow/flow-go/engine/execution/ingestion"
"github.com/onflow/flow-go/engine/execution/ingestion/fetcher"
"github.com/onflow/flow-go/engine/execution/ingestion/loader"
@@ -127,7 +128,7 @@ type ExecutionNode struct {

ingestionUnit *engine.Unit

collector module.ExecutionMetrics
collector *metrics.ExecutionCollector
executionState state.ExecutionState
followerState protocol.FollowerState
committee hotstuff.DynamicCommittee
@@ -160,6 +161,7 @@ type ExecutionNode struct {
executionDataTracker tracker.Storage
blobService network.BlobService
blobserviceDependable *module.ProxiedReadyDoneAware
metricsProvider txmetrics.TransactionExecutionMetricsProvider
}

func (builder *ExecutionNodeBuilder) LoadComponentsAndModules() {
@@ -228,6 +230,7 @@ func (builder *ExecutionNodeBuilder) LoadComponentsAndModules() {
Component("block data upload manager", exeNode.LoadBlockUploaderManager).
Component("GCP block data uploader", exeNode.LoadGCPBlockDataUploader).
Component("S3 block data uploader", exeNode.LoadS3BlockDataUploader).
Component("transaction execution metrics", exeNode.LoadTransactionExecutionMetrics).
Component("provider engine", exeNode.LoadProviderEngine).
Component("checker engine", exeNode.LoadCheckerEngine).
Component("ingestion engine", exeNode.LoadIngestionEngine).
@@ -544,10 +547,27 @@ func (exeNode *ExecutionNode) LoadProviderEngine(

vmCtx := fvm.NewContext(opts...)

var collector module.ExecutionMetrics
collector = exeNode.collector
if exeNode.exeConf.transactionExecutionMetricsEnabled {
// inject the transaction execution metrics
collector = exeNode.collector.WithTransactionCallback(
func(dur time.Duration, stats module.TransactionExecutionResultStats, info module.TransactionExecutionResultInfo) {
exeNode.metricsProvider.Collect(
info.BlockID,
info.BlockHeight,
txmetrics.TransactionExecutionMetrics{
TransactionID: info.TransactionID,
ExecutionTime: dur,
ExecutionEffortWeights: stats.ComputationIntensities,
})
})
}

ledgerViewCommitter := committer.NewLedgerViewCommitter(exeNode.ledgerStorage, node.Tracer)
manager, err := computation.New(
node.Logger,
exeNode.collector,
collector,
node.Tracer,
node.Me,
node.State,
@@ -1130,6 +1150,24 @@ func (exeNode *ExecutionNode) LoadScriptsEngine(node *NodeConfig) (module.ReadyD
return exeNode.scriptsEng, nil
}

func (exeNode *ExecutionNode) LoadTransactionExecutionMetrics(
node *NodeConfig,
) (module.ReadyDoneAware, error) {
lastFinalizedHeader := node.LastFinalizedHeader

metricsProvider := txmetrics.NewTransactionExecutionMetricsProvider(
node.Logger,
exeNode.executionState,
node.Storage.Headers,
lastFinalizedHeader.Height,
exeNode.exeConf.transactionExecutionMetricsBufferSize,
)

node.ProtocolEvents.AddConsumer(metricsProvider)
exeNode.metricsProvider = metricsProvider
return metricsProvider, nil
}

func (exeNode *ExecutionNode) LoadConsensusCommittee(
node *NodeConfig,
) (
@@ -1331,6 +1369,7 @@ func (exeNode *ExecutionNode) LoadGrpcServer(
exeNode.results,
exeNode.txResults,
node.Storage.Commits,
exeNode.metricsProvider,
node.RootChainID,
signature.NewBlockSignerDecoder(exeNode.committee),
exeNode.exeConf.apiRatelimits,
62 changes: 33 additions & 29 deletions cmd/execution_config.go
Original file line number Diff line number Diff line change
@@ -25,35 +25,37 @@ import (

// ExecutionConfig contains the configs for starting up execution nodes
type ExecutionConfig struct {
rpcConf rpc.Config
triedir string
executionDataDir string
registerDir string
mTrieCacheSize uint32
transactionResultsCacheSize uint
checkpointDistance uint
checkpointsToKeep uint
chunkDataPackDir string
chunkDataPackCacheSize uint
chunkDataPackRequestsCacheSize uint32
requestInterval time.Duration
extensiveLog bool
pauseExecution bool
chunkDataPackQueryTimeout time.Duration
chunkDataPackDeliveryTimeout time.Duration
enableBlockDataUpload bool
gcpBucketName string
s3BucketName string
apiRatelimits map[string]int
apiBurstlimits map[string]int
executionDataAllowedPeers string
executionDataPrunerHeightRangeTarget uint64
executionDataPrunerThreshold uint64
blobstoreRateLimit int
blobstoreBurstLimit int
chunkDataPackRequestWorkers uint
maxGracefulStopDuration time.Duration
importCheckpointWorkerCount int
rpcConf rpc.Config
triedir string
executionDataDir string
registerDir string
mTrieCacheSize uint32
transactionResultsCacheSize uint
checkpointDistance uint
checkpointsToKeep uint
chunkDataPackDir string
chunkDataPackCacheSize uint
chunkDataPackRequestsCacheSize uint32
requestInterval time.Duration
extensiveLog bool
pauseExecution bool
chunkDataPackQueryTimeout time.Duration
chunkDataPackDeliveryTimeout time.Duration
enableBlockDataUpload bool
gcpBucketName string
s3BucketName string
apiRatelimits map[string]int
apiBurstlimits map[string]int
executionDataAllowedPeers string
executionDataPrunerHeightRangeTarget uint64
executionDataPrunerThreshold uint64
blobstoreRateLimit int
blobstoreBurstLimit int
chunkDataPackRequestWorkers uint
maxGracefulStopDuration time.Duration
importCheckpointWorkerCount int
transactionExecutionMetricsEnabled bool
transactionExecutionMetricsBufferSize uint

// evm tracing configuration
evmTracingEnabled bool
@@ -122,6 +124,8 @@ func (exeConf *ExecutionConfig) SetupFlags(flags *pflag.FlagSet) {
flags.IntVar(&exeConf.blobstoreBurstLimit, "blobstore-burst-limit", 0, "outgoing burst limit for Execution Data blobstore")
flags.DurationVar(&exeConf.maxGracefulStopDuration, "max-graceful-stop-duration", stop.DefaultMaxGracefulStopDuration, "the maximum amount of time stop control will wait for ingestion engine to gracefully shutdown before crashing")
flags.IntVar(&exeConf.importCheckpointWorkerCount, "import-checkpoint-worker-count", 10, "number of workers to import checkpoint file during bootstrap")
flags.BoolVar(&exeConf.transactionExecutionMetricsEnabled, "tx-execution-metrics", true, "enable collection of transaction execution metrics")
flags.UintVar(&exeConf.transactionExecutionMetricsBufferSize, "tx-execution-metrics-buffer-size", 200, "buffer size for transaction execution metrics. The buffer size is the number of blocks that are kept in memory by the metrics provider engine")
flags.BoolVar(&exeConf.evmTracingEnabled, "evm-tracing-enabled", false, "enable EVM tracing, when set it will generate traces and upload them to the GCP bucket provided by the --evm-traces-gcp-bucket. Warning: this might affect speed of execution")
flags.StringVar(&exeConf.evmTracesGCPBucket, "evm-traces-gcp-bucket", "", "define GCP bucket name used for uploading EVM traces, must be used in combination with --evm-tracing-enabled. if left empty the upload step is skipped")

37 changes: 37 additions & 0 deletions engine/access/mock/execution_api_client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 30 additions & 0 deletions engine/access/mock/execution_api_server.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion engine/execution/computation/computer/computer.go
Original file line number Diff line number Diff line change
@@ -256,7 +256,6 @@ func (e *blockComputer) queueTransactionRequests(
i == len(collection.Transactions)-1)
txnIndex += 1
}

}

systemCtx := fvm.NewContextFromParent(
130 changes: 130 additions & 0 deletions engine/execution/computation/metrics/collector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package metrics

import (
"sync"

"github.com/rs/zerolog"

"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/component"
"github.com/onflow/flow-go/module/irrecoverable"
)

type collector struct {
log zerolog.Logger

collection chan metrics

mu sync.Mutex

lowestAvailableHeight uint64
blocksAtHeight map[uint64]map[flow.Identifier]struct{}
metrics map[flow.Identifier][]TransactionExecutionMetrics
}

func newCollector(
log zerolog.Logger,
lowestAvailableHeight uint64,
) *collector {
return &collector{
log: log,
lowestAvailableHeight: lowestAvailableHeight,

collection: make(chan metrics, 1000),
blocksAtHeight: make(map[uint64]map[flow.Identifier]struct{}),
metrics: make(map[flow.Identifier][]TransactionExecutionMetrics),
}
}

// Collect should never block because it's called from the execution
func (c *collector) Collect(
blockId flow.Identifier,
blockHeight uint64,
t TransactionExecutionMetrics,
) {
select {
case c.collection <- metrics{
TransactionExecutionMetrics: t,
blockHeight: blockHeight,
blockId: blockId,
}:
default:
c.log.Warn().
Uint64("height", blockHeight).
Msg("dropping metrics because the collection channel is full")
}
}

func (c *collector) metricsCollectorWorker(
ctx irrecoverable.SignalerContext,
ready component.ReadyFunc,
) {
ready()

for {
select {
case <-ctx.Done():
return
case m := <-c.collection:
c.collect(m.blockId, m.blockHeight, m.TransactionExecutionMetrics)
}
}
}

func (c *collector) collect(
blockId flow.Identifier,
blockHeight uint64,
t TransactionExecutionMetrics,
) {
c.mu.Lock()
defer c.mu.Unlock()

if blockHeight <= c.lowestAvailableHeight {
c.log.Warn().
Uint64("height", blockHeight).
Uint64("lowestAvailableHeight", c.lowestAvailableHeight).
Msg("received metrics for a block that is older or equal than the most recent block")
return
}

if _, ok := c.blocksAtHeight[blockHeight]; !ok {
c.blocksAtHeight[blockHeight] = make(map[flow.Identifier]struct{})
}
c.blocksAtHeight[blockHeight][blockId] = struct{}{}
c.metrics[blockId] = append(c.metrics[blockId], t)
}

// Pop returns the metrics for the given finalized block at the given height
// and clears all data up to the given height.
func (c *collector) Pop(height uint64, finalizedBlockId flow.Identifier) []TransactionExecutionMetrics {
c.mu.Lock()
defer c.mu.Unlock()

if height <= c.lowestAvailableHeight {
c.log.Warn().
Uint64("height", height).
Stringer("finalizedBlockId", finalizedBlockId).
Msg("requested metrics for a finalizedBlockId that is older or equal than the most recent finalizedBlockId")
return nil
}

// only return metrics for finalized block
metrics := c.metrics[finalizedBlockId]

c.advanceTo(height)

return metrics
}

// advanceTo moves the latest height to the given height
// all data at lower heights will be deleted
func (c *collector) advanceTo(height uint64) {
for c.lowestAvailableHeight < height {
blocks := c.blocksAtHeight[c.lowestAvailableHeight]
for block := range blocks {
delete(c.metrics, block)
}
delete(c.blocksAtHeight, c.lowestAvailableHeight)
c.lowestAvailableHeight++
}
}
Loading

0 comments on commit 9653906

Please sign in to comment.