diff --git a/config/config.go b/config/config.go index d5178869d..2d78698ab 100644 --- a/config/config.go +++ b/config/config.go @@ -799,6 +799,16 @@ type MempoolConfig struct { // blacklist the peer. CheckTxErrorBlacklistEnabled bool `mapstructure:"check-tx-error-blacklist-enabled"` CheckTxErrorThreshold int `mapstructure:"check-tx-error-threshold"` + + // Maximum number of transactions in the pending set + PendingSize int `mapstructure:"pending-size"` + + // Limit the total size of all txs in the pending set. + MaxPendingTxsBytes int64 `mapstructure:"max-pending-txs-bytes"` + + PendingTTLDuration time.Duration `mapstructure:"pending-ttl-duration"` + + PendingTTLNumBlocks int64 `mapstructure:"pending-ttl-num-blocks"` } // DefaultMempoolConfig returns a default configuration for the Tendermint mempool. @@ -816,6 +826,10 @@ func DefaultMempoolConfig() *MempoolConfig { TxNotifyThreshold: 0, CheckTxErrorBlacklistEnabled: false, CheckTxErrorThreshold: 0, + PendingSize: 5000, + MaxPendingTxsBytes: 1024 * 1024 * 1024, // 1GB + PendingTTLDuration: 0 * time.Second, + PendingTTLNumBlocks: 0, } } diff --git a/config/toml.go b/config/toml.go index 4e6c7577d..eee6fe008 100644 --- a/config/toml.go +++ b/config/toml.go @@ -405,6 +405,14 @@ check-tx-error-blacklist-enabled = {{ .Mempool.CheckTxErrorBlacklistEnabled }} check-tx-error-threshold = {{ .Mempool.CheckTxErrorThreshold }} +pending-size = {{ .Mempool.PendingSize }} + +max-pending-txs-bytes = {{ .Mempool.MaxPendingTxsBytes }} + +pending-ttl-duration = {{ .Mempool.PendingTTLDuration }} + +pending-ttl-num-blocks = {{ .Mempool.PendingTTLNumBlocks }} + ####################################################### ### State Sync Configuration Options ### ####################################################### diff --git a/internal/mempool/mempool.go b/internal/mempool/mempool.go index 5e1e558ce..8504c7cd7 100644 --- a/internal/mempool/mempool.go +++ b/internal/mempool/mempool.go @@ -43,6 +43,9 @@ type TxMempool struct { // sizeBytes defines the total size of the mempool (sum of all tx bytes) sizeBytes int64 + // pendingSizeBytes defines the total size of the pending set (sum of all tx bytes) + pendingSizeBytes int64 + // cache defines a fixed-size cache of already seen transactions as this // reduces pressure on the proxyApp. cache TxCache @@ -177,9 +180,11 @@ func (txmp *TxMempool) Unlock() { // Size returns the number of valid transactions in the mempool. It is // thread-safe. func (txmp *TxMempool) Size() int { - txSize := txmp.txStore.Size() - pendingSize := txmp.pendingTxs.Size() - return txSize + pendingSize + return txmp.SizeWithoutPending() + txmp.PendingSize() +} + +func (txmp *TxMempool) SizeWithoutPending() int { + return txmp.txStore.Size() } // PendingSize returns the number of pending transactions in the mempool. @@ -193,6 +198,10 @@ func (txmp *TxMempool) SizeBytes() int64 { return atomic.LoadInt64(&txmp.sizeBytes) } +func (txmp *TxMempool) PendingSizeBytes() int64 { + return atomic.LoadInt64(&txmp.pendingSizeBytes) +} + // FlushAppConn executes FlushSync on the mempool's proxyAppConn. // // NOTE: The caller must obtain a write-lock prior to execution. @@ -326,6 +335,11 @@ func (txmp *TxMempool) CheckTx( if res.Checker == nil { return errors.New("no checker available for pending transaction") } + if err := txmp.canAddPendingTx(wtx); err != nil { + // TODO: eviction strategy for pending transactions + return err + } + atomic.AddInt64(&txmp.pendingSizeBytes, int64(wtx.Size())) txmp.pendingTxs.Insert(wtx, res, txInfo) } } @@ -410,7 +424,7 @@ func (txmp *TxMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs { ) var txs []types.Tx - if uint64(txmp.Size()) < txmp.config.TxNotifyThreshold { + if uint64(txmp.SizeWithoutPending()) < txmp.config.TxNotifyThreshold { // do not reap anything if threshold is not met return txs } @@ -522,7 +536,7 @@ func (txmp *TxMempool) Update( } } - txmp.metrics.Size.Set(float64(txmp.Size())) + txmp.metrics.Size.Set(float64(txmp.SizeWithoutPending())) txmp.metrics.PendingSize.Set(float64(txmp.PendingSize())) return nil } @@ -640,7 +654,7 @@ func (txmp *TxMempool) addNewTransaction(wtx *WrappedTx, res *abci.ResponseCheck } txmp.metrics.TxSizeBytes.Observe(float64(wtx.Size())) - txmp.metrics.Size.Set(float64(txmp.Size())) + txmp.metrics.Size.Set(float64(txmp.SizeWithoutPending())) txmp.metrics.PendingSize.Set(float64(txmp.PendingSize())) if txmp.insertTx(wtx) { @@ -649,7 +663,7 @@ func (txmp *TxMempool) addNewTransaction(wtx *WrappedTx, res *abci.ResponseCheck "priority", wtx.priority, "tx", fmt.Sprintf("%X", wtx.tx.Hash()), "height", txmp.height, - "num_txs", txmp.Size(), + "num_txs", txmp.SizeWithoutPending(), ) txmp.notifyTxsAvailable() } @@ -745,12 +759,12 @@ func (txmp *TxMempool) handleRecheckResult(tx types.Tx, res *abci.ResponseCheckT if txmp.recheckCursor == nil { txmp.logger.Debug("finished rechecking transactions") - if txmp.Size() > 0 { + if txmp.SizeWithoutPending() > 0 { txmp.notifyTxsAvailable() } } - txmp.metrics.Size.Set(float64(txmp.Size())) + txmp.metrics.Size.Set(float64(txmp.SizeWithoutPending())) txmp.metrics.PendingSize.Set(float64(txmp.PendingSize())) } @@ -803,7 +817,7 @@ func (txmp *TxMempool) updateReCheckTxs(ctx context.Context) { // the transaction can be inserted into the mempool. func (txmp *TxMempool) canAddTx(wtx *WrappedTx) error { var ( - numTxs = txmp.Size() + numTxs = txmp.SizeWithoutPending() sizeBytes = txmp.SizeBytes() ) @@ -819,6 +833,24 @@ func (txmp *TxMempool) canAddTx(wtx *WrappedTx) error { return nil } +func (txmp *TxMempool) canAddPendingTx(wtx *WrappedTx) error { + var ( + numTxs = txmp.PendingSize() + sizeBytes = txmp.PendingSizeBytes() + ) + + if numTxs >= txmp.config.PendingSize || int64(wtx.Size())+sizeBytes > txmp.config.MaxPendingTxsBytes { + return types.ErrMempoolPendingIsFull{ + NumTxs: numTxs, + MaxTxs: txmp.config.PendingSize, + TxsBytes: sizeBytes, + MaxTxsBytes: txmp.config.MaxPendingTxsBytes, + } + } + + return nil +} + func (txmp *TxMempool) insertTx(wtx *WrappedTx) bool { if txmp.isInMempool(wtx.tx) { return false @@ -935,13 +967,14 @@ func (txmp *TxMempool) purgeExpiredTxs(blockHeight int64) { } // remove pending txs that have expired - txmp.pendingTxs.PurgeExpired(txmp.config.TTLNumBlocks, blockHeight, txmp.config.TTLDuration, now, func(wtx *WrappedTx) { + txmp.pendingTxs.PurgeExpired(txmp.config.PendingTTLNumBlocks, blockHeight, txmp.config.PendingTTLDuration, now, func(wtx *WrappedTx) { + atomic.AddInt64(&txmp.pendingSizeBytes, int64(-wtx.Size())) txmp.expire(blockHeight, wtx) }) } func (txmp *TxMempool) notifyTxsAvailable() { - if txmp.Size() == 0 { + if txmp.SizeWithoutPending() == 0 { return } @@ -980,12 +1013,14 @@ func (txmp *TxMempool) AppendCheckTxErr(existingLogs string, log string) string func (txmp *TxMempool) handlePendingTransactions() { accepted, rejected := txmp.pendingTxs.EvaluatePendingTransactions() for _, tx := range accepted { + atomic.AddInt64(&txmp.pendingSizeBytes, int64(-tx.tx.Size())) if err := txmp.addNewTransaction(tx.tx, tx.checkTxResponse.ResponseCheckTx, tx.txInfo); err != nil { txmp.logger.Error(fmt.Sprintf("error adding pending transaction: %s", err)) } } - if !txmp.config.KeepInvalidTxsInCache { - for _, tx := range rejected { + for _, tx := range rejected { + atomic.AddInt64(&txmp.pendingSizeBytes, int64(-tx.tx.Size())) + if !txmp.config.KeepInvalidTxsInCache { tx.tx.removeHandler(true) } } diff --git a/types/mempool.go b/types/mempool.go index 36e877508..a3a0c831e 100644 --- a/types/mempool.go +++ b/types/mempool.go @@ -4,6 +4,7 @@ import ( "crypto/sha256" "errors" "fmt" + tmproto "github.com/tendermint/tendermint/proto/tendermint/types" ) @@ -84,6 +85,25 @@ func (e ErrMempoolIsFull) Error() string { ) } +// ErrMempoolPendingIsFull defines an error where there are too many pending transactions +// not processed yet +type ErrMempoolPendingIsFull struct { + NumTxs int + MaxTxs int + TxsBytes int64 + MaxTxsBytes int64 +} + +func (e ErrMempoolPendingIsFull) Error() string { + return fmt.Sprintf( + "mempool pending set is full: number of txs %d (max: %d), total txs bytes %d (max: %d)", + e.NumTxs, + e.MaxTxs, + e.TxsBytes, + e.MaxTxsBytes, + ) +} + // ErrPreCheck defines an error where a transaction fails a pre-check. type ErrPreCheck struct { Reason error