diff --git a/config/config.go b/config/config.go index 360747d503..15dc766220 100644 --- a/config/config.go +++ b/config/config.go @@ -27,9 +27,7 @@ const ( // DefaultLogLevel defines a default log level as INFO. DefaultLogLevel = "info" - // Mempool versions. V1 is prioritized mempool, v0 is regular mempool. - // Default is v0. - MempoolV0 = "v0" + // Mempool versions. v1 (prioritized) is the current default. MempoolV1 = "v1" MempoolV2 = "v2" ) diff --git a/consensus/byzantine_test.go b/consensus/byzantine_test.go index 34bfce05ca..4449174162 100644 --- a/consensus/byzantine_test.go +++ b/consensus/byzantine_test.go @@ -24,7 +24,6 @@ import ( cfg "github.com/cometbft/cometbft/config" mempoolv2 "github.com/cometbft/cometbft/mempool/cat" - mempoolv0 "github.com/cometbft/cometbft/mempool/v0" mempoolv1 "github.com/cometbft/cometbft/mempool/v1" "github.com/cometbft/cometbft/p2p" cmtcons "github.com/cometbft/cometbft/proto/tendermint/consensus" @@ -75,12 +74,6 @@ func TestByzantinePrevoteEquivocation(t *testing.T) { var mempool mempl.Mempool switch thisConfig.Mempool.Version { - case cfg.MempoolV0: - mempool = mempoolv0.NewCListMempool(config.Mempool, - proxyAppConnConMem, - state.LastBlockHeight, - mempoolv0.WithPreCheck(sm.TxPreCheck(state)), - mempoolv0.WithPostCheck(sm.TxPostCheck(state))) case cfg.MempoolV1: mempool = mempoolv1.NewTxMempool(logger, config.Mempool, diff --git a/consensus/common_test.go b/consensus/common_test.go index 6c91378686..a29d937799 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -30,7 +30,6 @@ import ( cmtsync "github.com/cometbft/cometbft/libs/sync" mempl "github.com/cometbft/cometbft/mempool" mempoolv2 "github.com/cometbft/cometbft/mempool/cat" - mempoolv0 "github.com/cometbft/cometbft/mempool/v0" mempoolv1 "github.com/cometbft/cometbft/mempool/v1" "github.com/cometbft/cometbft/p2p" "github.com/cometbft/cometbft/privval" @@ -402,13 +401,6 @@ func newStateWithConfigAndBlockStore( var mempool mempl.Mempool switch config.Mempool.Version { - case cfg.MempoolV0: - mempool = mempoolv0.NewCListMempool(config.Mempool, - proxyAppConnConMem, - state.LastBlockHeight, - mempoolv0.WithMetrics(memplMetrics), - mempoolv0.WithPreCheck(sm.TxPreCheck(state)), - mempoolv0.WithPostCheck(sm.TxPostCheck(state))) case cfg.MempoolV1: logger := consensusLogger() mempool = mempoolv1.NewTxMempool(logger, diff --git a/consensus/reactor_test.go b/consensus/reactor_test.go index c411c68ad0..2a0d57b0a5 100644 --- a/consensus/reactor_test.go +++ b/consensus/reactor_test.go @@ -31,7 +31,6 @@ import ( cmtsync "github.com/cometbft/cometbft/libs/sync" mempl "github.com/cometbft/cometbft/mempool" mempoolv2 "github.com/cometbft/cometbft/mempool/cat" - mempoolv0 "github.com/cometbft/cometbft/mempool/v0" mempoolv1 "github.com/cometbft/cometbft/mempool/v1" "github.com/cometbft/cometbft/p2p" p2pmock "github.com/cometbft/cometbft/p2p/mock" @@ -169,13 +168,6 @@ func TestReactorWithEvidence(t *testing.T) { var mempool mempl.Mempool switch config.Mempool.Version { - case cfg.MempoolV0: - mempool = mempoolv0.NewCListMempool(config.Mempool, - proxyAppConnConMem, - state.LastBlockHeight, - mempoolv0.WithMetrics(memplMetrics), - mempoolv0.WithPreCheck(sm.TxPreCheck(state)), - mempoolv0.WithPostCheck(sm.TxPostCheck(state))) case cfg.MempoolV1: mempool = mempoolv1.NewTxMempool(logger, config.Mempool, diff --git a/mempool/v0/bench_test.go b/mempool/v0/bench_test.go deleted file mode 100644 index 145836269a..0000000000 --- a/mempool/v0/bench_test.go +++ /dev/null @@ -1,101 +0,0 @@ -package v0 - -import ( - "encoding/binary" - "sync/atomic" - "testing" - - "github.com/cometbft/cometbft/abci/example/kvstore" - "github.com/cometbft/cometbft/mempool" - "github.com/cometbft/cometbft/proxy" -) - -func BenchmarkReap(b *testing.B) { - app := kvstore.NewApplication() - cc := proxy.NewLocalClientCreator(app) - mp, cleanup := newMempoolWithApp(cc) - defer cleanup() - - mp.config.Size = 100000 - - size := 10000 - for i := 0; i < size; i++ { - tx := make([]byte, 8) - binary.BigEndian.PutUint64(tx, uint64(i)) - if err := mp.CheckTx(tx, nil, mempool.TxInfo{}); err != nil { - b.Fatal(err) - } - } - b.ResetTimer() - for i := 0; i < b.N; i++ { - mp.ReapMaxBytesMaxGas(100000000, 10000000) - } -} - -func BenchmarkCheckTx(b *testing.B) { - app := kvstore.NewApplication() - cc := proxy.NewLocalClientCreator(app) - mp, cleanup := newMempoolWithApp(cc) - defer cleanup() - - mp.config.Size = 1000000 - - b.ResetTimer() - - for i := 0; i < b.N; i++ { - b.StopTimer() - tx := make([]byte, 8) - binary.BigEndian.PutUint64(tx, uint64(i)) - b.StartTimer() - - if err := mp.CheckTx(tx, nil, mempool.TxInfo{}); err != nil { - b.Fatal(err) - } - } -} - -func BenchmarkParallelCheckTx(b *testing.B) { - app := kvstore.NewApplication() - cc := proxy.NewLocalClientCreator(app) - mp, cleanup := newMempoolWithApp(cc) - defer cleanup() - - mp.config.Size = 100000000 - - var txcnt uint64 - next := func() uint64 { - return atomic.AddUint64(&txcnt, 1) - 1 - } - - b.ResetTimer() - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - tx := make([]byte, 8) - binary.BigEndian.PutUint64(tx, next()) - if err := mp.CheckTx(tx, nil, mempool.TxInfo{}); err != nil { - b.Fatal(err) - } - } - }) -} - -func BenchmarkCheckDuplicateTx(b *testing.B) { - app := kvstore.NewApplication() - cc := proxy.NewLocalClientCreator(app) - mp, cleanup := newMempoolWithApp(cc) - defer cleanup() - - mp.config.Size = 1000000 - - for i := 0; i < b.N; i++ { - tx := make([]byte, 8) - binary.BigEndian.PutUint64(tx, uint64(i)) - if err := mp.CheckTx(tx, nil, mempool.TxInfo{}); err != nil { - b.Fatal(err) - } - - if err := mp.CheckTx(tx, nil, mempool.TxInfo{}); err == nil { - b.Fatal("tx should be duplicate") - } - } -} diff --git a/mempool/v0/cache_test.go b/mempool/v0/cache_test.go deleted file mode 100644 index c25981183d..0000000000 --- a/mempool/v0/cache_test.go +++ /dev/null @@ -1,81 +0,0 @@ -package v0 - -import ( - "crypto/sha256" - "testing" - - "github.com/stretchr/testify/require" - - "github.com/cometbft/cometbft/abci/example/kvstore" - abci "github.com/cometbft/cometbft/abci/types" - "github.com/cometbft/cometbft/mempool" - "github.com/cometbft/cometbft/proxy" - "github.com/cometbft/cometbft/types" -) - -func TestCacheAfterUpdate(t *testing.T) { - app := kvstore.NewApplication() - cc := proxy.NewLocalClientCreator(app) - mp, cleanup := newMempoolWithApp(cc) - defer cleanup() - - // reAddIndices & txsInCache can have elements > numTxsToCreate - // also assumes max index is 255 for convenience - // txs in cache also checks order of elements - tests := []struct { - numTxsToCreate int - updateIndices []int - reAddIndices []int - txsInCache []int - }{ - {1, []int{}, []int{1}, []int{1, 0}}, // adding new txs works - {2, []int{1}, []int{}, []int{1, 0}}, // update doesn't remove tx from cache - {2, []int{2}, []int{}, []int{2, 1, 0}}, // update adds new tx to cache - {2, []int{1}, []int{1}, []int{1, 0}}, // re-adding after update doesn't make dupe - } - for tcIndex, tc := range tests { - for i := 0; i < tc.numTxsToCreate; i++ { - tx := types.Tx{byte(i)} - err := mp.CheckTx(tx, nil, mempool.TxInfo{}) - require.NoError(t, err) - } - - updateTxs := []types.Tx{} - for _, v := range tc.updateIndices { - tx := types.Tx{byte(v)} - updateTxs = append(updateTxs, tx) - } - err := mp.Update(int64(tcIndex), updateTxs, abciResponses(len(updateTxs), abci.CodeTypeOK), nil, nil) - require.NoError(t, err) - - for _, v := range tc.reAddIndices { - tx := types.Tx{byte(v)} - _ = mp.CheckTx(tx, nil, mempool.TxInfo{}) - } - - cache := mp.cache.(*mempool.LRUTxCache) - node := cache.GetList().Front() - counter := 0 - for node != nil { - require.NotEqual(t, len(tc.txsInCache), counter, - "cache larger than expected on testcase %d", tcIndex) - - nodeVal := node.Value.(types.TxKey) - expectedBz := sha256.Sum256([]byte{byte(tc.txsInCache[len(tc.txsInCache)-counter-1])}) - // Reference for reading the errors: - // >>> sha256('\x00').hexdigest() - // '6e340b9cffb37a989ca544e6bb780a2c78901d3fb33738768511a30617afa01d' - // >>> sha256('\x01').hexdigest() - // '4bf5122f344554c53bde2ebb8cd2b7e3d1600ad631c385a5d7cce23c7785459a' - // >>> sha256('\x02').hexdigest() - // 'dbc1b4c900ffe48d575b5da5c638040125f65db0fe3e24494b76ea986457d986' - - require.EqualValues(t, expectedBz, nodeVal, "Equality failed on index %d, tc %d", counter, tcIndex) - counter++ - node = node.Next() - } - require.Equal(t, len(tc.txsInCache), counter, - "cache smaller than expected on testcase %d", tcIndex) - mp.Flush() - } -} diff --git a/mempool/v0/clist_mempool.go b/mempool/v0/clist_mempool.go deleted file mode 100644 index e81083cac8..0000000000 --- a/mempool/v0/clist_mempool.go +++ /dev/null @@ -1,686 +0,0 @@ -package v0 - -import ( - "bytes" - "errors" - "sync" - "sync/atomic" - - abci "github.com/cometbft/cometbft/abci/types" - "github.com/cometbft/cometbft/config" - "github.com/cometbft/cometbft/libs/clist" - "github.com/cometbft/cometbft/libs/log" - cmtmath "github.com/cometbft/cometbft/libs/math" - cmtsync "github.com/cometbft/cometbft/libs/sync" - "github.com/cometbft/cometbft/mempool" - "github.com/cometbft/cometbft/p2p" - "github.com/cometbft/cometbft/proxy" - "github.com/cometbft/cometbft/types" -) - -// CListMempool is an ordered in-memory pool for transactions before they are -// proposed in a consensus round. Transaction validity is checked using the -// CheckTx abci message before the transaction is added to the pool. The -// mempool uses a concurrent list structure for storing transactions that can -// be efficiently accessed by multiple concurrent readers. -type CListMempool struct { - // Atomic integers - height int64 // the last block Update()'d to - txsBytes int64 // total size of mempool, in bytes - - // notify listeners (ie. consensus) when txs are available - notifiedTxsAvailable bool - txsAvailable chan struct{} // fires once for each height, when the mempool is not empty - - config *config.MempoolConfig - - // Exclusive mutex for Update method to prevent concurrent execution of - // CheckTx or ReapMaxBytesMaxGas(ReapMaxTxs) methods. - updateMtx cmtsync.RWMutex - preCheck mempool.PreCheckFunc - postCheck mempool.PostCheckFunc - - txs *clist.CList // concurrent linked-list of good txs - proxyAppConn proxy.AppConnMempool - - // Track whether we're rechecking txs. - // These are not protected by a mutex and are expected to be mutated in - // serial (ie. by abci responses which are called in serial). - recheckCursor *clist.CElement // next expected response - recheckEnd *clist.CElement // re-checking stops here - - // Map for quick access to txs to record sender in CheckTx. - // txsMap: txKey -> CElement - txsMap sync.Map - - // Keep a cache of already-seen txs. - // This reduces the pressure on the proxyApp. - cache mempool.TxCache - - logger log.Logger - metrics *mempool.Metrics -} - -var _ mempool.Mempool = &CListMempool{} - -// CListMempoolOption sets an optional parameter on the mempool. -type CListMempoolOption func(*CListMempool) - -// NewCListMempool returns a new mempool with the given configuration and -// connection to an application. -func NewCListMempool( - cfg *config.MempoolConfig, - proxyAppConn proxy.AppConnMempool, - height int64, - options ...CListMempoolOption, -) *CListMempool { - - mp := &CListMempool{ - config: cfg, - proxyAppConn: proxyAppConn, - txs: clist.New(), - height: height, - recheckCursor: nil, - recheckEnd: nil, - logger: log.NewNopLogger(), - metrics: mempool.NopMetrics(), - } - - if cfg.CacheSize > 0 { - mp.cache = mempool.NewLRUTxCache(cfg.CacheSize) - } else { - mp.cache = mempool.NopTxCache{} - } - - proxyAppConn.SetResponseCallback(mp.globalCb) - - for _, option := range options { - option(mp) - } - - return mp -} - -// NOTE: not thread safe - should only be called once, on startup -func (mem *CListMempool) EnableTxsAvailable() { - mem.txsAvailable = make(chan struct{}, 1) -} - -// SetLogger sets the Logger. -func (mem *CListMempool) SetLogger(l log.Logger) { - mem.logger = l -} - -// WithPreCheck sets a filter for the mempool to reject a tx if f(tx) returns -// false. This is ran before CheckTx. Only applies to the first created block. -// After that, Update overwrites the existing value. -func WithPreCheck(f mempool.PreCheckFunc) CListMempoolOption { - return func(mem *CListMempool) { mem.preCheck = f } -} - -// WithPostCheck sets a filter for the mempool to reject a tx if f(tx) returns -// false. This is ran after CheckTx. Only applies to the first created block. -// After that, Update overwrites the existing value. -func WithPostCheck(f mempool.PostCheckFunc) CListMempoolOption { - return func(mem *CListMempool) { mem.postCheck = f } -} - -// WithMetrics sets the metrics. -func WithMetrics(metrics *mempool.Metrics) CListMempoolOption { - return func(mem *CListMempool) { mem.metrics = metrics } -} - -// Safe for concurrent use by multiple goroutines. -func (mem *CListMempool) Lock() { - mem.updateMtx.Lock() -} - -// Safe for concurrent use by multiple goroutines. -func (mem *CListMempool) Unlock() { - mem.updateMtx.Unlock() -} - -// Safe for concurrent use by multiple goroutines. -func (mem *CListMempool) Size() int { - return mem.txs.Len() -} - -// Safe for concurrent use by multiple goroutines. -func (mem *CListMempool) SizeBytes() int64 { - return atomic.LoadInt64(&mem.txsBytes) -} - -// Lock() must be help by the caller during execution. -func (mem *CListMempool) FlushAppConn() error { - return mem.proxyAppConn.FlushSync() -} - -// XXX: Unsafe! Calling Flush may leave mempool in inconsistent state. -func (mem *CListMempool) Flush() { - mem.updateMtx.RLock() - defer mem.updateMtx.RUnlock() - - _ = atomic.SwapInt64(&mem.txsBytes, 0) - mem.cache.Reset() - - for e := mem.txs.Front(); e != nil; e = e.Next() { - mem.txs.Remove(e) - e.DetachPrev() - } - - mem.txsMap.Range(func(key, _ interface{}) bool { - mem.txsMap.Delete(key) - return true - }) -} - -// TxsFront returns the first transaction in the ordered list for peer -// goroutines to call .NextWait() on. -// FIXME: leaking implementation details! -// -// Safe for concurrent use by multiple goroutines. -func (mem *CListMempool) TxsFront() *clist.CElement { - return mem.txs.Front() -} - -// TxsWaitChan returns a channel to wait on transactions. It will be closed -// once the mempool is not empty (ie. the internal `mem.txs` has at least one -// element) -// -// Safe for concurrent use by multiple goroutines. -func (mem *CListMempool) TxsWaitChan() <-chan struct{} { - return mem.txs.WaitChan() -} - -// It blocks if we're waiting on Update() or Reap(). -// cb: A callback from the CheckTx command. -// -// It gets called from another goroutine. -// -// CONTRACT: Either cb will get called, or err returned. -// -// Safe for concurrent use by multiple goroutines. -func (mem *CListMempool) CheckTx( - tx types.Tx, - cb func(*abci.Response), - txInfo mempool.TxInfo, -) error { - - mem.updateMtx.RLock() - // use defer to unlock mutex because application (*local client*) might panic - defer mem.updateMtx.RUnlock() - - txSize := len(tx) - - if err := mem.isFull(txSize); err != nil { - return err - } - - if txSize > mem.config.MaxTxBytes { - return mempool.ErrTxTooLarge{ - Max: mem.config.MaxTxBytes, - Actual: txSize, - } - } - - if mem.preCheck != nil { - if err := mem.preCheck(tx); err != nil { - return mempool.ErrPreCheck{ - Reason: err, - } - } - } - - // NOTE: proxyAppConn may error if tx buffer is full - if err := mem.proxyAppConn.Error(); err != nil { - return err - } - - if !mem.cache.Push(tx) { // if the transaction already exists in the cache - // Record a new sender for a tx we've already seen. - // Note it's possible a tx is still in the cache but no longer in the mempool - // (eg. after committing a block, txs are removed from mempool but not cache), - // so we only record the sender for txs still in the mempool. - if e, ok := mem.txsMap.Load(tx.Key()); ok { - mem.metrics.AlreadySeenTxs.Add(1) - memTx := e.(*clist.CElement).Value.(*mempoolTx) - memTx.senders.LoadOrStore(txInfo.SenderID, true) - // TODO: consider punishing peer for dups, - // its non-trivial since invalid txs can become valid, - // but they can spam the same tx with little cost to them atm. - } - return mempool.ErrTxInCache - } - - reqRes := mem.proxyAppConn.CheckTxAsync(abci.RequestCheckTx{Tx: tx}) - reqRes.SetCallback(mem.reqResCb(tx, txInfo.SenderID, txInfo.SenderP2PID, cb)) - - return nil -} - -// Global callback that will be called after every ABCI response. -// Having a single global callback avoids needing to set a callback for each request. -// However, processing the checkTx response requires the peerID (so we can track which txs we heard from who), -// and peerID is not included in the ABCI request, so we have to set request-specific callbacks that -// include this information. If we're not in the midst of a recheck, this function will just return, -// so the request specific callback can do the work. -// -// When rechecking, we don't need the peerID, so the recheck callback happens -// here. -func (mem *CListMempool) globalCb(req *abci.Request, res *abci.Response) { - if mem.recheckCursor == nil { - return - } - - mem.metrics.RecheckTimes.Add(1) - mem.resCbRecheck(req, res) - - // update metrics - mem.metrics.Size.Set(float64(mem.Size())) - mem.metrics.SizeBytes.Set(float64(mem.SizeBytes())) -} - -// Request specific callback that should be set on individual reqRes objects -// to incorporate local information when processing the response. -// This allows us to track the peer that sent us this tx, so we can avoid sending it back to them. -// NOTE: alternatively, we could include this information in the ABCI request itself. -// -// External callers of CheckTx, like the RPC, can also pass an externalCb through here that is called -// when all other response processing is complete. -// -// Used in CheckTx to record PeerID who sent us the tx. -func (mem *CListMempool) reqResCb( - tx []byte, - peerID uint16, - peerP2PID p2p.ID, - externalCb func(*abci.Response), -) func(res *abci.Response) { - return func(res *abci.Response) { - if mem.recheckCursor != nil { - // this should never happen - panic("recheck cursor is not nil in reqResCb") - } - - mem.resCbFirstTime(tx, peerID, peerP2PID, res) - - // update metrics - mem.metrics.Size.Set(float64(mem.Size())) - mem.metrics.SizeBytes.Set(float64(mem.SizeBytes())) - - // passed in by the caller of CheckTx, eg. the RPC - if externalCb != nil { - externalCb(res) - } - } -} - -// Called from: -// - resCbFirstTime (lock not held) if tx is valid -func (mem *CListMempool) addTx(memTx *mempoolTx) { - e := mem.txs.PushBack(memTx) - mem.txsMap.Store(memTx.tx.Key(), e) - atomic.AddInt64(&mem.txsBytes, int64(len(memTx.tx))) - mem.metrics.TxSizeBytes.Observe(float64(len(memTx.tx))) -} - -// Called from: -// - Update (lock held) if tx was committed -// - resCbRecheck (lock not held) if tx was invalidated -func (mem *CListMempool) removeTx(tx types.Tx, elem *clist.CElement, removeFromCache bool) { - mem.txs.Remove(elem) - elem.DetachPrev() - mem.txsMap.Delete(tx.Key()) - if memtx, ok := elem.Value.(*mempoolTx); ok { - tx = memtx.tx - } - atomic.AddInt64(&mem.txsBytes, int64(-len(tx))) - - if removeFromCache { - mem.cache.Remove(tx) - } -} - -// RemoveTxByKey removes a transaction from the mempool by its TxKey index. -func (mem *CListMempool) RemoveTxByKey(txKey types.TxKey) error { - if e, ok := mem.txsMap.Load(txKey); ok { - memTx := e.(*clist.CElement).Value.(*mempoolTx) - if memTx != nil { - mem.removeTx(memTx.tx, e.(*clist.CElement), false) - return nil - } - return errors.New("transaction not found") - } - return errors.New("invalid transaction found") -} - -func (mem *CListMempool) isFull(txSize int) error { - var ( - memSize = mem.Size() - txsBytes = mem.SizeBytes() - ) - - if memSize >= mem.config.Size || int64(txSize)+txsBytes > mem.config.MaxTxsBytes { - return mempool.ErrMempoolIsFull{ - NumTxs: memSize, - MaxTxs: mem.config.Size, - TxsBytes: txsBytes, - MaxTxsBytes: mem.config.MaxTxsBytes, - } - } - - return nil -} - -// callback, which is called after the app checked the tx for the first time. -// -// The case where the app checks the tx for the second and subsequent times is -// handled by the resCbRecheck callback. -func (mem *CListMempool) resCbFirstTime( - tx []byte, - peerID uint16, - peerP2PID p2p.ID, - res *abci.Response, -) { - switch r := res.Value.(type) { - case *abci.Response_CheckTx: - var postCheckErr error - if mem.postCheck != nil { - postCheckErr = mem.postCheck(tx, r.CheckTx) - } - if (r.CheckTx.Code == abci.CodeTypeOK) && postCheckErr == nil { - // Check mempool isn't full again to reduce the chance of exceeding the - // limits. - if err := mem.isFull(len(tx)); err != nil { - // remove from cache (mempool might have a space later) - mem.cache.Remove(tx) - mem.logger.Error(err.Error()) - return - } - - memTx := &mempoolTx{ - height: mem.height, - gasWanted: r.CheckTx.GasWanted, - tx: tx, - } - memTx.senders.Store(peerID, true) - mem.addTx(memTx) - mem.logger.Debug( - "added good transaction", - "tx", types.Tx(tx).Hash(), - "res", r, - "height", memTx.height, - "total", mem.Size(), - ) - mem.notifyTxsAvailable() - } else { - // ignore bad transaction - mem.logger.Debug( - "rejected bad transaction", - "tx", types.Tx(tx).Hash(), - "peerID", peerP2PID, - "res", r, - "err", postCheckErr, - ) - mem.metrics.FailedTxs.Add(1) - - if !mem.config.KeepInvalidTxsInCache { - // remove from cache (it might be good later) - mem.cache.Remove(tx) - } - } - - default: - // ignore other messages - } -} - -// callback, which is called after the app rechecked the tx. -// -// The case where the app checks the tx for the first time is handled by the -// resCbFirstTime callback. -func (mem *CListMempool) resCbRecheck(req *abci.Request, res *abci.Response) { - switch r := res.Value.(type) { - case *abci.Response_CheckTx: - tx := req.GetCheckTx().Tx - memTx := mem.recheckCursor.Value.(*mempoolTx) - - // Search through the remaining list of tx to recheck for a transaction that matches - // the one we received from the ABCI application. - for { - if bytes.Equal(tx, memTx.tx) { - // We've found a tx in the recheck list that matches the tx that we - // received from the ABCI application. - // Break, and use this transaction for further checks. - break - } - - mem.logger.Error( - "re-CheckTx transaction mismatch", - "got", types.Tx(tx), - "expected", memTx.tx, - ) - - if mem.recheckCursor == mem.recheckEnd { - // we reached the end of the recheckTx list without finding a tx - // matching the one we received from the ABCI application. - // Return without processing any tx. - mem.recheckCursor = nil - return - } - - mem.recheckCursor = mem.recheckCursor.Next() - memTx = mem.recheckCursor.Value.(*mempoolTx) - } - - var postCheckErr error - if mem.postCheck != nil { - postCheckErr = mem.postCheck(tx, r.CheckTx) - } - - if (r.CheckTx.Code == abci.CodeTypeOK) && postCheckErr == nil { - // Good, nothing to do. - } else { - // Tx became invalidated due to newly committed block. - mem.logger.Debug("tx is no longer valid", "tx", types.Tx(tx).Hash(), "res", r, "err", postCheckErr) - // NOTE: we remove tx from the cache because it might be good later - mem.removeTx(tx, mem.recheckCursor, !mem.config.KeepInvalidTxsInCache) - } - if mem.recheckCursor == mem.recheckEnd { - mem.recheckCursor = nil - } else { - mem.recheckCursor = mem.recheckCursor.Next() - } - if mem.recheckCursor == nil { - // Done! - mem.logger.Debug("done rechecking txs") - - // in case the recheck removed all txs - if mem.Size() > 0 { - mem.notifyTxsAvailable() - } - } - default: - // ignore other messages - } -} - -// Safe for concurrent use by multiple goroutines. -func (mem *CListMempool) TxsAvailable() <-chan struct{} { - return mem.txsAvailable -} - -func (mem *CListMempool) notifyTxsAvailable() { - if mem.Size() == 0 { - panic("notified txs available but mempool is empty!") - } - if mem.txsAvailable != nil && !mem.notifiedTxsAvailable { - // channel cap is 1, so this will send once - mem.notifiedTxsAvailable = true - select { - case mem.txsAvailable <- struct{}{}: - default: - } - } -} - -// Safe for concurrent use by multiple goroutines. -func (mem *CListMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs { - mem.updateMtx.RLock() - defer mem.updateMtx.RUnlock() - - var ( - totalGas int64 - runningSize int64 - ) - - // TODO: we will get a performance boost if we have a good estimate of avg - // size per tx, and set the initial capacity based off of that. - // txs := make([]types.Tx, 0, cmtmath.MinInt(mem.txs.Len(), max/mem.avgTxSize)) - txs := make([]types.Tx, 0, mem.txs.Len()) - for e := mem.txs.Front(); e != nil; e = e.Next() { - memTx := e.Value.(*mempoolTx) - - txs = append(txs, memTx.tx) - - dataSize := types.ComputeProtoSizeForTxs([]types.Tx{memTx.tx}) - - // Check total size requirement - if maxBytes > -1 && runningSize+dataSize > maxBytes { - return txs[:len(txs)-1] - } - - runningSize += dataSize - - // Check total gas requirement. - // If maxGas is negative, skip this check. - // Since newTotalGas < masGas, which - // must be non-negative, it follows that this won't overflow. - newTotalGas := totalGas + memTx.gasWanted - if maxGas > -1 && newTotalGas > maxGas { - return txs[:len(txs)-1] - } - totalGas = newTotalGas - } - return txs -} - -// Safe for concurrent use by multiple goroutines. -func (mem *CListMempool) ReapMaxTxs(max int) types.Txs { - mem.updateMtx.RLock() - defer mem.updateMtx.RUnlock() - - if max < 0 { - max = mem.txs.Len() - } - - txs := make([]types.Tx, 0, cmtmath.MinInt(mem.txs.Len(), max)) - for e := mem.txs.Front(); e != nil && len(txs) <= max; e = e.Next() { - memTx := e.Value.(*mempoolTx) - txs = append(txs, memTx.tx) - } - return txs -} - -// Lock() must be help by the caller during execution. -func (mem *CListMempool) Update( - height int64, - txs types.Txs, - deliverTxResponses []*abci.ResponseDeliverTx, - preCheck mempool.PreCheckFunc, - postCheck mempool.PostCheckFunc, -) error { - // Set height - mem.height = height - mem.notifiedTxsAvailable = false - - if preCheck != nil { - mem.preCheck = preCheck - } - if postCheck != nil { - mem.postCheck = postCheck - } - - mem.metrics.SuccessfulTxs.Add(float64(len(txs))) - for i, tx := range txs { - if deliverTxResponses[i].Code == abci.CodeTypeOK { - // Add valid committed tx to the cache (if missing). - _ = mem.cache.Push(tx) - } else if !mem.config.KeepInvalidTxsInCache { - // Allow invalid transactions to be resubmitted. - mem.cache.Remove(tx) - } - - // Remove committed tx from the mempool. - // - // Note an evil proposer can drop valid txs! - // Mempool before: - // 100 -> 101 -> 102 - // Block, proposed by an evil proposer: - // 101 -> 102 - // Mempool after: - // 100 - // https://github.com/cometbft/cometbft/issues/3322. - if e, ok := mem.txsMap.Load(tx.Key()); ok { - mem.removeTx(tx, e.(*clist.CElement), false) - } - } - - // Either recheck non-committed txs to see if they became invalid - // or just notify there're some txs left. - if mem.Size() > 0 { - if mem.config.Recheck { - mem.logger.Debug("recheck txs", "numtxs", mem.Size(), "height", height) - mem.recheckTxs() - // At this point, mem.txs are being rechecked. - // mem.recheckCursor re-scans mem.txs and possibly removes some txs. - // Before mem.Reap(), we should wait for mem.recheckCursor to be nil. - } else { - mem.notifyTxsAvailable() - } - } - - // Update metrics - mem.metrics.Size.Set(float64(mem.Size())) - mem.metrics.SizeBytes.Set(float64(mem.SizeBytes())) - - return nil -} - -func (mem *CListMempool) recheckTxs() { - if mem.Size() == 0 { - panic("recheckTxs is called, but the mempool is empty") - } - - mem.recheckCursor = mem.txs.Front() - mem.recheckEnd = mem.txs.Back() - - // Push txs to proxyAppConn - // NOTE: globalCb may be called concurrently. - for e := mem.txs.Front(); e != nil; e = e.Next() { - memTx := e.Value.(*mempoolTx) - mem.proxyAppConn.CheckTxAsync(abci.RequestCheckTx{ - Tx: memTx.tx, - Type: abci.CheckTxType_Recheck, - }) - } - - mem.proxyAppConn.FlushAsync() -} - -//-------------------------------------------------------------------------------- - -// mempoolTx is a transaction that successfully ran -type mempoolTx struct { - height int64 // height that this tx had been validated in - gasWanted int64 // amount of gas this tx states it will require - tx types.Tx // - - // ids of peers who've sent us this tx (as a map for quick lookups). - // senders: PeerID -> bool - senders sync.Map -} - -// Height returns the height for this transaction -func (memTx *mempoolTx) Height() int64 { - return atomic.LoadInt64(&memTx.height) -} diff --git a/mempool/v0/clist_mempool_test.go b/mempool/v0/clist_mempool_test.go deleted file mode 100644 index 0c303714c9..0000000000 --- a/mempool/v0/clist_mempool_test.go +++ /dev/null @@ -1,741 +0,0 @@ -package v0 - -import ( - "bytes" - "crypto/rand" - "encoding/binary" - "fmt" - mrand "math/rand" - "os" - "testing" - "time" - - "github.com/gogo/protobuf/proto" - gogotypes "github.com/gogo/protobuf/types" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" - - abciclient "github.com/cometbft/cometbft/abci/client" - abciclimocks "github.com/cometbft/cometbft/abci/client/mocks" - "github.com/cometbft/cometbft/abci/example/kvstore" - abciserver "github.com/cometbft/cometbft/abci/server" - abci "github.com/cometbft/cometbft/abci/types" - "github.com/cometbft/cometbft/config" - "github.com/cometbft/cometbft/libs/log" - cmtrand "github.com/cometbft/cometbft/libs/rand" - "github.com/cometbft/cometbft/libs/service" - "github.com/cometbft/cometbft/mempool" - "github.com/cometbft/cometbft/pkg/consts" - tmproto "github.com/cometbft/cometbft/proto/tendermint/types" - "github.com/cometbft/cometbft/proxy" - "github.com/cometbft/cometbft/types" -) - -// A cleanupFunc cleans up any config / test files created for a particular -// test. -type cleanupFunc func() - -func newMempoolWithAppMock(cc proxy.ClientCreator, client abciclient.Client) (*CListMempool, cleanupFunc, error) { - conf := config.ResetTestRoot("mempool_test") - - mp, cu := newMempoolWithAppAndConfigMock(cc, conf, client) - return mp, cu, nil -} - -func newMempoolWithAppAndConfigMock(cc proxy.ClientCreator, - cfg *config.Config, - client abciclient.Client) (*CListMempool, cleanupFunc) { - appConnMem := client - appConnMem.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "mempool")) - err := appConnMem.Start() - if err != nil { - panic(err) - } - - mp := NewCListMempool(cfg.Mempool, appConnMem, 0) - mp.SetLogger(log.TestingLogger()) - - return mp, func() { os.RemoveAll(cfg.RootDir) } -} - -func newMempoolWithApp(cc proxy.ClientCreator) (*CListMempool, cleanupFunc) { - conf := config.ResetTestRoot("mempool_test") - - mp, cu := newMempoolWithAppAndConfig(cc, conf) - return mp, cu -} - -func newMempoolWithAppAndConfig(cc proxy.ClientCreator, cfg *config.Config) (*CListMempool, cleanupFunc) { - appConnMem, _ := cc.NewABCIClient() - appConnMem.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "mempool")) - err := appConnMem.Start() - if err != nil { - panic(err) - } - - mp := NewCListMempool(cfg.Mempool, appConnMem, 0) - mp.SetLogger(log.TestingLogger()) - - return mp, func() { os.RemoveAll(cfg.RootDir) } -} - -func ensureNoFire(t *testing.T, ch <-chan struct{}, timeoutMS int) { - timer := time.NewTimer(time.Duration(timeoutMS) * time.Millisecond) - select { - case <-ch: - t.Fatal("Expected not to fire") - case <-timer.C: - } -} - -func ensureFire(t *testing.T, ch <-chan struct{}, timeoutMS int) { - timer := time.NewTimer(time.Duration(timeoutMS) * time.Millisecond) - select { - case <-ch: - case <-timer.C: - t.Fatal("Expected to fire") - } -} - -func checkTxs(t *testing.T, mp mempool.Mempool, count int, peerID uint16) types.Txs { - txs := make(types.Txs, count) - txInfo := mempool.TxInfo{SenderID: peerID} - for i := 0; i < count; i++ { - txBytes := make([]byte, 20) - txs[i] = txBytes - _, err := rand.Read(txBytes) - if err != nil { - t.Error(err) - } - if err := mp.CheckTx(txBytes, nil, txInfo); err != nil { - // Skip invalid txs. - // TestMempoolFilters will fail otherwise. It asserts a number of txs - // returned. - if mempool.IsPreCheckError(err) { - continue - } - t.Fatalf("CheckTx failed: %v while checking #%d tx", err, i) - } - } - return txs -} - -func TestReapMaxBytesMaxGas(t *testing.T) { - app := kvstore.NewApplication() - cc := proxy.NewLocalClientCreator(app) - mp, cleanup := newMempoolWithApp(cc) - defer cleanup() - - // Ensure gas calculation behaves as expected - checkTxs(t, mp, 1, mempool.UnknownPeerID) - tx0 := mp.TxsFront().Value.(*mempoolTx) - // assert that kv store has gas wanted = 1. - require.Equal(t, app.CheckTx(abci.RequestCheckTx{Tx: tx0.tx}).GasWanted, int64(1), "KVStore had a gas value neq to 1") - require.Equal(t, tx0.gasWanted, int64(1), "transactions gas was set incorrectly") - // ensure each tx is 20 bytes long - require.Equal(t, len(tx0.tx), 20, "Tx is longer than 20 bytes") - mp.Flush() - - // each table driven test creates numTxsToCreate txs with checkTx, and at the end clears all remaining txs. - // each tx has 20 bytes - tests := []struct { - numTxsToCreate int - maxBytes int64 - maxGas int64 - expectedNumTxs int - }{ - {20, -1, -1, 20}, - {20, -1, 0, 0}, - {20, -1, 10, 10}, - {20, -1, 30, 20}, - {20, 0, -1, 0}, - {20, 0, 10, 0}, - {20, 10, 10, 0}, - {20, 24, 10, 1}, - {20, 240, 5, 5}, - {20, 240, -1, 10}, - {20, 240, 10, 10}, - {20, 240, 15, 10}, - {20, 20000, -1, 20}, - {20, 20000, 5, 5}, - {20, 20000, 30, 20}, - } - for tcIndex, tt := range tests { - checkTxs(t, mp, tt.numTxsToCreate, mempool.UnknownPeerID) - got := mp.ReapMaxBytesMaxGas(tt.maxBytes, tt.maxGas) - assert.Equal(t, tt.expectedNumTxs, len(got), "Got %d txs, expected %d, tc #%d", - len(got), tt.expectedNumTxs, tcIndex) - mp.Flush() - } -} - -func TestMempoolFilters(t *testing.T) { - app := kvstore.NewApplication() - cc := proxy.NewLocalClientCreator(app) - mp, cleanup := newMempoolWithApp(cc) - defer cleanup() - emptyTxArr := []types.Tx{[]byte{}} - - nopPreFilter := func(tx types.Tx) error { return nil } - nopPostFilter := func(tx types.Tx, res *abci.ResponseCheckTx) error { return nil } - - // each table driven test creates numTxsToCreate txs with checkTx, and at the end clears all remaining txs. - // each tx has 20 bytes - tests := []struct { - numTxsToCreate int - preFilter mempool.PreCheckFunc - postFilter mempool.PostCheckFunc - expectedNumTxs int - }{ - {10, nopPreFilter, nopPostFilter, 10}, - {10, mempool.PreCheckMaxBytes(10), nopPostFilter, 0}, - {10, mempool.PreCheckMaxBytes(26), nopPostFilter, 10}, - {10, nopPreFilter, mempool.PostCheckMaxGas(-1), 10}, - {10, nopPreFilter, mempool.PostCheckMaxGas(0), 0}, - {10, nopPreFilter, mempool.PostCheckMaxGas(1), 10}, - {10, nopPreFilter, mempool.PostCheckMaxGas(3000), 10}, - {10, mempool.PreCheckMaxBytes(10), mempool.PostCheckMaxGas(20), 0}, - {10, mempool.PreCheckMaxBytes(30), mempool.PostCheckMaxGas(20), 10}, - {10, mempool.PreCheckMaxBytes(28), mempool.PostCheckMaxGas(1), 10}, - {10, mempool.PreCheckMaxBytes(22), mempool.PostCheckMaxGas(0), 0}, - } - for tcIndex, tt := range tests { - err := mp.Update(1, emptyTxArr, abciResponses(len(emptyTxArr), abci.CodeTypeOK), tt.preFilter, tt.postFilter) - require.NoError(t, err) - checkTxs(t, mp, tt.numTxsToCreate, mempool.UnknownPeerID) - require.Equal(t, tt.expectedNumTxs, mp.Size(), "mempool had the incorrect size, on test case %d", tcIndex) - mp.Flush() - } -} - -func TestMempoolUpdate(t *testing.T) { - app := kvstore.NewApplication() - cc := proxy.NewLocalClientCreator(app) - mp, cleanup := newMempoolWithApp(cc) - defer cleanup() - - // 1. Adds valid txs to the cache - { - err := mp.Update(1, []types.Tx{[]byte{0x01}}, abciResponses(1, abci.CodeTypeOK), nil, nil) - require.NoError(t, err) - err = mp.CheckTx([]byte{0x01}, nil, mempool.TxInfo{}) - if assert.Error(t, err) { - assert.Equal(t, mempool.ErrTxInCache, err) - } - } - - // 2. Removes valid txs from the mempool - { - err := mp.CheckTx([]byte{0x02}, nil, mempool.TxInfo{}) - require.NoError(t, err) - err = mp.Update(1, []types.Tx{[]byte{0x02}}, abciResponses(1, abci.CodeTypeOK), nil, nil) - require.NoError(t, err) - assert.Zero(t, mp.Size()) - } - - // 3. Removes invalid transactions from the cache and the mempool (if present) - { - err := mp.CheckTx([]byte{0x03}, nil, mempool.TxInfo{}) - require.NoError(t, err) - err = mp.Update(1, []types.Tx{[]byte{0x03}}, abciResponses(1, 1), nil, nil) - require.NoError(t, err) - assert.Zero(t, mp.Size()) - - err = mp.CheckTx([]byte{0x03}, nil, mempool.TxInfo{}) - require.NoError(t, err) - } -} - -func TestMempoolUpdateDoesNotPanicWhenApplicationMissedTx(t *testing.T) { - var callback abciclient.Callback - mockClient := new(abciclimocks.Client) - mockClient.On("Start").Return(nil) - mockClient.On("SetLogger", mock.Anything) - - mockClient.On("Error").Return(nil).Times(4) - mockClient.On("FlushAsync", mock.Anything).Return(abciclient.NewReqRes(abci.ToRequestFlush()), nil) - mockClient.On("SetResponseCallback", mock.MatchedBy(func(cb abciclient.Callback) bool { callback = cb; return true })) - - app := kvstore.NewApplication() - cc := proxy.NewLocalClientCreator(app) - mp, cleanup, err := newMempoolWithAppMock(cc, mockClient) - require.NoError(t, err) - defer cleanup() - - // Add 4 transactions to the mempool by calling the mempool's `CheckTx` on each of them. - txs := []types.Tx{[]byte{0x01}, []byte{0x02}, []byte{0x03}, []byte{0x04}} - for _, tx := range txs { - reqRes := abciclient.NewReqRes(abci.ToRequestCheckTx(abci.RequestCheckTx{Tx: tx})) - reqRes.Response = abci.ToResponseCheckTx(abci.ResponseCheckTx{Code: abci.CodeTypeOK}) - - mockClient.On("CheckTxAsync", mock.Anything, mock.Anything).Return(reqRes, nil) - err := mp.CheckTx(tx, nil, mempool.TxInfo{}) - require.NoError(t, err) - - // ensure that the callback that the mempool sets on the ReqRes is run. - reqRes.InvokeCallback() - } - - // Calling update to remove the first transaction from the mempool. - // This call also triggers the mempool to recheck its remaining transactions. - err = mp.Update(0, []types.Tx{txs[0]}, abciResponses(1, abci.CodeTypeOK), nil, nil) - require.Nil(t, err) - - // The mempool has now sent its requests off to the client to be rechecked - // and is waiting for the corresponding callbacks to be called. - // We now call the mempool-supplied callback on the first and third transaction. - // This simulates the client dropping the second request. - // Previous versions of this code panicked when the ABCI application missed - // a recheck-tx request. - resp := abci.ResponseCheckTx{Code: abci.CodeTypeOK} - req := abci.RequestCheckTx{Tx: txs[1]} - callback(abci.ToRequestCheckTx(req), abci.ToResponseCheckTx(resp)) - - req = abci.RequestCheckTx{Tx: txs[3]} - callback(abci.ToRequestCheckTx(req), abci.ToResponseCheckTx(resp)) - mockClient.AssertExpectations(t) -} - -func TestMempool_KeepInvalidTxsInCache(t *testing.T) { - app := kvstore.NewApplication() - cc := proxy.NewLocalClientCreator(app) - wcfg := config.DefaultConfig() - wcfg.Mempool.KeepInvalidTxsInCache = true - mp, cleanup := newMempoolWithAppAndConfig(cc, wcfg) - defer cleanup() - - // 1. An invalid transaction must remain in the cache after Update - { - a := make([]byte, 8) - binary.BigEndian.PutUint64(a, 0) - - b := make([]byte, 8) - binary.BigEndian.PutUint64(b, 1) - - err := mp.CheckTx(b, nil, mempool.TxInfo{}) - require.NoError(t, err) - - // simulate new block - _ = app.DeliverTx(abci.RequestDeliverTx{Tx: a}) - _ = app.DeliverTx(abci.RequestDeliverTx{Tx: b}) - err = mp.Update(1, []types.Tx{a, b}, - []*abci.ResponseDeliverTx{{Code: abci.CodeTypeOK}, {Code: 2}}, nil, nil) - require.NoError(t, err) - - // a must be added to the cache - err = mp.CheckTx(a, nil, mempool.TxInfo{}) - if assert.Error(t, err) { - assert.Equal(t, mempool.ErrTxInCache, err) - } - - // b must remain in the cache - err = mp.CheckTx(b, nil, mempool.TxInfo{}) - if assert.Error(t, err) { - assert.Equal(t, mempool.ErrTxInCache, err) - } - } - - // 2. An invalid transaction must remain in the cache - { - a := make([]byte, 8) - binary.BigEndian.PutUint64(a, 0) - - // remove a from the cache to test (2) - mp.cache.Remove(a) - - err := mp.CheckTx(a, nil, mempool.TxInfo{}) - require.NoError(t, err) - } -} - -func TestTxsAvailable(t *testing.T) { - app := kvstore.NewApplication() - cc := proxy.NewLocalClientCreator(app) - mp, cleanup := newMempoolWithApp(cc) - defer cleanup() - mp.EnableTxsAvailable() - - timeoutMS := 500 - - // with no txs, it shouldn't fire - ensureNoFire(t, mp.TxsAvailable(), timeoutMS) - - // send a bunch of txs, it should only fire once - txs := checkTxs(t, mp, 100, mempool.UnknownPeerID) - ensureFire(t, mp.TxsAvailable(), timeoutMS) - ensureNoFire(t, mp.TxsAvailable(), timeoutMS) - - // call update with half the txs. - // it should fire once now for the new height - // since there are still txs left - committedTxs, txs := txs[:50], txs[50:] - if err := mp.Update(1, committedTxs, abciResponses(len(committedTxs), abci.CodeTypeOK), nil, nil); err != nil { - t.Error(err) - } - ensureFire(t, mp.TxsAvailable(), timeoutMS) - ensureNoFire(t, mp.TxsAvailable(), timeoutMS) - - // send a bunch more txs. we already fired for this height so it shouldn't fire again - moreTxs := checkTxs(t, mp, 50, mempool.UnknownPeerID) - ensureNoFire(t, mp.TxsAvailable(), timeoutMS) - - // now call update with all the txs. it should not fire as there are no txs left - committedTxs = append(txs, moreTxs...) - if err := mp.Update(2, committedTxs, abciResponses(len(committedTxs), abci.CodeTypeOK), nil, nil); err != nil { - t.Error(err) - } - ensureNoFire(t, mp.TxsAvailable(), timeoutMS) - - // send a bunch more txs, it should only fire once - checkTxs(t, mp, 100, mempool.UnknownPeerID) - ensureFire(t, mp.TxsAvailable(), timeoutMS) - ensureNoFire(t, mp.TxsAvailable(), timeoutMS) -} - -func TestSerialReap(t *testing.T) { - app := kvstore.NewApplication() - cc := proxy.NewLocalClientCreator(app) - - mp, cleanup := newMempoolWithApp(cc) - defer cleanup() - - appConnCon, _ := cc.NewABCIClient() - appConnCon.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "consensus")) - err := appConnCon.Start() - require.Nil(t, err) - - cacheMap := make(map[string]struct{}) - deliverTxsRange := func(start, end int) { - // Deliver some txs. - for i := start; i < end; i++ { - - // This will succeed - txBytes := make([]byte, 8) - binary.BigEndian.PutUint64(txBytes, uint64(i)) - err := mp.CheckTx(txBytes, nil, mempool.TxInfo{}) - _, cached := cacheMap[string(txBytes)] - if cached { - require.NotNil(t, err, "expected error for cached tx") - } else { - require.Nil(t, err, "expected no err for uncached tx") - } - cacheMap[string(txBytes)] = struct{}{} - - // Duplicates are cached and should return error - err = mp.CheckTx(txBytes, nil, mempool.TxInfo{}) - require.NotNil(t, err, "Expected error after CheckTx on duplicated tx") - } - } - - reapCheck := func(exp int) { - txs := mp.ReapMaxBytesMaxGas(-1, -1) - require.Equal(t, len(txs), exp, fmt.Sprintf("Expected to reap %v txs but got %v", exp, len(txs))) - } - - updateRange := func(start, end int) { - txs := make([]types.Tx, 0) - for i := start; i < end; i++ { - txBytes := make([]byte, 8) - binary.BigEndian.PutUint64(txBytes, uint64(i)) - txs = append(txs, txBytes) - } - if err := mp.Update(0, txs, abciResponses(len(txs), abci.CodeTypeOK), nil, nil); err != nil { - t.Error(err) - } - } - - commitRange := func(start, end int) { - // Deliver some txs. - for i := start; i < end; i++ { - txBytes := make([]byte, 8) - binary.BigEndian.PutUint64(txBytes, uint64(i)) - res, err := appConnCon.DeliverTxSync(abci.RequestDeliverTx{Tx: txBytes}) - if err != nil { - t.Errorf("client error committing tx: %v", err) - } - if res.IsErr() { - t.Errorf("error committing tx. Code:%v result:%X log:%v", - res.Code, res.Data, res.Log) - } - } - res, err := appConnCon.CommitSync() - if err != nil { - t.Errorf("client error committing: %v", err) - } - if len(res.Data) != 8 { - t.Errorf("error committing. Hash:%X", res.Data) - } - } - - //---------------------------------------- - - // Deliver some txs. - deliverTxsRange(0, 100) - - // Reap the txs. - reapCheck(100) - - // Reap again. We should get the same amount - reapCheck(100) - - // Deliver 0 to 999, we should reap 900 new txs - // because 100 were already counted. - deliverTxsRange(0, 1000) - - // Reap the txs. - reapCheck(1000) - - // Reap again. We should get the same amount - reapCheck(1000) - - // Commit from the consensus AppConn - commitRange(0, 500) - updateRange(0, 500) - - // We should have 500 left. - reapCheck(500) - - // Deliver 100 invalid txs and 100 valid txs - deliverTxsRange(900, 1100) - - // We should have 600 now. - reapCheck(600) -} - -func TestMempool_CheckTxChecksTxSize(t *testing.T) { - app := kvstore.NewApplication() - cc := proxy.NewLocalClientCreator(app) - - mempl, cleanup := newMempoolWithApp(cc) - defer cleanup() - - maxTxSize := mempl.config.MaxTxBytes - - testCases := []struct { - len int - err bool - }{ - // check small txs. no error - 0: {10, false}, - 1: {1000, false}, - 2: {1000000, false}, - - // check around maxTxSize - 3: {maxTxSize - 1, false}, - 4: {maxTxSize, false}, - 5: {maxTxSize + 1, true}, - } - - for i, testCase := range testCases { - caseString := fmt.Sprintf("case %d, len %d", i, testCase.len) - - tx := cmtrand.Bytes(testCase.len) - - err := mempl.CheckTx(tx, nil, mempool.TxInfo{}) - bv := gogotypes.BytesValue{Value: tx} - bz, err2 := bv.Marshal() - require.NoError(t, err2) - require.Equal(t, len(bz), proto.Size(&bv), caseString) - - if !testCase.err { - require.NoError(t, err, caseString) - } else { - require.Equal(t, err, mempool.ErrTxTooLarge{ - Max: maxTxSize, - Actual: testCase.len, - }, caseString) - } - } -} - -func TestMempoolTxsBytes(t *testing.T) { - app := kvstore.NewApplication() - cc := proxy.NewLocalClientCreator(app) - - cfg := config.ResetTestRoot("mempool_test") - - cfg.Mempool.MaxTxsBytes = 10 - mp, cleanup := newMempoolWithAppAndConfig(cc, cfg) - defer cleanup() - - // 1. zero by default - assert.EqualValues(t, 0, mp.SizeBytes()) - - // 2. len(tx) after CheckTx - err := mp.CheckTx([]byte{0x01}, nil, mempool.TxInfo{}) - require.NoError(t, err) - assert.EqualValues(t, 1, mp.SizeBytes()) - - // 3. zero again after tx is removed by Update - err = mp.Update(1, []types.Tx{[]byte{0x01}}, abciResponses(1, abci.CodeTypeOK), nil, nil) - require.NoError(t, err) - assert.EqualValues(t, 0, mp.SizeBytes()) - - // 4. zero after Flush - err = mp.CheckTx([]byte{0x02, 0x03}, nil, mempool.TxInfo{}) - require.NoError(t, err) - assert.EqualValues(t, 2, mp.SizeBytes()) - - mp.Flush() - assert.EqualValues(t, 0, mp.SizeBytes()) - - // 5. ErrMempoolIsFull is returned when/if MaxTxsBytes limit is reached. - err = mp.CheckTx( - []byte{0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04, 0x04}, - nil, - mempool.TxInfo{}, - ) - require.NoError(t, err) - - err = mp.CheckTx([]byte{0x05}, nil, mempool.TxInfo{}) - if assert.Error(t, err) { - assert.IsType(t, mempool.ErrMempoolIsFull{}, err) - } - - // 6. zero after tx is rechecked and removed due to not being valid anymore - app2 := kvstore.NewApplication() - cc = proxy.NewLocalClientCreator(app2) - - mp, cleanup = newMempoolWithApp(cc) - defer cleanup() - - txBytes := make([]byte, 8) - binary.BigEndian.PutUint64(txBytes, uint64(0)) - - err = mp.CheckTx(txBytes, nil, mempool.TxInfo{}) - require.NoError(t, err) - assert.EqualValues(t, 8, mp.SizeBytes()) - - appConnCon, _ := cc.NewABCIClient() - appConnCon.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "consensus")) - err = appConnCon.Start() - require.Nil(t, err) - t.Cleanup(func() { - if err := appConnCon.Stop(); err != nil { - t.Error(err) - } - }) - - res, err := appConnCon.DeliverTxSync(abci.RequestDeliverTx{Tx: txBytes}) - require.NoError(t, err) - require.EqualValues(t, 0, res.Code) - - res2, err := appConnCon.CommitSync() - require.NoError(t, err) - require.NotEmpty(t, res2.Data) - - // Pretend like we committed nothing so txBytes gets rechecked and removed. - err = mp.Update(1, []types.Tx{}, abciResponses(0, abci.CodeTypeOK), nil, nil) - require.NoError(t, err) - assert.EqualValues(t, 8, mp.SizeBytes()) - - // 7. Test RemoveTxByKey function - err = mp.CheckTx([]byte{0x06}, nil, mempool.TxInfo{}) - require.NoError(t, err) - assert.EqualValues(t, 9, mp.SizeBytes()) - assert.Error(t, mp.RemoveTxByKey(types.Tx([]byte{0x07}).Key())) - assert.EqualValues(t, 9, mp.SizeBytes()) - assert.NoError(t, mp.RemoveTxByKey(types.Tx([]byte{0x06}).Key())) - assert.EqualValues(t, 8, mp.SizeBytes()) - -} - -// This will non-deterministically catch some concurrency failures like -// https://github.com/cometbft/cometbft/issues/3509 -// TODO: all of the tests should probably also run using the remote proxy app -// since otherwise we're not actually testing the concurrency of the mempool here! -func TestMempoolRemoteAppConcurrency(t *testing.T) { - sockPath := fmt.Sprintf("unix:///tmp/echo_%v.sock", cmtrand.Str(6)) - app := kvstore.NewApplication() - _, server := newRemoteApp(t, sockPath, app) - t.Cleanup(func() { - if err := server.Stop(); err != nil { - t.Error(err) - } - }) - - cfg := config.ResetTestRoot("mempool_test") - - mp, cleanup := newMempoolWithAppAndConfig(proxy.NewRemoteClientCreator(sockPath, "socket", true), cfg) - defer cleanup() - - // generate small number of txs - nTxs := 10 - txLen := 200 - txs := make([]types.Tx, nTxs) - for i := 0; i < nTxs; i++ { - txs[i] = cmtrand.Bytes(txLen) - } - - // simulate a group of peers sending them over and over - N := cfg.Mempool.Size - maxPeers := 5 - for i := 0; i < N; i++ { - peerID := mrand.Intn(maxPeers) - txNum := mrand.Intn(nTxs) - tx := txs[txNum] - - // this will err with ErrTxInCache many times ... - mp.CheckTx(tx, nil, mempool.TxInfo{SenderID: uint16(peerID)}) //nolint: errcheck // will error - } - - require.NoError(t, mp.FlushAppConn()) -} - -func TestRemoveBlobTx(t *testing.T) { - app := kvstore.NewApplication() - cc := proxy.NewLocalClientCreator(app) - namespaceOne := bytes.Repeat([]byte{1}, consts.NamespaceIDSize) - - cfg := config.ResetTestRoot("mempool_test") - - cfg.Mempool.MaxTxsBytes = 1000 - mp, cleanup := newMempoolWithAppAndConfig(cc, cfg) - defer cleanup() - - originalTx := []byte{1, 2, 3, 4} - indexWrapper, err := types.MarshalIndexWrapper(originalTx, 100) - require.NoError(t, err) - - // create the blobTx - b := tmproto.Blob{ - NamespaceId: namespaceOne, - Data: []byte{1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 9}, - ShareVersion: 0, - NamespaceVersion: 0, - } - bTx, err := types.MarshalBlobTx(originalTx, &b) - require.NoError(t, err) - - err = mp.CheckTx(bTx, nil, mempool.TxInfo{}) - require.NoError(t, err) - - err = mp.Update(1, []types.Tx{indexWrapper}, abciResponses(1, abci.CodeTypeOK), nil, nil) - require.NoError(t, err) - assert.EqualValues(t, 0, mp.Size()) - assert.EqualValues(t, 0, mp.SizeBytes()) -} - -// caller must close server -func newRemoteApp(t *testing.T, addr string, app abci.Application) (abciclient.Client, service.Service) { - clientCreator, err := abciclient.NewClient(addr, "socket", true) - require.NoError(t, err) - - // Start server - server := abciserver.NewSocketServer(addr, app) - server.SetLogger(log.TestingLogger().With("module", "abci-server")) - if err := server.Start(); err != nil { - t.Fatalf("Error starting socket server: %v", err.Error()) - } - - return clientCreator, server -} - -func abciResponses(n int, code uint32) []*abci.ResponseDeliverTx { - responses := make([]*abci.ResponseDeliverTx, 0, n) - for i := 0; i < n; i++ { - responses = append(responses, &abci.ResponseDeliverTx{Code: code}) - } - return responses -} diff --git a/mempool/v0/doc.go b/mempool/v0/doc.go deleted file mode 100644 index 3b5d0d20d4..0000000000 --- a/mempool/v0/doc.go +++ /dev/null @@ -1,23 +0,0 @@ -// The mempool pushes new txs onto the proxyAppConn. -// It gets a stream of (req, res) tuples from the proxy. -// The mempool stores good txs in a concurrent linked-list. - -// Multiple concurrent go-routines can traverse this linked-list -// safely by calling .NextWait() on each element. - -// So we have several go-routines: -// 1. Consensus calling Update() and ReapMaxBytesMaxGas() synchronously -// 2. Many mempool reactor's peer routines calling CheckTx() -// 3. Many mempool reactor's peer routines traversing the txs linked list - -// To manage these goroutines, there are three methods of locking. -// 1. Mutations to the linked-list is protected by an internal mtx (CList is goroutine-safe) -// 2. Mutations to the linked-list elements are atomic -// 3. CheckTx() and/or ReapMaxBytesMaxGas() calls can be paused upon Update(), protected by .updateMtx - -// Garbage collection of old elements from mempool.txs is handlde via the -// DetachPrev() call, which makes old elements not reachable by peer -// broadcastTxRoutine(). - -// TODO: Better handle abci client errors. (make it automatically handle connection errors) -package v0 diff --git a/mempool/v0/reactor.go b/mempool/v0/reactor.go deleted file mode 100644 index 78a72cdf13..0000000000 --- a/mempool/v0/reactor.go +++ /dev/null @@ -1,294 +0,0 @@ -package v0 - -import ( - "errors" - "fmt" - "time" - - "github.com/gogo/protobuf/proto" - - cfg "github.com/cometbft/cometbft/config" - "github.com/cometbft/cometbft/libs/clist" - "github.com/cometbft/cometbft/libs/log" - cmtsync "github.com/cometbft/cometbft/libs/sync" - "github.com/cometbft/cometbft/mempool" - "github.com/cometbft/cometbft/p2p" - protomem "github.com/cometbft/cometbft/proto/tendermint/mempool" - "github.com/cometbft/cometbft/types" -) - -// Reactor handles mempool tx broadcasting amongst peers. -// It maintains a map from peer ID to counter, to prevent gossiping txs to the -// peers you received it from. -type Reactor struct { - p2p.BaseReactor - config *cfg.MempoolConfig - mempool *CListMempool - ids *mempoolIDs -} - -type mempoolIDs struct { - mtx cmtsync.RWMutex - peerMap map[p2p.ID]uint16 - nextID uint16 // assumes that a node will never have over 65536 active peers - activeIDs map[uint16]struct{} // used to check if a given peerID key is used, the value doesn't matter -} - -// Reserve searches for the next unused ID and assigns it to the -// peer. -func (ids *mempoolIDs) ReserveForPeer(peer p2p.Peer) { - ids.mtx.Lock() - defer ids.mtx.Unlock() - - curID := ids.nextPeerID() - ids.peerMap[peer.ID()] = curID - ids.activeIDs[curID] = struct{}{} -} - -// nextPeerID returns the next unused peer ID to use. -// This assumes that ids's mutex is already locked. -func (ids *mempoolIDs) nextPeerID() uint16 { - if len(ids.activeIDs) == mempool.MaxActiveIDs { - panic(fmt.Sprintf("node has maximum %d active IDs and wanted to get one more", mempool.MaxActiveIDs)) - } - - _, idExists := ids.activeIDs[ids.nextID] - for idExists { - ids.nextID++ - _, idExists = ids.activeIDs[ids.nextID] - } - curID := ids.nextID - ids.nextID++ - return curID -} - -// Reclaim returns the ID reserved for the peer back to unused pool. -func (ids *mempoolIDs) Reclaim(peer p2p.Peer) { - ids.mtx.Lock() - defer ids.mtx.Unlock() - - removedID, ok := ids.peerMap[peer.ID()] - if ok { - delete(ids.activeIDs, removedID) - delete(ids.peerMap, peer.ID()) - } -} - -// GetForPeer returns an ID reserved for the peer. -func (ids *mempoolIDs) GetForPeer(peer p2p.Peer) uint16 { - ids.mtx.RLock() - defer ids.mtx.RUnlock() - - return ids.peerMap[peer.ID()] -} - -func newMempoolIDs() *mempoolIDs { - return &mempoolIDs{ - peerMap: make(map[p2p.ID]uint16), - activeIDs: map[uint16]struct{}{0: {}}, - nextID: 1, // reserve unknownPeerID(0) for mempoolReactor.BroadcastTx - } -} - -// NewReactor returns a new Reactor with the given config and mempool. -func NewReactor(config *cfg.MempoolConfig, mempool *CListMempool) *Reactor { - memR := &Reactor{ - config: config, - mempool: mempool, - ids: newMempoolIDs(), - } - memR.BaseReactor = *p2p.NewBaseReactor("Mempool", memR) - return memR -} - -// InitPeer implements Reactor by creating a state for the peer. -func (memR *Reactor) InitPeer(peer p2p.Peer) p2p.Peer { - memR.ids.ReserveForPeer(peer) - return peer -} - -// SetLogger sets the Logger on the reactor and the underlying mempool. -func (memR *Reactor) SetLogger(l log.Logger) { - memR.Logger = l - memR.mempool.SetLogger(l) -} - -// OnStart implements p2p.BaseReactor. -func (memR *Reactor) OnStart() error { - if !memR.config.Broadcast { - memR.Logger.Info("Tx broadcasting is disabled") - } - return nil -} - -// GetChannels implements Reactor by returning the list of channels for this -// reactor. -func (memR *Reactor) GetChannels() []*p2p.ChannelDescriptor { - largestTx := make([]byte, memR.config.MaxTxBytes) - batchMsg := protomem.Message{ - Sum: &protomem.Message_Txs{ - Txs: &protomem.Txs{Txs: [][]byte{largestTx}}, - }, - } - - return []*p2p.ChannelDescriptor{ - { - ID: mempool.MempoolChannel, - Priority: 5, - RecvMessageCapacity: batchMsg.Size(), - MessageType: &protomem.Message{}, - }, - } -} - -// AddPeer implements Reactor. -// It starts a broadcast routine ensuring all txs are forwarded to the given peer. -func (memR *Reactor) AddPeer(peer p2p.Peer) { - if memR.config.Broadcast { - go memR.broadcastTxRoutine(peer) - } -} - -// RemovePeer implements Reactor. -func (memR *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) { - memR.ids.Reclaim(peer) - // broadcast routine checks if peer is gone and returns -} - -// Receive implements Reactor. -// It adds any received transactions to the mempool. -func (memR *Reactor) ReceiveEnvelope(e p2p.Envelope) { - memR.Logger.Debug("Receive", "src", e.Src, "chId", e.ChannelID, "msg", e.Message) - switch msg := e.Message.(type) { - case *protomem.Txs: - protoTxs := msg.GetTxs() - if len(protoTxs) == 0 { - memR.Logger.Error("received empty txs from peer", "src", e.Src) - return - } - txInfo := mempool.TxInfo{SenderID: memR.ids.GetForPeer(e.Src)} - if e.Src != nil { - txInfo.SenderP2PID = e.Src.ID() - } - - var err error - for _, tx := range protoTxs { - ntx := types.Tx(tx) - err = memR.mempool.CheckTx(ntx, nil, txInfo) - if errors.Is(err, mempool.ErrTxInCache) { - memR.Logger.Debug("Tx already exists in cache", "tx", ntx.String()) - } else if err != nil { - memR.Logger.Info("Could not check tx", "tx", ntx.String(), "err", err) - } - } - default: - memR.Logger.Error("unknown message type", "src", e.Src, "chId", e.ChannelID, "msg", e.Message) - memR.Switch.StopPeerForError(e.Src, fmt.Errorf("mempool cannot handle message of type: %T", e.Message)) - return - } - - // broadcasting happens from go routines per peer -} - -func (memR *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) { - msg := &protomem.Message{} - err := proto.Unmarshal(msgBytes, msg) - if err != nil { - panic(err) - } - uw, err := msg.Unwrap() - if err != nil { - panic(err) - } - memR.ReceiveEnvelope(p2p.Envelope{ - ChannelID: chID, - Src: peer, - Message: uw, - }) -} - -// PeerState describes the state of a peer. -type PeerState interface { - GetHeight() int64 -} - -// Send new mempool txs to peer. -func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) { - peerID := memR.ids.GetForPeer(peer) - var next *clist.CElement - - for { - // In case of both next.NextWaitChan() and peer.Quit() are variable at the same time - if !memR.IsRunning() || !peer.IsRunning() { - return - } - // This happens because the CElement we were looking at got garbage - // collected (removed). That is, .NextWait() returned nil. Go ahead and - // start from the beginning. - if next == nil { - select { - case <-memR.mempool.TxsWaitChan(): // Wait until a tx is available - if next = memR.mempool.TxsFront(); next == nil { - continue - } - case <-peer.Quit(): - return - case <-memR.Quit(): - return - } - } - - // Make sure the peer is up to date. - peerState, ok := peer.Get(types.PeerStateKey).(PeerState) - if !ok { - // Peer does not have a state yet. We set it in the consensus reactor, but - // when we add peer in Switch, the order we call reactors#AddPeer is - // different every time due to us using a map. Sometimes other reactors - // will be initialized before the consensus reactor. We should wait a few - // milliseconds and retry. - time.Sleep(mempool.PeerCatchupSleepIntervalMS * time.Millisecond) - continue - } - - // Allow for a lag of 1 block. - memTx := next.Value.(*mempoolTx) - if peerState.GetHeight() < memTx.Height()-1 { - time.Sleep(mempool.PeerCatchupSleepIntervalMS * time.Millisecond) - continue - } - - // NOTE: Transaction batching was disabled due to - // https://github.com/cometbft/cometbft/issues/5796 - - if _, ok := memTx.senders.Load(peerID); !ok { - success := p2p.SendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck - ChannelID: mempool.MempoolChannel, - Message: &protomem.Txs{Txs: [][]byte{memTx.tx}}, - }, memR.Logger) - if !success { - time.Sleep(mempool.PeerCatchupSleepIntervalMS * time.Millisecond) - continue - } - } - - select { - case <-next.NextWaitChan(): - // see the start of the for loop for nil check - next = next.Next() - case <-peer.Quit(): - return - case <-memR.Quit(): - return - } - } -} - -// TxsMessage is a Message containing transactions. -type TxsMessage struct { - Txs []types.Tx -} - -// String returns a string representation of the TxsMessage. -func (m *TxsMessage) String() string { - return fmt.Sprintf("[TxsMessage %v]", m.Txs) -} diff --git a/mempool/v0/reactor_test.go b/mempool/v0/reactor_test.go deleted file mode 100644 index f2711ddc70..0000000000 --- a/mempool/v0/reactor_test.go +++ /dev/null @@ -1,424 +0,0 @@ -package v0 - -import ( - "encoding/hex" - "errors" - "net" - "sync" - "testing" - "time" - - "github.com/fortytw2/leaktest" - "github.com/go-kit/log/term" - "github.com/gogo/protobuf/proto" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/cometbft/cometbft/abci/example/kvstore" - abci "github.com/cometbft/cometbft/abci/types" - cfg "github.com/cometbft/cometbft/config" - "github.com/cometbft/cometbft/libs/log" - cmtrand "github.com/cometbft/cometbft/libs/rand" - "github.com/cometbft/cometbft/mempool" - "github.com/cometbft/cometbft/p2p" - "github.com/cometbft/cometbft/p2p/mock" - memproto "github.com/cometbft/cometbft/proto/tendermint/mempool" - "github.com/cometbft/cometbft/proxy" - "github.com/cometbft/cometbft/types" -) - -const ( - numTxs = 1000 - timeout = 120 * time.Second // ridiculously high because CircleCI is slow -) - -type peerState struct { - height int64 -} - -func (ps peerState) GetHeight() int64 { - return ps.height -} - -// Send a bunch of txs to the first reactor's mempool and wait for them all to -// be received in the others. -func TestReactorBroadcastTxsMessage(t *testing.T) { - config := cfg.TestConfig() - // if there were more than two reactors, the order of transactions could not be - // asserted in waitForTxsOnReactors (due to transactions gossiping). If we - // replace Connect2Switches (full mesh) with a func, which connects first - // reactor to others and nothing else, this test should also pass with >2 reactors. - const N = 2 - reactors := makeAndConnectReactors(config, N) - defer func() { - for _, r := range reactors { - if err := r.Stop(); err != nil { - assert.NoError(t, err) - } - } - }() - for _, r := range reactors { - for _, peer := range r.Switch.Peers().List() { - peer.Set(types.PeerStateKey, peerState{1}) - } - } - - txs := checkTxs(t, reactors[0].mempool, numTxs, mempool.UnknownPeerID) - waitForTxsOnReactors(t, txs, reactors) -} - -// regression test for https://github.com/cometbft/cometbft/issues/5408 -func TestReactorConcurrency(t *testing.T) { - config := cfg.TestConfig() - const N = 2 - reactors := makeAndConnectReactors(config, N) - defer func() { - for _, r := range reactors { - if err := r.Stop(); err != nil { - assert.NoError(t, err) - } - } - }() - for _, r := range reactors { - for _, peer := range r.Switch.Peers().List() { - peer.Set(types.PeerStateKey, peerState{1}) - } - } - var wg sync.WaitGroup - - const numTxs = 5 - - for i := 0; i < 1000; i++ { - wg.Add(2) - - // 1. submit a bunch of txs - // 2. update the whole mempool - txs := checkTxs(t, reactors[0].mempool, numTxs, mempool.UnknownPeerID) - go func() { - defer wg.Done() - - reactors[0].mempool.Lock() - defer reactors[0].mempool.Unlock() - - deliverTxResponses := make([]*abci.ResponseDeliverTx, len(txs)) - for i := range txs { - deliverTxResponses[i] = &abci.ResponseDeliverTx{Code: 0} - } - err := reactors[0].mempool.Update(1, txs, deliverTxResponses, nil, nil) - assert.NoError(t, err) - }() - - // 1. submit a bunch of txs - // 2. update none - _ = checkTxs(t, reactors[1].mempool, numTxs, mempool.UnknownPeerID) - go func() { - defer wg.Done() - - reactors[1].mempool.Lock() - defer reactors[1].mempool.Unlock() - err := reactors[1].mempool.Update(1, []types.Tx{}, make([]*abci.ResponseDeliverTx, 0), nil, nil) - assert.NoError(t, err) - }() - - // 1. flush the mempool - reactors[1].mempool.Flush() - } - - wg.Wait() -} - -// Send a bunch of txs to the first reactor's mempool, claiming it came from peer -// ensure peer gets no txs. -func TestReactorNoBroadcastToSender(t *testing.T) { - config := cfg.TestConfig() - const N = 2 - reactors := makeAndConnectReactors(config, N) - defer func() { - for _, r := range reactors { - if err := r.Stop(); err != nil { - assert.NoError(t, err) - } - } - }() - for _, r := range reactors { - for _, peer := range r.Switch.Peers().List() { - peer.Set(types.PeerStateKey, peerState{1}) - } - } - - const peerID = 1 - checkTxs(t, reactors[0].mempool, numTxs, peerID) - ensureNoTxs(t, reactors[peerID], 100*time.Millisecond) -} - -func TestReactor_MaxTxBytes(t *testing.T) { - config := cfg.TestConfig() - - const N = 2 - reactors := makeAndConnectReactors(config, N) - defer func() { - for _, r := range reactors { - if err := r.Stop(); err != nil { - assert.NoError(t, err) - } - } - }() - for _, r := range reactors { - for _, peer := range r.Switch.Peers().List() { - peer.Set(types.PeerStateKey, peerState{1}) - } - } - - // Broadcast a tx, which has the max size - // => ensure it's received by the second reactor. - tx1 := cmtrand.Bytes(config.Mempool.MaxTxBytes) - err := reactors[0].mempool.CheckTx(tx1, nil, mempool.TxInfo{SenderID: mempool.UnknownPeerID}) - require.NoError(t, err) - waitForTxsOnReactors(t, []types.Tx{tx1}, reactors) - - reactors[0].mempool.Flush() - reactors[1].mempool.Flush() - - // Broadcast a tx, which is beyond the max size - // => ensure it's not sent - tx2 := cmtrand.Bytes(config.Mempool.MaxTxBytes + 1) - err = reactors[0].mempool.CheckTx(tx2, nil, mempool.TxInfo{SenderID: mempool.UnknownPeerID}) - require.Error(t, err) -} - -func TestBroadcastTxForPeerStopsWhenPeerStops(t *testing.T) { - if testing.Short() { - t.Skip("skipping test in short mode.") - } - - config := cfg.TestConfig() - const N = 2 - reactors := makeAndConnectReactors(config, N) - defer func() { - for _, r := range reactors { - if err := r.Stop(); err != nil { - assert.NoError(t, err) - } - } - }() - - // stop peer - sw := reactors[1].Switch - sw.StopPeerForError(sw.Peers().List()[0], errors.New("some reason")) - - // check that we are not leaking any go-routines - // i.e. broadcastTxRoutine finishes when peer is stopped - leaktest.CheckTimeout(t, 10*time.Second)() -} - -func TestBroadcastTxForPeerStopsWhenReactorStops(t *testing.T) { - if testing.Short() { - t.Skip("skipping test in short mode.") - } - - config := cfg.TestConfig() - const N = 2 - reactors := makeAndConnectReactors(config, N) - - // stop reactors - for _, r := range reactors { - if err := r.Stop(); err != nil { - assert.NoError(t, err) - } - } - - // check that we are not leaking any go-routines - // i.e. broadcastTxRoutine finishes when reactor is stopped - leaktest.CheckTimeout(t, 10*time.Second)() -} - -func TestMempoolIDsBasic(t *testing.T) { - ids := newMempoolIDs() - - peer := mock.NewPeer(net.IP{127, 0, 0, 1}) - - ids.ReserveForPeer(peer) - assert.EqualValues(t, 1, ids.GetForPeer(peer)) - ids.Reclaim(peer) - - ids.ReserveForPeer(peer) - assert.EqualValues(t, 2, ids.GetForPeer(peer)) - ids.Reclaim(peer) -} - -func TestMempoolIDsPanicsIfNodeRequestsOvermaxActiveIDs(t *testing.T) { - if testing.Short() { - return - } - - // 0 is already reserved for UnknownPeerID - ids := newMempoolIDs() - - for i := 0; i < mempool.MaxActiveIDs-1; i++ { - peer := mock.NewPeer(net.IP{127, 0, 0, 1}) - ids.ReserveForPeer(peer) - } - - assert.Panics(t, func() { - peer := mock.NewPeer(net.IP{127, 0, 0, 1}) - ids.ReserveForPeer(peer) - }) -} - -// TODO: This test tests that we don't panic and are able to generate new -// PeerIDs for each peer we add. It seems as though we should be able to test -// this in a much more direct way. -// https://github.com/cometbft/cometbft/issues/9639 -func TestDontExhaustMaxActiveIDs(t *testing.T) { - config := cfg.TestConfig() - const N = 1 - reactors := makeAndConnectReactors(config, N) - defer func() { - for _, r := range reactors { - if err := r.Stop(); err != nil { - assert.NoError(t, err) - } - } - }() - reactor := reactors[0] - - for i := 0; i < mempool.MaxActiveIDs+1; i++ { - peer := mock.NewPeer(nil) - reactor.ReceiveEnvelope(p2p.Envelope{ - ChannelID: mempool.MempoolChannel, - Src: peer, - Message: &memproto.Message{}, // This uses the wrong message type on purpose to stop the peer as in an error state in the reactor. - }, - ) - reactor.AddPeer(peer) - } -} - -func TestLegacyReactorReceiveBasic(t *testing.T) { - config := cfg.TestConfig() - const N = 1 - reactors := makeAndConnectReactors(config, N) - var ( - reactor = reactors[0] - peer = mock.NewPeer(nil) - ) - defer func() { - err := reactor.Stop() - assert.NoError(t, err) - }() - - reactor.InitPeer(peer) - reactor.AddPeer(peer) - m := &memproto.Txs{} - wm := m.Wrap() - msg, err := proto.Marshal(wm) - assert.NoError(t, err) - - assert.NotPanics(t, func() { - reactor.Receive(mempool.MempoolChannel, peer, msg) - }) -} - -// mempoolLogger is a TestingLogger which uses a different -// color for each validator ("validator" key must exist). -func mempoolLogger() log.Logger { - return log.TestingLoggerWithColorFn(func(keyvals ...interface{}) term.FgBgColor { - for i := 0; i < len(keyvals)-1; i += 2 { - if keyvals[i] == "validator" { - return term.FgBgColor{Fg: term.Color(uint8(keyvals[i+1].(int) + 1))} - } - } - return term.FgBgColor{} - }) -} - -// connect N mempool reactors through N switches -func makeAndConnectReactors(config *cfg.Config, n int) []*Reactor { - reactors := make([]*Reactor, n) - logger := mempoolLogger() - for i := 0; i < n; i++ { - app := kvstore.NewApplication() - cc := proxy.NewLocalClientCreator(app) - mempool, cleanup := newMempoolWithApp(cc) - defer cleanup() - - reactors[i] = NewReactor(config.Mempool, mempool) // so we dont start the consensus states - reactors[i].SetLogger(logger.With("validator", i)) - } - - p2p.MakeConnectedSwitches(config.P2P, n, func(i int, s *p2p.Switch) *p2p.Switch { - s.AddReactor("MEMPOOL", reactors[i]) - return s - - }, p2p.Connect2Switches) - return reactors -} - -func waitForTxsOnReactors(t *testing.T, txs types.Txs, reactors []*Reactor) { - // wait for the txs in all mempools - wg := new(sync.WaitGroup) - for i, reactor := range reactors { - wg.Add(1) - go func(r *Reactor, reactorIndex int) { - defer wg.Done() - waitForTxsOnReactor(t, txs, r, reactorIndex) - }(reactor, i) - } - - done := make(chan struct{}) - go func() { - wg.Wait() - close(done) - }() - - timer := time.After(timeout) - select { - case <-timer: - t.Fatal("Timed out waiting for txs") - case <-done: - } -} - -func waitForTxsOnReactor(t *testing.T, txs types.Txs, reactor *Reactor, reactorIndex int) { - mempool := reactor.mempool - for mempool.Size() < len(txs) { - time.Sleep(time.Millisecond * 100) - } - - reapedTxs := mempool.ReapMaxTxs(len(txs)) - for i, tx := range txs { - assert.Equalf(t, tx, reapedTxs[i], - "txs at index %d on reactor %d don't match: %v vs %v", i, reactorIndex, tx, reapedTxs[i]) - } -} - -// ensure no txs on reactor after some timeout -func ensureNoTxs(t *testing.T, reactor *Reactor, timeout time.Duration) { - time.Sleep(timeout) // wait for the txs in all mempools - assert.Zero(t, reactor.mempool.Size()) -} - -func TestMempoolVectors(t *testing.T) { - testCases := []struct { - testName string - tx []byte - expBytes string - }{ - {"tx 1", []byte{123}, "0a030a017b"}, - {"tx 2", []byte("proto encoding in mempool"), "0a1b0a1970726f746f20656e636f64696e6720696e206d656d706f6f6c"}, - } - - for _, tc := range testCases { - tc := tc - - msg := memproto.Message{ - Sum: &memproto.Message_Txs{ - Txs: &memproto.Txs{Txs: [][]byte{tc.tx}}, - }, - } - bz, err := msg.Marshal() - require.NoError(t, err, tc.testName) - - require.Equal(t, tc.expBytes, hex.EncodeToString(bz), tc.testName) - } -} diff --git a/node/node.go b/node/node.go index 0feeb3edc4..ca23103d18 100644 --- a/node/node.go +++ b/node/node.go @@ -34,7 +34,6 @@ import ( "github.com/cometbft/cometbft/light" mempl "github.com/cometbft/cometbft/mempool" mempoolv2 "github.com/cometbft/cometbft/mempool/cat" - mempoolv0 "github.com/cometbft/cometbft/mempool/v0" mempoolv1 "github.com/cometbft/cometbft/mempool/v1" "github.com/cometbft/cometbft/p2p" "github.com/cometbft/cometbft/p2p/pex" @@ -434,29 +433,6 @@ func createMempoolAndMempoolReactor( return mp, reactor - case cfg.MempoolV0: - mp := mempoolv0.NewCListMempool( - config.Mempool, - proxyApp.Mempool(), - state.LastBlockHeight, - mempoolv0.WithMetrics(memplMetrics), - mempoolv0.WithPreCheck(sm.TxPreCheck(state)), - mempoolv0.WithPostCheck(sm.TxPostCheck(state)), - ) - - mp.SetLogger(logger) - - reactor := mempoolv0.NewReactor( - config.Mempool, - mp, - ) - if config.Consensus.WaitForTxs() { - mp.EnableTxsAvailable() - } - reactor.SetLogger(logger) - - return mp, reactor - default: return nil, nil } diff --git a/node/node_test.go b/node/node_test.go index ea6ed0767e..7bcc2fe26a 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -22,7 +22,6 @@ import ( cmtrand "github.com/cometbft/cometbft/libs/rand" mempl "github.com/cometbft/cometbft/mempool" mempoolv2 "github.com/cometbft/cometbft/mempool/cat" - mempoolv0 "github.com/cometbft/cometbft/mempool/v0" mempoolv1 "github.com/cometbft/cometbft/mempool/v1" "github.com/cometbft/cometbft/p2p" "github.com/cometbft/cometbft/p2p/conn" @@ -251,13 +250,6 @@ func TestCreateProposalBlock(t *testing.T) { var mempool mempl.Mempool switch config.Mempool.Version { - case cfg.MempoolV0: - mempool = mempoolv0.NewCListMempool(config.Mempool, - proxyApp.Mempool(), - state.LastBlockHeight, - mempoolv0.WithMetrics(memplMetrics), - mempoolv0.WithPreCheck(sm.TxPreCheck(state)), - mempoolv0.WithPostCheck(sm.TxPostCheck(state))) case cfg.MempoolV1: mempool = mempoolv1.NewTxMempool(logger, config.Mempool, @@ -365,13 +357,6 @@ func TestMaxProposalBlockSize(t *testing.T) { memplMetrics := mempl.NopMetrics() var mempool mempl.Mempool switch config.Mempool.Version { - case cfg.MempoolV0: - mempool = mempoolv0.NewCListMempool(config.Mempool, - proxyApp.Mempool(), - state.LastBlockHeight, - mempoolv0.WithMetrics(memplMetrics), - mempoolv0.WithPreCheck(sm.TxPreCheck(state)), - mempoolv0.WithPostCheck(sm.TxPostCheck(state))) case cfg.MempoolV1: mempool = mempoolv1.NewTxMempool(logger, config.Mempool, diff --git a/test/e2e/pkg/testnet.go b/test/e2e/pkg/testnet.go index 65f2ee41f0..6557b89bed 100644 --- a/test/e2e/pkg/testnet.go +++ b/test/e2e/pkg/testnet.go @@ -382,7 +382,7 @@ func (n Node) Validate(testnet Testnet) error { } switch n.Mempool { - case "", config.MempoolV0, config.MempoolV1, config.MempoolV2: + case "", config.MempoolV1, config.MempoolV2: default: return fmt.Errorf("invalid mempool version %q", n.Mempool) } diff --git a/test/fuzz/mempool/v0/checktx.go b/test/fuzz/mempool/v0/checktx.go deleted file mode 100644 index fe2d91fffb..0000000000 --- a/test/fuzz/mempool/v0/checktx.go +++ /dev/null @@ -1,34 +0,0 @@ -package v0 - -import ( - "github.com/cometbft/cometbft/abci/example/kvstore" - "github.com/cometbft/cometbft/config" - mempl "github.com/cometbft/cometbft/mempool" - mempoolv0 "github.com/cometbft/cometbft/mempool/v0" - "github.com/cometbft/cometbft/proxy" -) - -var mempool mempl.Mempool - -func init() { - app := kvstore.NewApplication() - cc := proxy.NewLocalClientCreator(app) - appConnMem, _ := cc.NewABCIClient() - err := appConnMem.Start() - if err != nil { - panic(err) - } - - cfg := config.DefaultMempoolConfig() - cfg.Broadcast = false - mempool = mempoolv0.NewCListMempool(cfg, appConnMem, 0) -} - -func Fuzz(data []byte) int { - err := mempool.CheckTx(data, nil, mempl.TxInfo{}) - if err != nil { - return 0 - } - - return 1 -} diff --git a/test/fuzz/mempool/v0/fuzz_test.go b/test/fuzz/mempool/v0/fuzz_test.go deleted file mode 100644 index bc9004d19b..0000000000 --- a/test/fuzz/mempool/v0/fuzz_test.go +++ /dev/null @@ -1,34 +0,0 @@ -package v0_test - -import ( - "io" - "os" - "path/filepath" - "testing" - - "github.com/stretchr/testify/require" - - mempoolv0 "github.com/cometbft/cometbft/test/fuzz/mempool/v0" -) - -const testdataCasesDir = "testdata/cases" - -func TestMempoolTestdataCases(t *testing.T) { - entries, err := os.ReadDir(testdataCasesDir) - require.NoError(t, err) - - for _, e := range entries { - entry := e - t.Run(entry.Name(), func(t *testing.T) { - defer func() { - r := recover() - require.Nilf(t, r, "testdata/cases test panic") - }() - f, err := os.Open(filepath.Join(testdataCasesDir, entry.Name())) - require.NoError(t, err) - input, err := io.ReadAll(f) - require.NoError(t, err) - mempoolv0.Fuzz(input) - }) - } -} diff --git a/test/fuzz/mempool/v0/testdata/cases/empty b/test/fuzz/mempool/v0/testdata/cases/empty deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/test/maverick/node/node.go b/test/maverick/node/node.go index 6e84a15a2d..3ae26246ba 100644 --- a/test/maverick/node/node.go +++ b/test/maverick/node/node.go @@ -33,7 +33,6 @@ import ( "github.com/cometbft/cometbft/light" mempl "github.com/cometbft/cometbft/mempool" mempoolv2 "github.com/cometbft/cometbft/mempool/cat" - mempoolv0 "github.com/cometbft/cometbft/mempool/v0" mempoolv1 "github.com/cometbft/cometbft/mempool/v1" "github.com/cometbft/cometbft/p2p" "github.com/cometbft/cometbft/p2p/pex" @@ -435,29 +434,6 @@ func createMempoolAndMempoolReactor(config *cfg.Config, proxyApp proxy.AppConns, return reactor, mp - case cfg.MempoolV0: - mp := mempoolv0.NewCListMempool( - config.Mempool, - proxyApp.Mempool(), - state.LastBlockHeight, - mempoolv0.WithMetrics(memplMetrics), - mempoolv0.WithPreCheck(sm.TxPreCheck(state)), - mempoolv0.WithPostCheck(sm.TxPostCheck(state)), - ) - - mp.SetLogger(logger) - mp.SetLogger(logger) - - reactor := mempoolv0.NewReactor( - config.Mempool, - mp, - ) - if config.Consensus.WaitForTxs() { - mp.EnableTxsAvailable() - } - - return reactor, mp - default: return nil, nil }