Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

separate limit for pending tx #202

Merged
merged 1 commit into from
Feb 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,16 @@ type MempoolConfig struct {
// blacklist the peer.
CheckTxErrorBlacklistEnabled bool `mapstructure:"check-tx-error-blacklist-enabled"`
CheckTxErrorThreshold int `mapstructure:"check-tx-error-threshold"`

// Maximum number of transactions in the pending set
PendingSize int `mapstructure:"pending-size"`

// Limit the total size of all txs in the pending set.
MaxPendingTxsBytes int64 `mapstructure:"max-pending-txs-bytes"`

PendingTTLDuration time.Duration `mapstructure:"pending-ttl-duration"`

PendingTTLNumBlocks int64 `mapstructure:"pending-ttl-num-blocks"`
}

// DefaultMempoolConfig returns a default configuration for the Tendermint mempool.
Expand All @@ -816,6 +826,10 @@ func DefaultMempoolConfig() *MempoolConfig {
TxNotifyThreshold: 0,
CheckTxErrorBlacklistEnabled: false,
CheckTxErrorThreshold: 0,
PendingSize: 5000,
MaxPendingTxsBytes: 1024 * 1024 * 1024, // 1GB
PendingTTLDuration: 0 * time.Second,
PendingTTLNumBlocks: 0,
}
}

Expand Down
8 changes: 8 additions & 0 deletions config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,14 @@ check-tx-error-blacklist-enabled = {{ .Mempool.CheckTxErrorBlacklistEnabled }}

check-tx-error-threshold = {{ .Mempool.CheckTxErrorThreshold }}

pending-size = {{ .Mempool.PendingSize }}

max-pending-txs-bytes = {{ .Mempool.MaxPendingTxsBytes }}

pending-ttl-duration = {{ .Mempool.PendingTTLDuration }}

pending-ttl-num-blocks = {{ .Mempool.PendingTTLNumBlocks }}

#######################################################
### State Sync Configuration Options ###
#######################################################
Expand Down
63 changes: 49 additions & 14 deletions internal/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@
// sizeBytes defines the total size of the mempool (sum of all tx bytes)
sizeBytes int64

// pendingSizeBytes defines the total size of the pending set (sum of all tx bytes)
pendingSizeBytes int64

// cache defines a fixed-size cache of already seen transactions as this
// reduces pressure on the proxyApp.
cache TxCache
Expand Down Expand Up @@ -177,9 +180,11 @@
// Size returns the number of valid transactions in the mempool. It is
// thread-safe.
func (txmp *TxMempool) Size() int {
txSize := txmp.txStore.Size()
pendingSize := txmp.pendingTxs.Size()
return txSize + pendingSize
return txmp.SizeWithoutPending() + txmp.PendingSize()
}

func (txmp *TxMempool) SizeWithoutPending() int {
return txmp.txStore.Size()
}

// PendingSize returns the number of pending transactions in the mempool.
Expand All @@ -193,6 +198,10 @@
return atomic.LoadInt64(&txmp.sizeBytes)
}

func (txmp *TxMempool) PendingSizeBytes() int64 {
return atomic.LoadInt64(&txmp.pendingSizeBytes)

Check warning on line 202 in internal/mempool/mempool.go

View check run for this annotation

Codecov / codecov/patch

internal/mempool/mempool.go#L201-L202

Added lines #L201 - L202 were not covered by tests
}

// FlushAppConn executes FlushSync on the mempool's proxyAppConn.
//
// NOTE: The caller must obtain a write-lock prior to execution.
Expand Down Expand Up @@ -326,6 +335,11 @@
if res.Checker == nil {
return errors.New("no checker available for pending transaction")
}
if err := txmp.canAddPendingTx(wtx); err != nil {
// TODO: eviction strategy for pending transactions
return err
}
atomic.AddInt64(&txmp.pendingSizeBytes, int64(wtx.Size()))

Check warning on line 342 in internal/mempool/mempool.go

View check run for this annotation

Codecov / codecov/patch

internal/mempool/mempool.go#L338-L342

Added lines #L338 - L342 were not covered by tests
txmp.pendingTxs.Insert(wtx, res, txInfo)
}
}
Expand Down Expand Up @@ -410,7 +424,7 @@
)

var txs []types.Tx
if uint64(txmp.Size()) < txmp.config.TxNotifyThreshold {
if uint64(txmp.SizeWithoutPending()) < txmp.config.TxNotifyThreshold {
// do not reap anything if threshold is not met
return txs
}
Expand Down Expand Up @@ -522,7 +536,7 @@
}
}

txmp.metrics.Size.Set(float64(txmp.Size()))
txmp.metrics.Size.Set(float64(txmp.SizeWithoutPending()))
txmp.metrics.PendingSize.Set(float64(txmp.PendingSize()))
return nil
}
Expand Down Expand Up @@ -640,7 +654,7 @@
}

txmp.metrics.TxSizeBytes.Observe(float64(wtx.Size()))
txmp.metrics.Size.Set(float64(txmp.Size()))
txmp.metrics.Size.Set(float64(txmp.SizeWithoutPending()))
txmp.metrics.PendingSize.Set(float64(txmp.PendingSize()))

