From 001d051e2f975a7b58299ac65f6d85ee64a0053f Mon Sep 17 00:00:00 2001 From: Hussam Date: Mon, 16 Dec 2024 14:48:09 -0600 Subject: [PATCH] Collect/peak filter rolling tx fee stats --- common/big.go | 3 + core/core.go | 4 ++ core/state_processor.go | 120 ++++++++++++++++++++++++++++------- core/state_processor_test.go | 88 +++++++++++++++++++++++++ core/types/transaction.go | 2 +- internal/quaiapi/api.go | 9 +++ internal/quaiapi/backend.go | 1 + quai/api_backend.go | 4 ++ 8 files changed, 206 insertions(+), 25 deletions(-) create mode 100644 core/state_processor_test.go diff --git a/common/big.go b/common/big.go index 3626dd51a0..5efbe9e231 100644 --- a/common/big.go +++ b/common/big.go @@ -34,6 +34,9 @@ var ( Big3 = big.NewInt(3) Big8 = big.NewInt(8) Big32 = big.NewInt(32) + Big99 = big.NewInt(99) + Big100 = big.NewInt(100) + Big101 = big.NewInt(101) Big256 = big.NewInt(256) Big257 = big.NewInt(257) Big2e256 = new(big.Int).Exp(big.NewInt(2), big.NewInt(256), big.NewInt(0)) diff --git a/core/core.go b/core/core.go index fe2a1d5a8a..0787a1efbd 100644 --- a/core/core.go +++ b/core/core.go @@ -1365,6 +1365,10 @@ func (c *Core) SendTxToSharingClients(tx *types.Transaction) { c.sl.txPool.SendTxToSharingClients(tx) } +func (c *Core) GetRollingFeeInfo() (min, max, avg *big.Int) { + return c.Processor().GetRollingFeeInfo() +} + func (c *Core) SuggestFinalityDepth(qiValue *big.Int, correlatedRisk *big.Int) *big.Int { qiRewardPerBlock := misc.CalculateQiReward(c.CurrentHeader().WorkObjectHeader()) diff --git a/core/state_processor.go b/core/state_processor.go index 444140f260..ba2851c7fb 100644 --- a/core/state_processor.go +++ b/core/state_processor.go @@ -31,6 +31,7 @@ import ( lru "github.com/hashicorp/golang-lru/v2" "github.com/dominant-strategies/go-quai/common" + bigMath "github.com/dominant-strategies/go-quai/common/math" "github.com/dominant-strategies/go-quai/common/prque" "github.com/dominant-strategies/go-quai/consensus" "github.com/dominant-strategies/go-quai/consensus/misc" @@ -107,19 +108,20 @@ var defaultCacheConfig = &CacheConfig{ // // StateProcessor implements Processor. type StateProcessor struct { - config *params.ChainConfig // Chain configuration options - hc *HeaderChain // Canonical block chain - engine consensus.Engine // Consensus engine used for block rewards - logsFeed event.Feed - rmLogsFeed event.Feed - cacheConfig *CacheConfig // CacheConfig for StateProcessor - stateCache state.Database // State database to reuse between imports (contains state cache) - etxCache state.Database // ETX database to reuse between imports (contains ETX cache) - receiptsCache *lru.Cache[common.Hash, types.Receipts] // Cache for the most recent receipts per block - txLookupCache *lru.Cache[common.Hash, rawdb.LegacyTxLookupEntry] - validator Validator // Block and state validator interface - prefetcher Prefetcher - vmConfig vm.Config + config *params.ChainConfig // Chain configuration options + hc *HeaderChain // Canonical block chain + engine consensus.Engine // Consensus engine used for block rewards + logsFeed event.Feed + rmLogsFeed event.Feed + cacheConfig *CacheConfig // CacheConfig for StateProcessor + stateCache state.Database // State database to reuse between imports (contains state cache) + etxCache state.Database // ETX database to reuse between imports (contains ETX cache) + receiptsCache *lru.Cache[common.Hash, types.Receipts] // Cache for the most recent receipts per block + txLookupCache *lru.Cache[common.Hash, rawdb.LegacyTxLookupEntry] + validator Validator // Block and state validator interface + prefetcher Prefetcher + vmConfig vm.Config + minFee, maxFee, avgFee, numElements *big.Int scope event.SubscriptionScope wg sync.WaitGroup // chain processing wait group for shutting down @@ -221,17 +223,19 @@ type UtxosCreatedDeleted struct { // transactions failed to execute due to insufficient gas it will return an error. func (p *StateProcessor) Process(block *types.WorkObject, batch ethdb.Batch) (types.Receipts, []*types.Transaction, []*types.Log, *state.StateDB, uint64, uint64, uint64, *multiset.MultiSet, []common.Unlock, error) { var ( - receipts types.Receipts - usedGas = new(uint64) - usedState = new(uint64) - header = types.CopyWorkObject(block) - blockHash = block.Hash() - nodeLocation = p.hc.NodeLocation() - nodeCtx = p.hc.NodeCtx() - blockNumber = block.Number(nodeCtx) - parentHash = block.ParentHash(nodeCtx) - allLogs []*types.Log - gp = new(types.GasPool).AddGas(block.GasLimit()) + receipts types.Receipts + usedGas = new(uint64) + usedState = new(uint64) + header = types.CopyWorkObject(block) + blockHash = block.Hash() + nodeLocation = p.hc.NodeLocation() + nodeCtx = p.hc.NodeCtx() + blockNumber = block.Number(nodeCtx) + parentHash = block.ParentHash(nodeCtx) + allLogs []*types.Log + gp = new(types.GasPool).AddGas(block.GasLimit()) + numTxsProcessed = big.NewInt(0) + blockMinFee, blockMaxFee, blockAvgFee *big.Int ) start := time.Now() parent := p.hc.GetBlock(block.ParentHash(nodeCtx), block.NumberU64(nodeCtx)-1) @@ -392,6 +396,8 @@ func (p *StateProcessor) Process(block *types.WorkObject, batch ethdb.Batch) (ty } } + calcQiTxStats(blockMinFee, blockMaxFee, blockAvgFee, qiTxFee, numTxsProcessed) + totalEtxCoinbaseTime += time.Since(startEtxCoinbase) totalQiTime += time.Since(qiTimeBefore) totalQiProcessTimes["Sanity Checks"] += timing["Sanity Checks"] @@ -768,6 +774,8 @@ func (p *StateProcessor) Process(block *types.WorkObject, batch ethdb.Batch) (ty } time5 := common.PrettyDuration(time.Since(start)) + calcRollingFeeInfo(p.minFee, p.maxFee, p.avgFee, p.numElements, blockMinFee, blockMaxFee, blockAvgFee, numTxsProcessed) + p.logger.WithFields(log.Fields{ "signing time": common.PrettyDuration(timeSign), "prepare state time": common.PrettyDuration(timePrepare), @@ -1867,6 +1875,70 @@ func (p *StateProcessor) StateAtTransaction(block *types.WorkObject, txIndex int return nil, vm.BlockContext{}, nil, fmt.Errorf("transaction index %d out of range for block %#x", txIndex, block.Hash()) } +func calcQiTxStats(blockMinFee, blockMaxFee, blockTotalFees, qiTxFee, numTxsProcessed *big.Int) (newBlockMinFee, newBlockMaxFee, newBlockAvgFee *big.Int) { + + if numTxsProcessed.Cmp(common.Big0) == 0 { + numTxsProcessed.Add(numTxsProcessed, common.Big1) + blockMinFee = new(big.Int).Set(qiTxFee) + blockMaxFee = new(big.Int).Set(qiTxFee) + blockTotalFees = new(big.Int).Set(qiTxFee) + return blockMinFee, blockMaxFee, blockTotalFees + } + + numTxsProcessed = numTxsProcessed.Add(numTxsProcessed, common.Big1) + blockMinFee = bigMath.BigMin(qiTxFee, blockMinFee) + blockMaxFee = bigMath.BigMax(qiTxFee, blockMaxFee) + blockTotalFees.Add(blockTotalFees, qiTxFee) + + return blockMinFee, blockMaxFee, blockTotalFees +} + +func calcRollingFeeInfo(rollingMinFee, rollingMaxFee, rollingAvgFee, rollingNumElements, blockMinFee, blockMaxFee, blockTotalFees, numTxsProcessed *big.Int) (min, max, avg, num *big.Int) { + + // Implement peak/envelope filter + if numTxsProcessed.Cmp(common.Big0) == 0 { + // Block values will be nil, so don't compare or update. + return rollingMinFee, rollingMaxFee, rollingAvgFee, rollingNumElements + } + if rollingMinFee == nil || blockMinFee.Cmp(rollingMinFee) < 0 { + // If the new minimum is less than the old minimum, overwrite it. + rollingMinFee = new(big.Int).Set(blockMinFee) + } else { + // If not, increase the old minimum by 1%. + rollingMinFee.Mul(rollingMinFee, common.Big101) + rollingMinFee.Div(rollingMinFee, common.Big100) + } + + if rollingMaxFee == nil || blockMaxFee.Cmp(rollingMaxFee) > 0 { + rollingMaxFee = new(big.Int).Set(blockMaxFee) + } else { + // Decay the max fee by 1%. + rollingMaxFee.Mul(rollingMaxFee, common.Big99) + rollingMaxFee.Div(rollingMaxFee, common.Big100) + } + + // Implement running average + if rollingAvgFee == nil { + rollingAvgFee = big.NewInt(1) + rollingNumElements = big.NewInt(0) + } + + if numTxsProcessed.Cmp(common.Big0) > 0 { + blockAvgFee := blockTotalFees.Div(blockTotalFees, numTxsProcessed) + intermediateVal := new(big.Int).Mul(rollingNumElements, rollingAvgFee) + intermediateVal = intermediateVal.Add(intermediateVal, blockAvgFee) + + rollingNumElements.Add(rollingNumElements, common.Big1) + rollingAvgFee = intermediateVal.Div(intermediateVal, rollingNumElements) + } + + return rollingMinFee, rollingMaxFee, rollingAvgFee, rollingNumElements +} + +func (p *StateProcessor) GetRollingFeeInfo() (min, max, avg *big.Int) { + return p.minFee, p.maxFee, p.avgFee +} + func (p *StateProcessor) Stop() { // Ensure all live cached entries be saved into disk, so that we can skip // cache warmup when node restarts. diff --git a/core/state_processor_test.go b/core/state_processor_test.go new file mode 100644 index 0000000000..7b04a9368a --- /dev/null +++ b/core/state_processor_test.go @@ -0,0 +1,88 @@ +package core + +import ( + "math/big" + "math/rand" + "testing" + + "github.com/stretchr/testify/require" +) + +func generateRandomBlockFees(min, max, numBlocks, maxNumTxs int) (simpleBlockFees [][]int, bigBlockFees [][]*big.Int) { + // Generate the number of txs in this block first. + simpleBlockFees = make([][]int, numBlocks) + bigBlockFees = make([][]*big.Int, numBlocks) + for blockNum := 0; blockNum < numBlocks; blockNum++ { + txLen := rand.Intn(maxNumTxs) + + simpleBlockFees[blockNum] = make([]int, txLen) + bigBlockFees[blockNum] = make([]*big.Int, txLen) + + for i := 0; i < txLen; i++ { + randNum := rand.Intn(max-min+1) + min + simpleBlockFees[blockNum][i] = randNum + bigBlockFees[blockNum][i] = big.NewInt(int64(randNum)) + } + } + + return simpleBlockFees, bigBlockFees +} + +func TestMultiBlockTxStats(t *testing.T) { + simpleBlockFees, bigBlockFees := generateRandomBlockFees(0, 10000, 10000, 1000) + + var expectedRollingMin, expectedRollingMax, expectedRollingAvg int + for blockNum, blockFees := range simpleBlockFees { + var blockMin, blockMax, sum int + for txCount, fee := range blockFees { + if txCount == 0 { + blockMin = fee + blockMax = fee + sum = 0 + } else { + if fee < blockMin { + blockMin = fee + } + + if fee > blockMax { + blockMax = fee + } + } + sum += fee + } + + if len(blockFees) > 0 { + if blockMin < expectedRollingMin || blockNum == 0 { + expectedRollingMin = blockMin + } else { + expectedRollingMin = expectedRollingMin * 101 / 100 + } + + if blockMax > expectedRollingMax || blockNum == 0 { + expectedRollingMax = blockMax + } else { + expectedRollingMax = expectedRollingMax * 99 / 100 + } + // Calculate a running average + expectedRollingAvg = (blockNum*expectedRollingAvg + (sum / len(blockFees))) / (blockNum + 1) + } + } + + var blockMin, blockMax, blockTotal *big.Int + var actualRollingMin, actualRollingMax, rollingAvg, rollingNumElements *big.Int + for _, blockFees := range bigBlockFees { + bigTxsProcessed := big.NewInt(0) + for _, fee := range blockFees { + blockMin, blockMax, blockTotal = calcQiTxStats(blockMin, blockMax, blockTotal, fee, bigTxsProcessed) + } + + actualRollingMin, actualRollingMax, rollingAvg, rollingNumElements = calcRollingFeeInfo(actualRollingMin, actualRollingMax, rollingAvg, rollingNumElements, blockMin, blockMax, blockTotal, bigTxsProcessed) + } + + if expectedRollingMin != 0 || expectedRollingMax != 0 || expectedRollingAvg != 0 { + // If any one of these values is non-zero, then there were fees, so compare. + require.Equal(t, uint64(expectedRollingMin), actualRollingMin.Uint64(), "Expected min not equal") + require.Equal(t, uint64(expectedRollingMax), actualRollingMax.Uint64(), "Expected max not equal") + require.Equal(t, uint64(expectedRollingAvg), rollingAvg.Uint64(), "Expected average not equal") + } +} diff --git a/core/types/transaction.go b/core/types/transaction.go index ed93ae672a..c15fda8831 100644 --- a/core/types/transaction.go +++ b/core/types/transaction.go @@ -60,7 +60,7 @@ const ( c_MaxTxForSorting = 1500 ) -// Transaction is a Quai transaction. +// Transaction can be a Quai, Qi, or External transaction. type Transaction struct { inner TxData // Consensus contents of a transaction time time.Time // Time first seen locally (spam avoidance) diff --git a/internal/quaiapi/api.go b/internal/quaiapi/api.go index 45f507cf67..f3f1a8b9e7 100644 --- a/internal/quaiapi/api.go +++ b/internal/quaiapi/api.go @@ -169,6 +169,15 @@ func (s *PublicTxPoolAPI) Inspect() map[string]map[string]map[string]string { return content } +func (s *PublicTxPoolAPI) GetRollingFeeInfo() (min, max, avg *hexutil.Big) { + bigMin, bigMax, bigAvg := s.b.GetRollingFeeInfo() + min = (*hexutil.Big)(bigMin) + max = (*hexutil.Big)(bigMax) + avg = (*hexutil.Big)(bigAvg) + + return min, max, avg +} + // PublicBlockChainAPI provides an API to access the Quai blockchain. // It offers only methods that operate on public data that is freely available to anyone. type PublicBlockChainAPI struct { diff --git a/internal/quaiapi/backend.go b/internal/quaiapi/backend.go index 3c4be44f3e..59334320db 100644 --- a/internal/quaiapi/backend.go +++ b/internal/quaiapi/backend.go @@ -139,6 +139,7 @@ type Backend interface { GetMinGasPrice() *big.Int GetPoolGasPrice() *big.Int SendTxToSharingClients(tx *types.Transaction) + GetRollingFeeInfo() (min, max, avg *big.Int) // Filter API BloomStatus() (uint64, uint64) diff --git a/quai/api_backend.go b/quai/api_backend.go index 6041cc7a1d..80e0c7bac0 100644 --- a/quai/api_backend.go +++ b/quai/api_backend.go @@ -406,6 +406,10 @@ func (b *QuaiAPIBackend) SendTxToSharingClients(tx *types.Transaction) { b.quai.core.SendTxToSharingClients(tx) } +func (b *QuaiAPIBackend) GetRollingFeeInfo() (min, max, avg *big.Int) { + return b.quai.core.GetRollingFeeInfo() +} + func (b *QuaiAPIBackend) GetMinGasPrice() *big.Int { return b.quai.core.GetMinGasPrice() }