Skip to content

Commit

Permalink
Collect/peak filter rolling tx fee stats
Browse files Browse the repository at this point in the history
  • Loading branch information
Djadih committed Dec 17, 2024
1 parent a0fc38f commit 001d051
Show file tree
Hide file tree
Showing 8 changed files with 206 additions and 25 deletions.
3 changes: 3 additions & 0 deletions common/big.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ var (
Big3 = big.NewInt(3)
Big8 = big.NewInt(8)
Big32 = big.NewInt(32)
Big99 = big.NewInt(99)
Big100 = big.NewInt(100)
Big101 = big.NewInt(101)
Big256 = big.NewInt(256)
Big257 = big.NewInt(257)
Big2e256 = new(big.Int).Exp(big.NewInt(2), big.NewInt(256), big.NewInt(0))
Expand Down
4 changes: 4 additions & 0 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -1365,6 +1365,10 @@ func (c *Core) SendTxToSharingClients(tx *types.Transaction) {
c.sl.txPool.SendTxToSharingClients(tx)
}

func (c *Core) GetRollingFeeInfo() (min, max, avg *big.Int) {
return c.Processor().GetRollingFeeInfo()
}

func (c *Core) SuggestFinalityDepth(qiValue *big.Int, correlatedRisk *big.Int) *big.Int {
qiRewardPerBlock := misc.CalculateQiReward(c.CurrentHeader().WorkObjectHeader())

Expand Down
120 changes: 96 additions & 24 deletions core/state_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
lru "github.com/hashicorp/golang-lru/v2"

"github.com/dominant-strategies/go-quai/common"
bigMath "github.com/dominant-strategies/go-quai/common/math"
"github.com/dominant-strategies/go-quai/common/prque"
"github.com/dominant-strategies/go-quai/consensus"
"github.com/dominant-strategies/go-quai/consensus/misc"
Expand Down Expand Up @@ -107,19 +108,20 @@ var defaultCacheConfig = &CacheConfig{
//
// StateProcessor implements Processor.
type StateProcessor struct {
config *params.ChainConfig // Chain configuration options
hc *HeaderChain // Canonical block chain
engine consensus.Engine // Consensus engine used for block rewards
logsFeed event.Feed
rmLogsFeed event.Feed
cacheConfig *CacheConfig // CacheConfig for StateProcessor
stateCache state.Database // State database to reuse between imports (contains state cache)
etxCache state.Database // ETX database to reuse between imports (contains ETX cache)
receiptsCache *lru.Cache[common.Hash, types.Receipts] // Cache for the most recent receipts per block
txLookupCache *lru.Cache[common.Hash, rawdb.LegacyTxLookupEntry]
validator Validator // Block and state validator interface
prefetcher Prefetcher
vmConfig vm.Config
config *params.ChainConfig // Chain configuration options
hc *HeaderChain // Canonical block chain
engine consensus.Engine // Consensus engine used for block rewards
logsFeed event.Feed
rmLogsFeed event.Feed
cacheConfig *CacheConfig // CacheConfig for StateProcessor
stateCache state.Database // State database to reuse between imports (contains state cache)
etxCache state.Database // ETX database to reuse between imports (contains ETX cache)
receiptsCache *lru.Cache[common.Hash, types.Receipts] // Cache for the most recent receipts per block
txLookupCache *lru.Cache[common.Hash, rawdb.LegacyTxLookupEntry]
validator Validator // Block and state validator interface
prefetcher Prefetcher
vmConfig vm.Config
minFee, maxFee, avgFee, numElements *big.Int

scope event.SubscriptionScope
wg sync.WaitGroup // chain processing wait group for shutting down
Expand Down Expand Up @@ -221,17 +223,19 @@ type UtxosCreatedDeleted struct {
// transactions failed to execute due to insufficient gas it will return an error.
func (p *StateProcessor) Process(block *types.WorkObject, batch ethdb.Batch) (types.Receipts, []*types.Transaction, []*types.Log, *state.StateDB, uint64, uint64, uint64, *multiset.MultiSet, []common.Unlock, error) {
var (
receipts types.Receipts
usedGas = new(uint64)
usedState = new(uint64)
header = types.CopyWorkObject(block)
blockHash = block.Hash()
nodeLocation = p.hc.NodeLocation()
nodeCtx = p.hc.NodeCtx()
blockNumber = block.Number(nodeCtx)
parentHash = block.ParentHash(nodeCtx)
allLogs []*types.Log
gp = new(types.GasPool).AddGas(block.GasLimit())
receipts types.Receipts
usedGas = new(uint64)
usedState = new(uint64)
header = types.CopyWorkObject(block)
blockHash = block.Hash()
nodeLocation = p.hc.NodeLocation()
nodeCtx = p.hc.NodeCtx()
blockNumber = block.Number(nodeCtx)
parentHash = block.ParentHash(nodeCtx)
allLogs []*types.Log
gp = new(types.GasPool).AddGas(block.GasLimit())
numTxsProcessed = big.NewInt(0)
blockMinFee, blockMaxFee, blockAvgFee *big.Int
)
start := time.Now()
parent := p.hc.GetBlock(block.ParentHash(nodeCtx), block.NumberU64(nodeCtx)-1)
Expand Down Expand Up @@ -392,6 +396,8 @@ func (p *StateProcessor) Process(block *types.WorkObject, batch ethdb.Batch) (ty
}
}

calcQiTxStats(blockMinFee, blockMaxFee, blockAvgFee, qiTxFee, numTxsProcessed)

totalEtxCoinbaseTime += time.Since(startEtxCoinbase)
totalQiTime += time.Since(qiTimeBefore)
totalQiProcessTimes["Sanity Checks"] += timing["Sanity Checks"]
Expand Down Expand Up @@ -768,6 +774,8 @@ func (p *StateProcessor) Process(block *types.WorkObject, batch ethdb.Batch) (ty
}
time5 := common.PrettyDuration(time.Since(start))

calcRollingFeeInfo(p.minFee, p.maxFee, p.avgFee, p.numElements, blockMinFee, blockMaxFee, blockAvgFee, numTxsProcessed)

p.logger.WithFields(log.Fields{
"signing time": common.PrettyDuration(timeSign),
"prepare state time": common.PrettyDuration(timePrepare),
Expand Down Expand Up @@ -1867,6 +1875,70 @@ func (p *StateProcessor) StateAtTransaction(block *types.WorkObject, txIndex int
return nil, vm.BlockContext{}, nil, fmt.Errorf("transaction index %d out of range for block %#x", txIndex, block.Hash())
}

func calcQiTxStats(blockMinFee, blockMaxFee, blockTotalFees, qiTxFee, numTxsProcessed *big.Int) (newBlockMinFee, newBlockMaxFee, newBlockAvgFee *big.Int) {

if numTxsProcessed.Cmp(common.Big0) == 0 {
numTxsProcessed.Add(numTxsProcessed, common.Big1)
blockMinFee = new(big.Int).Set(qiTxFee)
blockMaxFee = new(big.Int).Set(qiTxFee)
blockTotalFees = new(big.Int).Set(qiTxFee)
return blockMinFee, blockMaxFee, blockTotalFees
}

numTxsProcessed = numTxsProcessed.Add(numTxsProcessed, common.Big1)
blockMinFee = bigMath.BigMin(qiTxFee, blockMinFee)
blockMaxFee = bigMath.BigMax(qiTxFee, blockMaxFee)
blockTotalFees.Add(blockTotalFees, qiTxFee)

return blockMinFee, blockMaxFee, blockTotalFees
}

func calcRollingFeeInfo(rollingMinFee, rollingMaxFee, rollingAvgFee, rollingNumElements, blockMinFee, blockMaxFee, blockTotalFees, numTxsProcessed *big.Int) (min, max, avg, num *big.Int) {

// Implement peak/envelope filter
if numTxsProcessed.Cmp(common.Big0) == 0 {
// Block values will be nil, so don't compare or update.
return rollingMinFee, rollingMaxFee, rollingAvgFee, rollingNumElements
}
if rollingMinFee == nil || blockMinFee.Cmp(rollingMinFee) < 0 {
// If the new minimum is less than the old minimum, overwrite it.
rollingMinFee = new(big.Int).Set(blockMinFee)
} else {
// If not, increase the old minimum by 1%.
rollingMinFee.Mul(rollingMinFee, common.Big101)
rollingMinFee.Div(rollingMinFee, common.Big100)
}

if rollingMaxFee == nil || blockMaxFee.Cmp(rollingMaxFee) > 0 {
rollingMaxFee = new(big.Int).Set(blockMaxFee)
} else {
// Decay the max fee by 1%.
rollingMaxFee.Mul(rollingMaxFee, common.Big99)
rollingMaxFee.Div(rollingMaxFee, common.Big100)
}

// Implement running average
if rollingAvgFee == nil {
rollingAvgFee = big.NewInt(1)
rollingNumElements = big.NewInt(0)
}

if numTxsProcessed.Cmp(common.Big0) > 0 {
blockAvgFee := blockTotalFees.Div(blockTotalFees, numTxsProcessed)
intermediateVal := new(big.Int).Mul(rollingNumElements, rollingAvgFee)
intermediateVal = intermediateVal.Add(intermediateVal, blockAvgFee)

rollingNumElements.Add(rollingNumElements, common.Big1)
rollingAvgFee = intermediateVal.Div(intermediateVal, rollingNumElements)
}

return rollingMinFee, rollingMaxFee, rollingAvgFee, rollingNumElements
}

func (p *StateProcessor) GetRollingFeeInfo() (min, max, avg *big.Int) {
return p.minFee, p.maxFee, p.avgFee
}

func (p *StateProcessor) Stop() {
// Ensure all live cached entries be saved into disk, so that we can skip
// cache warmup when node restarts.
Expand Down
88 changes: 88 additions & 0 deletions core/state_processor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package core

import (
"math/big"
"math/rand"
"testing"

"github.com/stretchr/testify/require"
)

func generateRandomBlockFees(min, max, numBlocks, maxNumTxs int) (simpleBlockFees [][]int, bigBlockFees [][]*big.Int) {
// Generate the number of txs in this block first.
simpleBlockFees = make([][]int, numBlocks)
bigBlockFees = make([][]*big.Int, numBlocks)
for blockNum := 0; blockNum < numBlocks; blockNum++ {
txLen := rand.Intn(maxNumTxs)

simpleBlockFees[blockNum] = make([]int, txLen)
bigBlockFees[blockNum] = make([]*big.Int, txLen)

for i := 0; i < txLen; i++ {
randNum := rand.Intn(max-min+1) + min
simpleBlockFees[blockNum][i] = randNum
bigBlockFees[blockNum][i] = big.NewInt(int64(randNum))
}
}

return simpleBlockFees, bigBlockFees
}

func TestMultiBlockTxStats(t *testing.T) {
simpleBlockFees, bigBlockFees := generateRandomBlockFees(0, 10000, 10000, 1000)

var expectedRollingMin, expectedRollingMax, expectedRollingAvg int
for blockNum, blockFees := range simpleBlockFees {
var blockMin, blockMax, sum int
for txCount, fee := range blockFees {
if txCount == 0 {
blockMin = fee
blockMax = fee
sum = 0
} else {
if fee < blockMin {
blockMin = fee
}

if fee > blockMax {
blockMax = fee
}
}
sum += fee
}

if len(blockFees) > 0 {
if blockMin < expectedRollingMin || blockNum == 0 {
expectedRollingMin = blockMin
} else {
expectedRollingMin = expectedRollingMin * 101 / 100
}

if blockMax > expectedRollingMax || blockNum == 0 {
expectedRollingMax = blockMax
} else {
expectedRollingMax = expectedRollingMax * 99 / 100
}
// Calculate a running average
expectedRollingAvg = (blockNum*expectedRollingAvg + (sum / len(blockFees))) / (blockNum + 1)
}
}

var blockMin, blockMax, blockTotal *big.Int
var actualRollingMin, actualRollingMax, rollingAvg, rollingNumElements *big.Int
for _, blockFees := range bigBlockFees {
bigTxsProcessed := big.NewInt(0)
for _, fee := range blockFees {
blockMin, blockMax, blockTotal = calcQiTxStats(blockMin, blockMax, blockTotal, fee, bigTxsProcessed)
}

actualRollingMin, actualRollingMax, rollingAvg, rollingNumElements = calcRollingFeeInfo(actualRollingMin, actualRollingMax, rollingAvg, rollingNumElements, blockMin, blockMax, blockTotal, bigTxsProcessed)
}

if expectedRollingMin != 0 || expectedRollingMax != 0 || expectedRollingAvg != 0 {
// If any one of these values is non-zero, then there were fees, so compare.
require.Equal(t, uint64(expectedRollingMin), actualRollingMin.Uint64(), "Expected min not equal")
require.Equal(t, uint64(expectedRollingMax), actualRollingMax.Uint64(), "Expected max not equal")
require.Equal(t, uint64(expectedRollingAvg), rollingAvg.Uint64(), "Expected average not equal")
}
}
2 changes: 1 addition & 1 deletion core/types/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ const (
c_MaxTxForSorting = 1500
)

// Transaction is a Quai transaction.
// Transaction can be a Quai, Qi, or External transaction.
type Transaction struct {
inner TxData // Consensus contents of a transaction
time time.Time // Time first seen locally (spam avoidance)
Expand Down
9 changes: 9 additions & 0 deletions internal/quaiapi/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,15 @@ func (s *PublicTxPoolAPI) Inspect() map[string]map[string]map[string]string {
return content
}

func (s *PublicTxPoolAPI) GetRollingFeeInfo() (min, max, avg *hexutil.Big) {
bigMin, bigMax, bigAvg := s.b.GetRollingFeeInfo()
min = (*hexutil.Big)(bigMin)
max = (*hexutil.Big)(bigMax)
avg = (*hexutil.Big)(bigAvg)

return min, max, avg
}

// PublicBlockChainAPI provides an API to access the Quai blockchain.
// It offers only methods that operate on public data that is freely available to anyone.
type PublicBlockChainAPI struct {
Expand Down
1 change: 1 addition & 0 deletions internal/quaiapi/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ type Backend interface {
GetMinGasPrice() *big.Int
GetPoolGasPrice() *big.Int
SendTxToSharingClients(tx *types.Transaction)
GetRollingFeeInfo() (min, max, avg *big.Int)

// Filter API
BloomStatus() (uint64, uint64)
Expand Down
4 changes: 4 additions & 0 deletions quai/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,10 @@ func (b *QuaiAPIBackend) SendTxToSharingClients(tx *types.Transaction) {
b.quai.core.SendTxToSharingClients(tx)
}

func (b *QuaiAPIBackend) GetRollingFeeInfo() (min, max, avg *big.Int) {
return b.quai.core.GetRollingFeeInfo()
}

func (b *QuaiAPIBackend) GetMinGasPrice() *big.Int {
return b.quai.core.GetMinGasPrice()
}
Expand Down

0 comments on commit 001d051

Please sign in to comment.