if txmp.insertTx(wtx) {
Expand All @@ -649,7 +663,7 @@
"priority", wtx.priority,
"tx", fmt.Sprintf("%X", wtx.tx.Hash()),
"height", txmp.height,
"num_txs", txmp.Size(),
"num_txs", txmp.SizeWithoutPending(),
)
txmp.notifyTxsAvailable()
}
Expand Down Expand Up @@ -745,12 +759,12 @@
if txmp.recheckCursor == nil {
txmp.logger.Debug("finished rechecking transactions")

if txmp.Size() > 0 {
if txmp.SizeWithoutPending() > 0 {
txmp.notifyTxsAvailable()
}
}

txmp.metrics.Size.Set(float64(txmp.Size()))
txmp.metrics.Size.Set(float64(txmp.SizeWithoutPending()))
txmp.metrics.PendingSize.Set(float64(txmp.PendingSize()))
}

Expand Down Expand Up @@ -803,7 +817,7 @@
// the transaction can be inserted into the mempool.
func (txmp *TxMempool) canAddTx(wtx *WrappedTx) error {
var (
numTxs = txmp.Size()
numTxs = txmp.SizeWithoutPending()
sizeBytes = txmp.SizeBytes()
)

Expand All @@ -819,6 +833,24 @@
return nil
}

func (txmp *TxMempool) canAddPendingTx(wtx *WrappedTx) error {
var (
numTxs = txmp.PendingSize()
sizeBytes = txmp.PendingSizeBytes()
)

if numTxs >= txmp.config.PendingSize || int64(wtx.Size())+sizeBytes > txmp.config.MaxPendingTxsBytes {
return types.ErrMempoolPendingIsFull{
NumTxs: numTxs,
MaxTxs: txmp.config.PendingSize,
TxsBytes: sizeBytes,
MaxTxsBytes: txmp.config.MaxPendingTxsBytes,
}
}

Check warning on line 849 in internal/mempool/mempool.go

View check run for this annotation

Codecov / codecov/patch

internal/mempool/mempool.go#L836-L849

Added lines #L836 - L849 were not covered by tests

return nil

Check warning on line 851 in internal/mempool/mempool.go

View check run for this annotation

Codecov / codecov/patch

internal/mempool/mempool.go#L851

Added line #L851 was not covered by tests
}

func (txmp *TxMempool) insertTx(wtx *WrappedTx) bool {
if txmp.isInMempool(wtx.tx) {
return false
Expand Down Expand Up @@ -935,13 +967,14 @@
}

// remove pending txs that have expired
txmp.pendingTxs.PurgeExpired(txmp.config.TTLNumBlocks, blockHeight, txmp.config.TTLDuration, now, func(wtx *WrappedTx) {
txmp.pendingTxs.PurgeExpired(txmp.config.PendingTTLNumBlocks, blockHeight, txmp.config.PendingTTLDuration, now, func(wtx *WrappedTx) {
atomic.AddInt64(&txmp.pendingSizeBytes, int64(-wtx.Size()))

Check warning on line 971 in internal/mempool/mempool.go

View check run for this annotation

Codecov / codecov/patch

internal/mempool/mempool.go#L971

Added line #L971 was not covered by tests
txmp.expire(blockHeight, wtx)
})
}

func (txmp *TxMempool) notifyTxsAvailable() {
if txmp.Size() == 0 {
if txmp.SizeWithoutPending() == 0 {
return
}

Expand Down Expand Up @@ -980,12 +1013,14 @@
func (txmp *TxMempool) handlePendingTransactions() {
accepted, rejected := txmp.pendingTxs.EvaluatePendingTransactions()
for _, tx := range accepted {
atomic.AddInt64(&txmp.pendingSizeBytes, int64(-tx.tx.Size()))

Check warning on line 1016 in internal/mempool/mempool.go

View check run for this annotation

Codecov / codecov/patch

internal/mempool/mempool.go#L1016

Added line #L1016 was not covered by tests
if err := txmp.addNewTransaction(tx.tx, tx.checkTxResponse.ResponseCheckTx, tx.txInfo); err != nil {
txmp.logger.Error(fmt.Sprintf("error adding pending transaction: %s", err))
}
}
if !txmp.config.KeepInvalidTxsInCache {
for _, tx := range rejected {
for _, tx := range rejected {
atomic.AddInt64(&txmp.pendingSizeBytes, int64(-tx.tx.Size()))
if !txmp.config.KeepInvalidTxsInCache {

Check warning on line 1023 in internal/mempool/mempool.go

View check run for this annotation

Codecov / codecov/patch

internal/mempool/mempool.go#L1022-L1023

Added lines #L1022 - L1023 were not covered by tests
tx.tx.removeHandler(true)
}
}
Expand Down
20 changes: 20 additions & 0 deletions types/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"crypto/sha256"
"errors"
"fmt"

tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
)

Expand Down Expand Up @@ -84,6 +85,25 @@ func (e ErrMempoolIsFull) Error() string {
)
}

// ErrMempoolPendingIsFull defines an error where there are too many pending transactions
// not processed yet
type ErrMempoolPendingIsFull struct {
NumTxs int
MaxTxs int
TxsBytes int64
MaxTxsBytes int64
}

func (e ErrMempoolPendingIsFull) Error() string {
return fmt.Sprintf(
"mempool pending set is full: number of txs %d (max: %d), total txs bytes %d (max: %d)",
e.NumTxs,
e.MaxTxs,
e.TxsBytes,
e.MaxTxsBytes,
)
}

// ErrPreCheck defines an error where a transaction fails a pre-check.
type ErrPreCheck struct {
Reason error
Expand Down
Loading