From e00123a504b3da18d3e61c2d5c2e5b77bc7e7e1d Mon Sep 17 00:00:00 2001 From: Sorin Stanculeanu Date: Thu, 31 Aug 2023 20:45:42 +0300 Subject: [PATCH 1/8] added RegisterEvictionHandler on txCache and crossTxCache --- common/errors.go | 4 ++++ txcache/crossTxCache.go | 35 +++++++++++++++++++++++++++++++++-- txcache/eviction.go | 1 + txcache/txCache.go | 40 +++++++++++++++++++++++++++++++++++----- 4 files changed, 73 insertions(+), 7 deletions(-) diff --git a/common/errors.go b/common/errors.go index 6342eb4f..93e11c80 100644 --- a/common/errors.go +++ b/common/errors.go @@ -2,6 +2,7 @@ package common import ( "errors" + "github.com/multiversx/mx-chain-core-go/core" ) @@ -85,3 +86,6 @@ var ErrInvalidCacheExpiry = errors.New("invalid cache expiry") // ErrDBIsClosed is raised when the DB is closed var ErrDBIsClosed = core.ErrDBIsClosed + +// ErrNilEvictionHandler signals that a nil eviction handler has been provided +var ErrNilEvictionHandler = errors.New("nil eviction handler") diff --git a/txcache/crossTxCache.go b/txcache/crossTxCache.go index 0c21e0d9..5148d188 100644 --- a/txcache/crossTxCache.go +++ b/txcache/crossTxCache.go @@ -1,6 +1,9 @@ package txcache import ( + "sync" + + "github.com/multiversx/mx-chain-storage-go/common" "github.com/multiversx/mx-chain-storage-go/immunitycache" "github.com/multiversx/mx-chain-storage-go/types" ) @@ -10,7 +13,9 @@ var _ types.Cacher = (*CrossTxCache)(nil) // CrossTxCache holds cross-shard transactions (where destination == me) type CrossTxCache struct { *immunitycache.ImmunityCache - config ConfigDestinationMe + config ConfigDestinationMe + mutEvictionHandlers sync.RWMutex + evictionHandlers []func(txHash []byte) } // NewCrossTxCache creates a new transactions cache @@ -93,7 +98,11 @@ func (cache *CrossTxCache) Peek(key []byte) (value interface{}, ok bool) { // RemoveTxByHash removes tx by hash func (cache *CrossTxCache) RemoveTxByHash(txHash []byte) bool { - return cache.RemoveWithResult(txHash) + ok := cache.RemoveWithResult(txHash) + if ok { + cache.notifyEvictionHandlers(txHash) + } + return ok } // ForEachTransaction iterates over the transactions in the cache @@ -115,6 +124,28 @@ func (cache *CrossTxCache) GetTransactionsPoolForSender(_ string) []*WrappedTran return make([]*WrappedTransaction, 0) } +// RegisterEvictionHandler registers a handler which will be called when a tx is evicted from cache +func (cache *CrossTxCache) RegisterEvictionHandler(handler func(hash []byte)) error { + if handler == nil { + return common.ErrNilEvictionHandler + } + + cache.mutEvictionHandlers.Lock() + cache.evictionHandlers = append(cache.evictionHandlers, handler) + cache.mutEvictionHandlers.Unlock() + + return nil +} + +// notifyEvictionHandlers will be called on a separate go routine +func (cache *CrossTxCache) notifyEvictionHandlers(txHash []byte) { + cache.mutEvictionHandlers.RLock() + for _, handler := range cache.evictionHandlers { + handler(txHash) + } + cache.mutEvictionHandlers.RUnlock() +} + // IsInterfaceNil returns true if there is no value under the interface func (cache *CrossTxCache) IsInterfaceNil() bool { return cache == nil diff --git a/txcache/eviction.go b/txcache/eviction.go index 985a1986..31c0068f 100644 --- a/txcache/eviction.go +++ b/txcache/eviction.go @@ -69,6 +69,7 @@ func (cache *TxCache) areThereTooManyTxs() bool { // This is called concurrently by two goroutines: the eviction one and the sweeping one func (cache *TxCache) doEvictItems(txsToEvict [][]byte, sendersToEvict []string) (countTxs uint32, countSenders uint32) { + go cache.notifyEvictionHandlers(txsToEvict) countTxs = cache.txByHash.RemoveTxsBulk(txsToEvict) countSenders = cache.txListBySender.RemoveSendersBulk(sendersToEvict) return diff --git a/txcache/txCache.go b/txcache/txCache.go index d938b976..baa87efb 100644 --- a/txcache/txCache.go +++ b/txcache/txCache.go @@ -29,6 +29,8 @@ type TxCache struct { sweepingMutex sync.Mutex sweepingListOfSenders []*txListForSender mutTxOperation sync.Mutex + mutEvictionHandlers sync.RWMutex + evictionHandlers []func(txHash []byte) } // NewTxCache creates a new transaction cache @@ -51,11 +53,12 @@ func NewTxCache(config ConfigSourceMe, txGasHandler TxGasHandler) (*TxCache, err scoreComputerObj := newDefaultScoreComputer(txFeeHelper) txCache := &TxCache{ - name: config.Name, - txListBySender: newTxListBySenderMap(numChunks, senderConstraintsObj, scoreComputerObj, txGasHandler, txFeeHelper), - txByHash: newTxByHashMap(numChunks), - config: config, - evictionJournal: evictionJournal{}, + name: config.Name, + txListBySender: newTxListBySenderMap(numChunks, senderConstraintsObj, scoreComputerObj, txGasHandler, txFeeHelper), + txByHash: newTxByHashMap(numChunks), + config: config, + evictionJournal: evictionJournal{}, + evictionHandlers: make([]func(txHash []byte), 0), } txCache.initSweepable() @@ -88,6 +91,7 @@ func (cache *TxCache) AddTx(tx *WrappedTransaction) (ok bool, added bool) { if len(evicted) > 0 { cache.monitorEvictionWrtSenderLimit(tx.Tx.GetSndAddr(), evicted) + go cache.notifyEvictionHandlers(evicted) cache.txByHash.RemoveTxsBulk(evicted) } @@ -166,6 +170,8 @@ func (cache *TxCache) doAfterSelection() { // RemoveTxByHash removes tx by hash func (cache *TxCache) RemoveTxByHash(txHash []byte) bool { + go cache.notifyEvictionHandlers([][]byte{txHash}) + cache.mutTxOperation.Lock() defer cache.mutTxOperation.Unlock() @@ -298,6 +304,30 @@ func (cache *TxCache) MaxSize() int { return int(cache.config.CountThreshold) } +// RegisterEvictionHandler registers a handler which will be called when a tx is evicted from cache +func (cache *TxCache) RegisterEvictionHandler(handler func(hash []byte)) error { + if handler == nil { + return common.ErrNilEvictionHandler + } + + cache.mutEvictionHandlers.Lock() + cache.evictionHandlers = append(cache.evictionHandlers, handler) + cache.mutEvictionHandlers.Unlock() + + return nil +} + +// notifyEvictionHandlers will be called on a separate go routine +func (cache *TxCache) notifyEvictionHandlers(txHashes [][]byte) { + cache.mutEvictionHandlers.RLock() + for _, handler := range cache.evictionHandlers { + for _, txHash := range txHashes { + handler(txHash) + } + } + cache.mutEvictionHandlers.RUnlock() +} + // RegisterHandler is not implemented func (cache *TxCache) RegisterHandler(func(key []byte, value interface{}), string) { log.Error("TxCache.RegisterHandler is not implemented") From dac1d2f3fe3472c7133a97c183aa06af01f00961 Mon Sep 17 00:00:00 2001 From: Sorin Stanculeanu Date: Tue, 5 Sep 2023 14:44:06 +0300 Subject: [PATCH 2/8] added unittests --- txcache/crossTxCache_test.go | 40 +++++++++++++++++++++++++++ txcache/txCache_test.go | 52 ++++++++++++++++++++++++++++++++++++ 2 files changed, 92 insertions(+) diff --git a/txcache/crossTxCache_test.go b/txcache/crossTxCache_test.go index eca4a64b..6c10c1aa 100644 --- a/txcache/crossTxCache_test.go +++ b/txcache/crossTxCache_test.go @@ -1,10 +1,12 @@ package txcache import ( + "bytes" "fmt" "math" "testing" + "github.com/multiversx/mx-chain-storage-go/common" "github.com/stretchr/testify/require" ) @@ -57,6 +59,44 @@ func TestCrossTxCache_Get(t *testing.T) { require.Equal(t, make([]*WrappedTransaction, 0), cache.GetTransactionsPoolForSender("")) } +func TestCrossTxCache_RegisterEvictionHandler(t *testing.T) { + t.Parallel() + + cache := newCrossTxCacheToTest(1, 8, math.MaxUint16) + + cache.addTestTx("hash-1") + cache.addTestTx("hash-2") + + err := cache.RegisterEvictionHandler(nil) + require.Equal(t, common.ErrNilEvictionHandler, err) + + cnt := 0 + err = cache.RegisterEvictionHandler(func(hash []byte) { + cnt++ + switch cnt { + case 1: + require.True(t, bytes.Equal([]byte("hash-1"), hash)) + case 2: + require.True(t, bytes.Equal([]byte("hash-2"), hash)) + default: + require.Fail(t, "should have not been called") + } + }) + require.NoError(t, err) + + removed := cache.RemoveTxByHash([]byte("hash-1")) + require.True(t, removed) + cache.Remove([]byte("hash-2")) + + foundTx, ok := cache.GetByTxHash([]byte("hash-1")) + require.False(t, ok) + require.Nil(t, foundTx) + + foundTx, ok = cache.GetByTxHash([]byte("hash-2")) + require.False(t, ok) + require.Nil(t, foundTx) +} + func newCrossTxCacheToTest(numChunks uint32, maxNumItems uint32, numMaxBytes uint32) *CrossTxCache { cache, err := NewCrossTxCache(ConfigDestinationMe{ Name: "test", diff --git a/txcache/txCache_test.go b/txcache/txCache_test.go index 3a8b41c4..7d34e952 100644 --- a/txcache/txCache_test.go +++ b/txcache/txCache_test.go @@ -1,11 +1,13 @@ package txcache import ( + "bytes" "errors" "fmt" "math" "sort" "sync" + "sync/atomic" "testing" "time" @@ -543,12 +545,20 @@ func TestTxCache_TransactionIsAdded_EvenWhenInternalMapsAreInconsistent(t *testi func TestTxCache_NoCriticalInconsistency_WhenConcurrentAdditionsAndRemovals(t *testing.T) { cache := newUnconstrainedCacheToTest() + handlerCalls := uint32(0) + evictionHandlerWG := sync.WaitGroup{} + _ = cache.RegisterEvictionHandler(func(hash []byte) { + atomic.AddUint32(&handlerCalls, 1) + evictionHandlerWG.Done() + }) + // A lot of routines concur to add & remove THE FIRST transaction of a sender for try := 0; try < 100; try++ { var wg sync.WaitGroup for i := 0; i < 50; i++ { wg.Add(1) + evictionHandlerWG.Add(1) go func() { cache.AddTx(createTx([]byte("alice-x"), "alice", 42)) _ = cache.RemoveTxByHash([]byte("alice-x")) @@ -590,6 +600,7 @@ func TestTxCache_NoCriticalInconsistency_WhenConcurrentAdditionsAndRemovals(t *t for i := 0; i < 50; i++ { wg.Add(1) + evictionHandlerWG.Add(1) go func() { cache.AddTx(createTx([]byte("alice-x"), "alice", 42)) _ = cache.RemoveTxByHash([]byte("alice-x")) @@ -624,6 +635,47 @@ func TestTxCache_NoCriticalInconsistency_WhenConcurrentAdditionsAndRemovals(t *t } cache.Clear() + + evictionHandlerWG.Wait() + require.Equal(t, uint32(10000), atomic.LoadUint32(&handlerCalls)) +} + +func TestTxCache_RegisterEvictionHandler(t *testing.T) { + t.Parallel() + + cache := newUnconstrainedCacheToTest() + + cache.AddTx(createTx([]byte("hash-1"), "alice", 1)) + cache.AddTx(createTx([]byte("hash-2"), "alice", 2)) + + err := cache.RegisterEvictionHandler(nil) + require.Equal(t, common.ErrNilEvictionHandler, err) + + cnt := 0 + err = cache.RegisterEvictionHandler(func(hash []byte) { + cnt++ + switch cnt { + case 1: + require.True(t, bytes.Equal([]byte("hash-1"), hash)) + case 2: + require.True(t, bytes.Equal([]byte("hash-2"), hash)) + default: + require.Fail(t, "should have not been called") + } + }) + require.NoError(t, err) + + removed := cache.RemoveTxByHash([]byte("hash-1")) + require.True(t, removed) + cache.Remove([]byte("hash-2")) + + foundTx, ok := cache.GetByTxHash([]byte("hash-1")) + require.False(t, ok) + require.Nil(t, foundTx) + + foundTx, ok = cache.GetByTxHash([]byte("hash-2")) + require.False(t, ok) + require.Nil(t, foundTx) } func newUnconstrainedCacheToTest() *TxCache { From 1bf4658d9cd56ff7cc73f42c2cb03acfbfa801c9 Mon Sep 17 00:00:00 2001 From: Sorin Stanculeanu Date: Tue, 5 Sep 2023 15:17:53 +0300 Subject: [PATCH 3/8] fixed race in tests --- txcache/crossTxCache.go | 2 +- txcache/crossTxCache_test.go | 25 +++++++++---------------- txcache/txCache_test.go | 26 ++++++++++++++++---------- 3 files changed, 26 insertions(+), 27 deletions(-) diff --git a/txcache/crossTxCache.go b/txcache/crossTxCache.go index 5148d188..6f0267cb 100644 --- a/txcache/crossTxCache.go +++ b/txcache/crossTxCache.go @@ -100,7 +100,7 @@ func (cache *CrossTxCache) Peek(key []byte) (value interface{}, ok bool) { func (cache *CrossTxCache) RemoveTxByHash(txHash []byte) bool { ok := cache.RemoveWithResult(txHash) if ok { - cache.notifyEvictionHandlers(txHash) + go cache.notifyEvictionHandlers(txHash) } return ok } diff --git a/txcache/crossTxCache_test.go b/txcache/crossTxCache_test.go index 6c10c1aa..b76db13d 100644 --- a/txcache/crossTxCache_test.go +++ b/txcache/crossTxCache_test.go @@ -5,6 +5,7 @@ import ( "fmt" "math" "testing" + "time" "github.com/multiversx/mx-chain-storage-go/common" "github.com/stretchr/testify/require" @@ -65,36 +66,28 @@ func TestCrossTxCache_RegisterEvictionHandler(t *testing.T) { cache := newCrossTxCacheToTest(1, 8, math.MaxUint16) cache.addTestTx("hash-1") - cache.addTestTx("hash-2") err := cache.RegisterEvictionHandler(nil) require.Equal(t, common.ErrNilEvictionHandler, err) - cnt := 0 + ch := make(chan struct{}) err = cache.RegisterEvictionHandler(func(hash []byte) { - cnt++ - switch cnt { - case 1: - require.True(t, bytes.Equal([]byte("hash-1"), hash)) - case 2: - require.True(t, bytes.Equal([]byte("hash-2"), hash)) - default: - require.Fail(t, "should have not been called") - } + require.True(t, bytes.Equal([]byte("hash-1"), hash)) + ch <- struct{}{} }) require.NoError(t, err) removed := cache.RemoveTxByHash([]byte("hash-1")) require.True(t, removed) - cache.Remove([]byte("hash-2")) + select { + case <-ch: + case <-time.After(time.Second): + require.Fail(t, "timeout") + } foundTx, ok := cache.GetByTxHash([]byte("hash-1")) require.False(t, ok) require.Nil(t, foundTx) - - foundTx, ok = cache.GetByTxHash([]byte("hash-2")) - require.False(t, ok) - require.Nil(t, foundTx) } func newCrossTxCacheToTest(numChunks uint32, maxNumItems uint32, numMaxBytes uint32) *CrossTxCache { diff --git a/txcache/txCache_test.go b/txcache/txCache_test.go index 7d34e952..d806b387 100644 --- a/txcache/txCache_test.go +++ b/txcache/txCache_test.go @@ -651,23 +651,29 @@ func TestTxCache_RegisterEvictionHandler(t *testing.T) { err := cache.RegisterEvictionHandler(nil) require.Equal(t, common.ErrNilEvictionHandler, err) - cnt := 0 + ch := make(chan uint32) + cnt := uint32(0) err = cache.RegisterEvictionHandler(func(hash []byte) { - cnt++ - switch cnt { - case 1: - require.True(t, bytes.Equal([]byte("hash-1"), hash)) - case 2: - require.True(t, bytes.Equal([]byte("hash-2"), hash)) - default: - require.Fail(t, "should have not been called") - } + atomic.AddUint32(&cnt, 1) + require.True(t, bytes.Equal([]byte("hash-1"), hash) || bytes.Equal([]byte("hash-2"), hash)) + ch <- atomic.LoadUint32(&cnt) }) require.NoError(t, err) removed := cache.RemoveTxByHash([]byte("hash-1")) require.True(t, removed) cache.Remove([]byte("hash-2")) + for { + chCnt := uint32(0) + select { + case chCnt = <-ch: + case <-time.After(time.Second): + require.Fail(t, "timeout") + } + if chCnt == 2 { + break + } + } foundTx, ok := cache.GetByTxHash([]byte("hash-1")) require.False(t, ok) From e11469f832d7723665f679e51d5af2baba505217 Mon Sep 17 00:00:00 2001 From: Sorin Stanculeanu Date: Wed, 6 Sep 2023 13:16:14 +0300 Subject: [PATCH 4/8] added worker pool on evicted txs to limit the number of goroutines --- txcache/baseTxCache.go | 43 ++++++++++++++ txcache/crossTxCache.go | 40 ++++--------- txcache/eviction.go | 2 +- txcache/txCache.go | 31 +++++----- txcache/txCacheEvictionWorkerPool.go | 48 +++++++++++++++ txcache/txCacheEvictionWorkerPool_test.go | 71 +++++++++++++++++++++++ 6 files changed, 191 insertions(+), 44 deletions(-) create mode 100644 txcache/baseTxCache.go create mode 100644 txcache/txCacheEvictionWorkerPool.go create mode 100644 txcache/txCacheEvictionWorkerPool_test.go diff --git a/txcache/baseTxCache.go b/txcache/baseTxCache.go new file mode 100644 index 00000000..357613be --- /dev/null +++ b/txcache/baseTxCache.go @@ -0,0 +1,43 @@ +package txcache + +import ( + "context" + "sync" + + "github.com/multiversx/mx-chain-storage-go/common" +) + +const numOfEvictionWorkers = uint32(5) + +type evictionWorkerPool interface { + StartWorkingEvictedHashes(ctx context.Context, handler func([]byte)) + AddEvictedHashes(hashes [][]byte) +} + +type baseTxCache struct { + mutEvictionHandlers sync.RWMutex + evictionHandlers []func(txHash []byte) + evictionWorkerPool evictionWorkerPool +} + +// RegisterEvictionHandler registers a handler which will be called when a tx is evicted from cache +func (cache *baseTxCache) RegisterEvictionHandler(handler func(hash []byte)) error { + if handler == nil { + return common.ErrNilEvictionHandler + } + + cache.mutEvictionHandlers.Lock() + cache.evictionHandlers = append(cache.evictionHandlers, handler) + cache.mutEvictionHandlers.Unlock() + + return nil +} + +// notifyEvictionHandlers will be called on a separate go routine +func (cache *baseTxCache) notifyEvictionHandlers(txHash []byte) { + cache.mutEvictionHandlers.RLock() + for _, handler := range cache.evictionHandlers { + handler(txHash) + } + cache.mutEvictionHandlers.RUnlock() +} diff --git a/txcache/crossTxCache.go b/txcache/crossTxCache.go index 6f0267cb..00de3f28 100644 --- a/txcache/crossTxCache.go +++ b/txcache/crossTxCache.go @@ -1,9 +1,8 @@ package txcache import ( - "sync" + "context" - "github.com/multiversx/mx-chain-storage-go/common" "github.com/multiversx/mx-chain-storage-go/immunitycache" "github.com/multiversx/mx-chain-storage-go/types" ) @@ -13,9 +12,8 @@ var _ types.Cacher = (*CrossTxCache)(nil) // CrossTxCache holds cross-shard transactions (where destination == me) type CrossTxCache struct { *immunitycache.ImmunityCache - config ConfigDestinationMe - mutEvictionHandlers sync.RWMutex - evictionHandlers []func(txHash []byte) + *baseTxCache + config ConfigDestinationMe } // NewCrossTxCache creates a new transactions cache @@ -42,9 +40,15 @@ func NewCrossTxCache(config ConfigDestinationMe) (*CrossTxCache, error) { cache := CrossTxCache{ ImmunityCache: immunityCache, - config: config, + baseTxCache: &baseTxCache{ + evictionHandlers: make([]func(txHash []byte), 0), + evictionWorkerPool: NewWorkerPool(numOfEvictionWorkers), + }, + config: config, } + cache.evictionWorkerPool.StartWorkingEvictedHashes(context.Background(), cache.notifyEvictionHandlers) + return &cache, nil } @@ -100,7 +104,7 @@ func (cache *CrossTxCache) Peek(key []byte) (value interface{}, ok bool) { func (cache *CrossTxCache) RemoveTxByHash(txHash []byte) bool { ok := cache.RemoveWithResult(txHash) if ok { - go cache.notifyEvictionHandlers(txHash) + cache.evictionWorkerPool.AddEvictedHashes([][]byte{txHash}) } return ok } @@ -124,28 +128,6 @@ func (cache *CrossTxCache) GetTransactionsPoolForSender(_ string) []*WrappedTran return make([]*WrappedTransaction, 0) } -// RegisterEvictionHandler registers a handler which will be called when a tx is evicted from cache -func (cache *CrossTxCache) RegisterEvictionHandler(handler func(hash []byte)) error { - if handler == nil { - return common.ErrNilEvictionHandler - } - - cache.mutEvictionHandlers.Lock() - cache.evictionHandlers = append(cache.evictionHandlers, handler) - cache.mutEvictionHandlers.Unlock() - - return nil -} - -// notifyEvictionHandlers will be called on a separate go routine -func (cache *CrossTxCache) notifyEvictionHandlers(txHash []byte) { - cache.mutEvictionHandlers.RLock() - for _, handler := range cache.evictionHandlers { - handler(txHash) - } - cache.mutEvictionHandlers.RUnlock() -} - // IsInterfaceNil returns true if there is no value under the interface func (cache *CrossTxCache) IsInterfaceNil() bool { return cache == nil diff --git a/txcache/eviction.go b/txcache/eviction.go index 31c0068f..3eaef345 100644 --- a/txcache/eviction.go +++ b/txcache/eviction.go @@ -69,7 +69,7 @@ func (cache *TxCache) areThereTooManyTxs() bool { // This is called concurrently by two goroutines: the eviction one and the sweeping one func (cache *TxCache) doEvictItems(txsToEvict [][]byte, sendersToEvict []string) (countTxs uint32, countSenders uint32) { - go cache.notifyEvictionHandlers(txsToEvict) + cache.evictionWorkerPool.AddEvictedHashes(txsToEvict) countTxs = cache.txByHash.RemoveTxsBulk(txsToEvict) countSenders = cache.txListBySender.RemoveSendersBulk(sendersToEvict) return diff --git a/txcache/txCache.go b/txcache/txCache.go index baa87efb..a02cdeae 100644 --- a/txcache/txCache.go +++ b/txcache/txCache.go @@ -1,6 +1,7 @@ package txcache import ( + "context" "sync" "github.com/multiversx/mx-chain-core-go/core/atomic" @@ -14,6 +15,7 @@ var _ types.Cacher = (*TxCache)(nil) // TxCache represents a cache-like structure (it has a fixed capacity and implements an eviction mechanism) for holding transactions type TxCache struct { + *baseTxCache name string txListBySender *txListBySenderMap txByHash *txByHashMap @@ -29,8 +31,6 @@ type TxCache struct { sweepingMutex sync.Mutex sweepingListOfSenders []*txListForSender mutTxOperation sync.Mutex - mutEvictionHandlers sync.RWMutex - evictionHandlers []func(txHash []byte) } // NewTxCache creates a new transaction cache @@ -53,14 +53,19 @@ func NewTxCache(config ConfigSourceMe, txGasHandler TxGasHandler) (*TxCache, err scoreComputerObj := newDefaultScoreComputer(txFeeHelper) txCache := &TxCache{ - name: config.Name, - txListBySender: newTxListBySenderMap(numChunks, senderConstraintsObj, scoreComputerObj, txGasHandler, txFeeHelper), - txByHash: newTxByHashMap(numChunks), - config: config, - evictionJournal: evictionJournal{}, - evictionHandlers: make([]func(txHash []byte), 0), + baseTxCache: &baseTxCache{ + evictionHandlers: make([]func(txHash []byte), 0), + evictionWorkerPool: NewWorkerPool(numOfEvictionWorkers), + }, + name: config.Name, + txListBySender: newTxListBySenderMap(numChunks, senderConstraintsObj, scoreComputerObj, txGasHandler, txFeeHelper), + txByHash: newTxByHashMap(numChunks), + config: config, + evictionJournal: evictionJournal{}, } + txCache.evictionWorkerPool.StartWorkingEvictedHashes(context.Background(), txCache.notifyEvictionHandlers) + txCache.initSweepable() return txCache, nil } @@ -91,7 +96,7 @@ func (cache *TxCache) AddTx(tx *WrappedTransaction) (ok bool, added bool) { if len(evicted) > 0 { cache.monitorEvictionWrtSenderLimit(tx.Tx.GetSndAddr(), evicted) - go cache.notifyEvictionHandlers(evicted) + cache.evictionWorkerPool.AddEvictedHashes(evicted) cache.txByHash.RemoveTxsBulk(evicted) } @@ -170,7 +175,7 @@ func (cache *TxCache) doAfterSelection() { // RemoveTxByHash removes tx by hash func (cache *TxCache) RemoveTxByHash(txHash []byte) bool { - go cache.notifyEvictionHandlers([][]byte{txHash}) + cache.evictionWorkerPool.AddEvictedHashes([][]byte{txHash}) cache.mutTxOperation.Lock() defer cache.mutTxOperation.Unlock() @@ -318,12 +323,10 @@ func (cache *TxCache) RegisterEvictionHandler(handler func(hash []byte)) error { } // notifyEvictionHandlers will be called on a separate go routine -func (cache *TxCache) notifyEvictionHandlers(txHashes [][]byte) { +func (cache *TxCache) notifyEvictionHandlers(txHash []byte) { cache.mutEvictionHandlers.RLock() for _, handler := range cache.evictionHandlers { - for _, txHash := range txHashes { - handler(txHash) - } + handler(txHash) } cache.mutEvictionHandlers.RUnlock() } diff --git a/txcache/txCacheEvictionWorkerPool.go b/txcache/txCacheEvictionWorkerPool.go new file mode 100644 index 00000000..8c0b32ca --- /dev/null +++ b/txcache/txCacheEvictionWorkerPool.go @@ -0,0 +1,48 @@ +package txcache + +import ( + "context" +) + +type txCacheEvictionWorkerPool struct { + maxWorkers uint32 + evictedHashesQueue chan []byte +} + +// NewWorkerPool returns a new workerPool instance +func NewWorkerPool(maxWorkers uint32) *txCacheEvictionWorkerPool { + return &txCacheEvictionWorkerPool{ + maxWorkers: maxWorkers, + evictedHashesQueue: make(chan []byte), + } +} + +// StartWorkingEvictedHashes starts the workers go routines +func (wp *txCacheEvictionWorkerPool) StartWorkingEvictedHashes(ctx context.Context, handler func(hash []byte)) { + if handler == nil { + return + } + + for i := uint32(0); i < wp.maxWorkers; i++ { + go wp.startWorker(ctx, handler) + } +} + +func (wp *txCacheEvictionWorkerPool) startWorker(ctx context.Context, handler func(hash []byte)) { + for { + select { + case <-ctx.Done(): + log.Debug("closing evicted hashes worker...") + return + case evictedHash := <-wp.evictedHashesQueue: + handler(evictedHash) + } + } +} + +// AddEvictedHashes adds the evicted hashes to the queue +func (wp *txCacheEvictionWorkerPool) AddEvictedHashes(hashes [][]byte) { + for i := 0; i < len(hashes); i++ { + wp.evictedHashesQueue <- hashes[i] + } +} diff --git a/txcache/txCacheEvictionWorkerPool_test.go b/txcache/txCacheEvictionWorkerPool_test.go new file mode 100644 index 00000000..e7e54152 --- /dev/null +++ b/txcache/txCacheEvictionWorkerPool_test.go @@ -0,0 +1,71 @@ +package txcache + +import ( + "context" + "fmt" + "runtime" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestWorkerPool_WithGoroutinesCheck(t *testing.T) { + evictedHashes := make(map[string]int) + mutEvictedHashes := sync.RWMutex{} + + numWorkers := 5 + backgroundGoroutines := runtime.NumGoroutine() + wp := NewWorkerPool(uint32(numWorkers)) + ctx, cancel := context.WithCancel(context.Background()) + wp.StartWorkingEvictedHashes(ctx, func(hash []byte) { + mutEvictedHashes.Lock() + defer mutEvictedHashes.Unlock() + + evictedHashes[string(hash)]++ + }) + + // expected goroutines: 5 workers + background + expectedGoroutines := numWorkers + backgroundGoroutines + require.Equal(t, expectedGoroutines, runtime.NumGoroutine()) + + numHashes := 10000 + for i := 0; i < numHashes; i++ { + go func(idx int) { + time.Sleep(time.Millisecond * 100) + + hash := fmt.Sprintf("hash_%d", idx) + wp.AddEvictedHashes([][]byte{[]byte(hash)}) + }(i) + } + + // expected goroutines: 10000 AddEvictedHashes + 5 workers + background + expectedGoroutines = numHashes + numWorkers + backgroundGoroutines + require.Equal(t, expectedGoroutines, runtime.NumGoroutine()) + + // allow all hashes to be "evicted" + time.Sleep(time.Millisecond * 150) + + // workers still running with no pending evicted tx + // expected goroutines: 5 workers + background + expectedGoroutines = numWorkers + backgroundGoroutines + require.Equal(t, expectedGoroutines, runtime.NumGoroutine()) + + // close the workers + cancel() + + // allow all workers to close + time.Sleep(time.Millisecond * 5) + + // expected goroutines: background + expectedGoroutines = backgroundGoroutines + require.Equal(t, expectedGoroutines, runtime.NumGoroutine()) + + mutEvictedHashes.RLock() + defer mutEvictedHashes.RUnlock() + require.Equal(t, numHashes, len(evictedHashes)) + for _, cnt := range evictedHashes { + require.Equal(t, 1, cnt) + } +} From 781f223bddccc44927d2db85669caa75c9d76025 Mon Sep 17 00:00:00 2001 From: Sorin Stanculeanu Date: Wed, 6 Sep 2023 15:47:11 +0300 Subject: [PATCH 5/8] use gammazero/workerpool --- go.mod | 2 + go.sum | 5 ++ txcache/baseTxCache.go | 13 +++-- txcache/crossTxCache.go | 17 ++++-- txcache/crossTxCache_test.go | 3 + txcache/eviction.go | 4 +- txcache/eviction_test.go | 9 +++ txcache/sweeping_test.go | 3 + txcache/txCache.go | 40 ++++--------- txcache/txCacheEvictionWorkerPool.go | 48 --------------- txcache/txCacheEvictionWorkerPool_test.go | 71 ----------------------- txcache/txCache_test.go | 6 ++ 12 files changed, 59 insertions(+), 162 deletions(-) delete mode 100644 txcache/txCacheEvictionWorkerPool.go delete mode 100644 txcache/txCacheEvictionWorkerPool_test.go diff --git a/go.mod b/go.mod index 8353fd02..538244d4 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/multiversx/mx-chain-storage-go go 1.20 require ( + github.com/gammazero/workerpool v1.1.3 github.com/hashicorp/golang-lru v0.6.0 github.com/multiversx/concurrent-map v0.1.4 github.com/multiversx/mx-chain-core-go v1.2.13 @@ -14,6 +15,7 @@ require ( require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/denisbrodbeck/machineid v1.0.1 // indirect + github.com/gammazero/deque v0.2.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/golang/snappy v0.0.4 // indirect diff --git a/go.sum b/go.sum index 49562433..b9d7ee72 100644 --- a/go.sum +++ b/go.sum @@ -6,6 +6,10 @@ github.com/denisbrodbeck/machineid v1.0.1/go.mod h1:dJUwb7PTidGDeYyUBmXZ2GphQBbj github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= +github.com/gammazero/deque v0.2.0 h1:SkieyNB4bg2/uZZLxvya0Pq6diUlwx7m2TeT7GAIWaA= +github.com/gammazero/deque v0.2.0/go.mod h1:LFroj8x4cMYCukHJDbxFCkT+r9AndaJnFMuZDV34tuU= +github.com/gammazero/workerpool v1.1.3 h1:WixN4xzukFoN0XSeXF6puqEqFTl2mECI9S6W44HWy9Q= +github.com/gammazero/workerpool v1.1.3/go.mod h1:wPjyBLDbyKnUn2XwwyD3EEwo9dHutia9/fwNmSHWACc= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -58,6 +62,7 @@ github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 h1:epCh84lMvA70 github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7/go.mod h1:q4W45IWZaF22tdD+VEXcAWRA037jwmWEB5VWYORlTpc= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= diff --git a/txcache/baseTxCache.go b/txcache/baseTxCache.go index 357613be..91f441c2 100644 --- a/txcache/baseTxCache.go +++ b/txcache/baseTxCache.go @@ -1,17 +1,16 @@ package txcache import ( - "context" "sync" "github.com/multiversx/mx-chain-storage-go/common" ) -const numOfEvictionWorkers = uint32(5) +const maxNumOfEvictionWorkers = 5 type evictionWorkerPool interface { - StartWorkingEvictedHashes(ctx context.Context, handler func([]byte)) - AddEvictedHashes(hashes [][]byte) + Stop() + Submit(task func()) } type baseTxCache struct { @@ -34,10 +33,12 @@ func (cache *baseTxCache) RegisterEvictionHandler(handler func(hash []byte)) err } // notifyEvictionHandlers will be called on a separate go routine -func (cache *baseTxCache) notifyEvictionHandlers(txHash []byte) { +func (cache *baseTxCache) notifyEvictionHandlers(txHashes [][]byte) { cache.mutEvictionHandlers.RLock() for _, handler := range cache.evictionHandlers { - handler(txHash) + for _, txHash := range txHashes { + handler(txHash) + } } cache.mutEvictionHandlers.RUnlock() } diff --git a/txcache/crossTxCache.go b/txcache/crossTxCache.go index 00de3f28..d0d19233 100644 --- a/txcache/crossTxCache.go +++ b/txcache/crossTxCache.go @@ -1,8 +1,7 @@ package txcache import ( - "context" - + "github.com/gammazero/workerpool" "github.com/multiversx/mx-chain-storage-go/immunitycache" "github.com/multiversx/mx-chain-storage-go/types" ) @@ -42,13 +41,11 @@ func NewCrossTxCache(config ConfigDestinationMe) (*CrossTxCache, error) { ImmunityCache: immunityCache, baseTxCache: &baseTxCache{ evictionHandlers: make([]func(txHash []byte), 0), - evictionWorkerPool: NewWorkerPool(numOfEvictionWorkers), + evictionWorkerPool: workerpool.New(maxNumOfEvictionWorkers), }, config: config, } - cache.evictionWorkerPool.StartWorkingEvictedHashes(context.Background(), cache.notifyEvictionHandlers) - return &cache, nil } @@ -104,7 +101,9 @@ func (cache *CrossTxCache) Peek(key []byte) (value interface{}, ok bool) { func (cache *CrossTxCache) RemoveTxByHash(txHash []byte) bool { ok := cache.RemoveWithResult(txHash) if ok { - cache.evictionWorkerPool.AddEvictedHashes([][]byte{txHash}) + cache.evictionWorkerPool.Submit(func() { + cache.notifyEvictionHandlers([][]byte{txHash}) + }) } return ok } @@ -128,6 +127,12 @@ func (cache *CrossTxCache) GetTransactionsPoolForSender(_ string) []*WrappedTran return make([]*WrappedTransaction, 0) } +// Close closes the eviction worker pool +func (cache *CrossTxCache) Close() error { + cache.evictionWorkerPool.Stop() + return nil +} + // IsInterfaceNil returns true if there is no value under the interface func (cache *CrossTxCache) IsInterfaceNil() bool { return cache == nil diff --git a/txcache/crossTxCache_test.go b/txcache/crossTxCache_test.go index b76db13d..4f6a2133 100644 --- a/txcache/crossTxCache_test.go +++ b/txcache/crossTxCache_test.go @@ -13,6 +13,7 @@ import ( func TestCrossTxCache_DoImmunizeTxsAgainstEviction(t *testing.T) { cache := newCrossTxCacheToTest(1, 8, math.MaxUint16) + defer func() { require.Nil(t, cache.Close()) }() cache.addTestTxs("a", "b", "c", "d") numNow, numFuture := cache.ImmunizeKeys(hashesAsBytes([]string{"a", "b", "e", "f"})) @@ -29,6 +30,7 @@ func TestCrossTxCache_DoImmunizeTxsAgainstEviction(t *testing.T) { func TestCrossTxCache_Get(t *testing.T) { cache := newCrossTxCacheToTest(1, 8, math.MaxUint16) + defer func() { require.Nil(t, cache.Close()) }() cache.addTestTxs("a", "b", "c", "d") a, ok := cache.GetByTxHash([]byte("a")) @@ -64,6 +66,7 @@ func TestCrossTxCache_RegisterEvictionHandler(t *testing.T) { t.Parallel() cache := newCrossTxCacheToTest(1, 8, math.MaxUint16) + defer func() { require.Nil(t, cache.Close()) }() cache.addTestTx("hash-1") diff --git a/txcache/eviction.go b/txcache/eviction.go index 3eaef345..5be10f8e 100644 --- a/txcache/eviction.go +++ b/txcache/eviction.go @@ -69,7 +69,9 @@ func (cache *TxCache) areThereTooManyTxs() bool { // This is called concurrently by two goroutines: the eviction one and the sweeping one func (cache *TxCache) doEvictItems(txsToEvict [][]byte, sendersToEvict []string) (countTxs uint32, countSenders uint32) { - cache.evictionWorkerPool.AddEvictedHashes(txsToEvict) + cache.evictionWorkerPool.Submit(func() { + cache.notifyEvictionHandlers(txsToEvict) + }) countTxs = cache.txByHash.RemoveTxsBulk(txsToEvict) countSenders = cache.txListBySender.RemoveSendersBulk(sendersToEvict) return diff --git a/txcache/eviction_test.go b/txcache/eviction_test.go index f5555ec0..5df37499 100644 --- a/txcache/eviction_test.go +++ b/txcache/eviction_test.go @@ -42,6 +42,7 @@ func TestEviction_EvictSendersWhileTooManyTxs(t *testing.T) { require.Equal(t, uint32(100), nSenders) require.Equal(t, int64(100), cache.txListBySender.counter.Get()) require.Equal(t, int64(100), cache.txByHash.counter.Get()) + require.Nil(t, cache.Close()) } func TestEviction_EvictSendersWhileTooManyBytes(t *testing.T) { @@ -79,6 +80,7 @@ func TestEviction_EvictSendersWhileTooManyBytes(t *testing.T) { require.Equal(t, uint32(100), nSenders) require.Equal(t, int64(100), cache.txListBySender.counter.Get()) require.Equal(t, int64(100), cache.txByHash.counter.Get()) + require.Nil(t, cache.Close()) } func TestEviction_DoEvictionDoneInPassTwo_BecauseOfCount(t *testing.T) { @@ -110,6 +112,7 @@ func TestEviction_DoEvictionDoneInPassTwo_BecauseOfCount(t *testing.T) { require.True(t, ok) require.Equal(t, uint64(1), cache.CountSenders()) require.Equal(t, uint64(1), cache.CountTx()) + require.Nil(t, cache.Close()) } func TestEviction_DoEvictionDoneInPassTwo_BecauseOfSize(t *testing.T) { @@ -164,6 +167,7 @@ func TestEviction_DoEvictionDoneInPassTwo_BecauseOfSize(t *testing.T) { require.True(t, ok) require.Equal(t, uint64(5), cache.CountSenders()) require.Equal(t, uint64(5), cache.CountTx()) + require.Nil(t, cache.Close()) } func TestEviction_doEvictionDoesNothingWhenAlreadyInProgress(t *testing.T) { @@ -187,6 +191,7 @@ func TestEviction_doEvictionDoesNothingWhenAlreadyInProgress(t *testing.T) { cache.doEviction() require.False(t, cache.evictionJournal.evictionPerformed) + require.Nil(t, cache.Close()) } func TestEviction_evictSendersInLoop_CoverLoopBreak_WhenSmallBatch(t *testing.T) { @@ -212,6 +217,7 @@ func TestEviction_evictSendersInLoop_CoverLoopBreak_WhenSmallBatch(t *testing.T) require.Equal(t, uint32(0), steps) require.Equal(t, uint32(1), nTxs) require.Equal(t, uint32(1), nSenders) + require.Nil(t, cache.Close()) } func TestEviction_evictSendersWhile_ShouldContinueBreak(t *testing.T) { @@ -241,6 +247,7 @@ func TestEviction_evictSendersWhile_ShouldContinueBreak(t *testing.T) { require.Equal(t, uint32(0), steps) require.Equal(t, uint32(0), nTxs) require.Equal(t, uint32(0), nSenders) + require.Nil(t, cache.Close()) } // This seems to be the most reasonable "bad-enough" (not worst) scenario to benchmark: @@ -271,6 +278,7 @@ func Test_AddWithEviction_UniformDistribution_25000x10(t *testing.T) { // Sometimes (due to map iteration non-determinism), more eviction happens - one more step of 100 senders. require.LessOrEqual(t, uint32(cache.CountTx()), config.CountThreshold) require.GreaterOrEqual(t, uint32(cache.CountTx()), config.CountThreshold-config.NumSendersToPreemptivelyEvict*uint32(numTxsPerSender)) + require.Nil(t, cache.Close()) } func Test_EvictSendersAndTheirTxs_Concurrently(t *testing.T) { @@ -304,4 +312,5 @@ func Test_EvictSendersAndTheirTxs_Concurrently(t *testing.T) { } wg.Wait() + require.Nil(t, cache.Close()) } diff --git a/txcache/sweeping_test.go b/txcache/sweeping_test.go index a700f7a8..a188e5b6 100644 --- a/txcache/sweeping_test.go +++ b/txcache/sweeping_test.go @@ -48,6 +48,7 @@ func TestSweeping_CollectSweepable(t *testing.T) { require.Equal(t, 3, cache.getNumFailedSelectionsOfSender("alice")) require.Equal(t, 3, cache.getNumFailedSelectionsOfSender("bob")) require.Equal(t, 0, cache.getNumFailedSelectionsOfSender("carol")) + require.Nil(t, cache.Close()) } func TestSweeping_WhenSendersEscapeCollection(t *testing.T) { @@ -93,6 +94,7 @@ func TestSweeping_WhenSendersEscapeCollection(t *testing.T) { require.Equal(t, 0, cache.getNumFailedSelectionsOfSender("alice")) require.Equal(t, 0, cache.getNumFailedSelectionsOfSender("bob")) require.Equal(t, 0, cache.getNumFailedSelectionsOfSender("carol")) + require.Nil(t, cache.Close()) } func TestSweeping_SweepSweepable(t *testing.T) { @@ -115,4 +117,5 @@ func TestSweeping_SweepSweepable(t *testing.T) { require.Equal(t, uint64(1), cache.CountTx()) require.Equal(t, uint64(1), cache.CountSenders()) + require.Nil(t, cache.Close()) } diff --git a/txcache/txCache.go b/txcache/txCache.go index a02cdeae..f139ae91 100644 --- a/txcache/txCache.go +++ b/txcache/txCache.go @@ -1,9 +1,9 @@ package txcache import ( - "context" "sync" + "github.com/gammazero/workerpool" "github.com/multiversx/mx-chain-core-go/core/atomic" "github.com/multiversx/mx-chain-core-go/core/check" "github.com/multiversx/mx-chain-storage-go/common" @@ -51,11 +51,10 @@ func NewTxCache(config ConfigSourceMe, txGasHandler TxGasHandler) (*TxCache, err senderConstraintsObj := config.getSenderConstraints() txFeeHelper := newFeeComputationHelper(txGasHandler.MinGasPrice(), txGasHandler.MinGasLimit(), txGasHandler.MinGasPriceForProcessing()) scoreComputerObj := newDefaultScoreComputer(txFeeHelper) - txCache := &TxCache{ baseTxCache: &baseTxCache{ evictionHandlers: make([]func(txHash []byte), 0), - evictionWorkerPool: NewWorkerPool(numOfEvictionWorkers), + evictionWorkerPool: workerpool.New(maxNumOfEvictionWorkers), }, name: config.Name, txListBySender: newTxListBySenderMap(numChunks, senderConstraintsObj, scoreComputerObj, txGasHandler, txFeeHelper), @@ -64,8 +63,6 @@ func NewTxCache(config ConfigSourceMe, txGasHandler TxGasHandler) (*TxCache, err evictionJournal: evictionJournal{}, } - txCache.evictionWorkerPool.StartWorkingEvictedHashes(context.Background(), txCache.notifyEvictionHandlers) - txCache.initSweepable() return txCache, nil } @@ -96,7 +93,9 @@ func (cache *TxCache) AddTx(tx *WrappedTransaction) (ok bool, added bool) { if len(evicted) > 0 { cache.monitorEvictionWrtSenderLimit(tx.Tx.GetSndAddr(), evicted) - cache.evictionWorkerPool.AddEvictedHashes(evicted) + cache.evictionWorkerPool.Submit(func() { + cache.notifyEvictionHandlers(evicted) + }) cache.txByHash.RemoveTxsBulk(evicted) } @@ -175,7 +174,9 @@ func (cache *TxCache) doAfterSelection() { // RemoveTxByHash removes tx by hash func (cache *TxCache) RemoveTxByHash(txHash []byte) bool { - cache.evictionWorkerPool.AddEvictedHashes([][]byte{txHash}) + cache.evictionWorkerPool.Submit(func() { + cache.notifyEvictionHandlers([][]byte{txHash}) + }) cache.mutTxOperation.Lock() defer cache.mutTxOperation.Unlock() @@ -309,28 +310,6 @@ func (cache *TxCache) MaxSize() int { return int(cache.config.CountThreshold) } -// RegisterEvictionHandler registers a handler which will be called when a tx is evicted from cache -func (cache *TxCache) RegisterEvictionHandler(handler func(hash []byte)) error { - if handler == nil { - return common.ErrNilEvictionHandler - } - - cache.mutEvictionHandlers.Lock() - cache.evictionHandlers = append(cache.evictionHandlers, handler) - cache.mutEvictionHandlers.Unlock() - - return nil -} - -// notifyEvictionHandlers will be called on a separate go routine -func (cache *TxCache) notifyEvictionHandlers(txHash []byte) { - cache.mutEvictionHandlers.RLock() - for _, handler := range cache.evictionHandlers { - handler(txHash) - } - cache.mutEvictionHandlers.RUnlock() -} - // RegisterHandler is not implemented func (cache *TxCache) RegisterHandler(func(key []byte, value interface{}), string) { log.Error("TxCache.RegisterHandler is not implemented") @@ -351,8 +330,9 @@ func (cache *TxCache) NotifyAccountNonce(accountKey []byte, nonce uint64) { func (cache *TxCache) ImmunizeTxsAgainstEviction(_ [][]byte) { } -// Close does nothing for this cacher implementation +// Close closes the eviction worker pool func (cache *TxCache) Close() error { + cache.evictionWorkerPool.Stop() return nil } diff --git a/txcache/txCacheEvictionWorkerPool.go b/txcache/txCacheEvictionWorkerPool.go deleted file mode 100644 index 8c0b32ca..00000000 --- a/txcache/txCacheEvictionWorkerPool.go +++ /dev/null @@ -1,48 +0,0 @@ -package txcache - -import ( - "context" -) - -type txCacheEvictionWorkerPool struct { - maxWorkers uint32 - evictedHashesQueue chan []byte -} - -// NewWorkerPool returns a new workerPool instance -func NewWorkerPool(maxWorkers uint32) *txCacheEvictionWorkerPool { - return &txCacheEvictionWorkerPool{ - maxWorkers: maxWorkers, - evictedHashesQueue: make(chan []byte), - } -} - -// StartWorkingEvictedHashes starts the workers go routines -func (wp *txCacheEvictionWorkerPool) StartWorkingEvictedHashes(ctx context.Context, handler func(hash []byte)) { - if handler == nil { - return - } - - for i := uint32(0); i < wp.maxWorkers; i++ { - go wp.startWorker(ctx, handler) - } -} - -func (wp *txCacheEvictionWorkerPool) startWorker(ctx context.Context, handler func(hash []byte)) { - for { - select { - case <-ctx.Done(): - log.Debug("closing evicted hashes worker...") - return - case evictedHash := <-wp.evictedHashesQueue: - handler(evictedHash) - } - } -} - -// AddEvictedHashes adds the evicted hashes to the queue -func (wp *txCacheEvictionWorkerPool) AddEvictedHashes(hashes [][]byte) { - for i := 0; i < len(hashes); i++ { - wp.evictedHashesQueue <- hashes[i] - } -} diff --git a/txcache/txCacheEvictionWorkerPool_test.go b/txcache/txCacheEvictionWorkerPool_test.go deleted file mode 100644 index e7e54152..00000000 --- a/txcache/txCacheEvictionWorkerPool_test.go +++ /dev/null @@ -1,71 +0,0 @@ -package txcache - -import ( - "context" - "fmt" - "runtime" - "sync" - "testing" - "time" - - "github.com/stretchr/testify/require" -) - -func TestWorkerPool_WithGoroutinesCheck(t *testing.T) { - evictedHashes := make(map[string]int) - mutEvictedHashes := sync.RWMutex{} - - numWorkers := 5 - backgroundGoroutines := runtime.NumGoroutine() - wp := NewWorkerPool(uint32(numWorkers)) - ctx, cancel := context.WithCancel(context.Background()) - wp.StartWorkingEvictedHashes(ctx, func(hash []byte) { - mutEvictedHashes.Lock() - defer mutEvictedHashes.Unlock() - - evictedHashes[string(hash)]++ - }) - - // expected goroutines: 5 workers + background - expectedGoroutines := numWorkers + backgroundGoroutines - require.Equal(t, expectedGoroutines, runtime.NumGoroutine()) - - numHashes := 10000 - for i := 0; i < numHashes; i++ { - go func(idx int) { - time.Sleep(time.Millisecond * 100) - - hash := fmt.Sprintf("hash_%d", idx) - wp.AddEvictedHashes([][]byte{[]byte(hash)}) - }(i) - } - - // expected goroutines: 10000 AddEvictedHashes + 5 workers + background - expectedGoroutines = numHashes + numWorkers + backgroundGoroutines - require.Equal(t, expectedGoroutines, runtime.NumGoroutine()) - - // allow all hashes to be "evicted" - time.Sleep(time.Millisecond * 150) - - // workers still running with no pending evicted tx - // expected goroutines: 5 workers + background - expectedGoroutines = numWorkers + backgroundGoroutines - require.Equal(t, expectedGoroutines, runtime.NumGoroutine()) - - // close the workers - cancel() - - // allow all workers to close - time.Sleep(time.Millisecond * 5) - - // expected goroutines: background - expectedGoroutines = backgroundGoroutines - require.Equal(t, expectedGoroutines, runtime.NumGoroutine()) - - mutEvictedHashes.RLock() - defer mutEvictedHashes.RUnlock() - require.Equal(t, numHashes, len(evictedHashes)) - for _, cnt := range evictedHashes { - require.Equal(t, 1, cnt) - } -} diff --git a/txcache/txCache_test.go b/txcache/txCache_test.go index d806b387..5a043d4b 100644 --- a/txcache/txCache_test.go +++ b/txcache/txCache_test.go @@ -42,6 +42,7 @@ func Test_NewTxCache(t *testing.T) { cache, err := NewTxCache(config, txGasHandler) require.Nil(t, err) require.NotNil(t, cache) + require.Nil(t, cache.Close()) badConfig := config badConfig.Name = "" @@ -107,6 +108,7 @@ func Test_AddTx(t *testing.T) { func Test_AddNilTx_DoesNothing(t *testing.T) { cache := newUnconstrainedCacheToTest() + defer func() { require.Nil(t, cache.Close()) }() txHash := []byte("hash-1") @@ -135,6 +137,7 @@ func Test_AddTx_AppliesSizeConstraintsPerSenderForNumTransactions(t *testing.T) require.Equal(t, []string{"tx-alice-1", "tx-alice-2", "tx-alice-3"}, cache.getHashesForSender("alice")) require.Equal(t, []string{"tx-bob-1", "tx-bob-2"}, cache.getHashesForSender("bob")) require.True(t, cache.areInternalMapsConsistent()) + require.Nil(t, cache.Close()) } func Test_AddTx_AppliesSizeConstraintsPerSenderForNumBytes(t *testing.T) { @@ -155,6 +158,7 @@ func Test_AddTx_AppliesSizeConstraintsPerSenderForNumBytes(t *testing.T) { require.Equal(t, []string{"tx-alice-1", "tx-alice-2", "tx-alice-3"}, cache.getHashesForSender("alice")) require.Equal(t, []string{"tx-bob-1", "tx-bob-2"}, cache.getHashesForSender("bob")) require.True(t, cache.areInternalMapsConsistent()) + require.Nil(t, cache.Close()) } func Test_RemoveByTxHash(t *testing.T) { @@ -438,6 +442,7 @@ func Test_AddWithEviction_UniformDistributionOfTxsPerSender(t *testing.T) { NumBytesPerSenderThreshold: maxNumBytesPerSenderUpperBound, CountPerSenderThreshold: math.MaxUint32, } + require.Nil(t, cache.Close()) // 100 * 1000 cache, err = NewTxCache(config, txGasHandler) @@ -446,6 +451,7 @@ func Test_AddWithEviction_UniformDistributionOfTxsPerSender(t *testing.T) { addManyTransactionsWithUniformDistribution(cache, 100, 1000) require.LessOrEqual(t, cache.CountTx(), uint64(250000)) + require.Nil(t, cache.Close()) } func Test_NotImplementedFunctions(t *testing.T) { From b4421cd171e2ba52e80bdc661d2c69716cf98985 Mon Sep 17 00:00:00 2001 From: Sorin Stanculeanu Date: Mon, 18 Sep 2023 09:57:38 +0300 Subject: [PATCH 6/8] fix after review --- txcache/baseTxCache.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/txcache/baseTxCache.go b/txcache/baseTxCache.go index 91f441c2..67321686 100644 --- a/txcache/baseTxCache.go +++ b/txcache/baseTxCache.go @@ -35,10 +35,12 @@ func (cache *baseTxCache) RegisterEvictionHandler(handler func(hash []byte)) err // notifyEvictionHandlers will be called on a separate go routine func (cache *baseTxCache) notifyEvictionHandlers(txHashes [][]byte) { cache.mutEvictionHandlers.RLock() - for _, handler := range cache.evictionHandlers { + handlers := cache.evictionHandlers + cache.mutEvictionHandlers.RUnlock() + + for _, handler := range handlers { for _, txHash := range txHashes { handler(txHash) } } - cache.mutEvictionHandlers.RUnlock() } From ac3c529bd596fdfa624be82036713f5e19e9a30c Mon Sep 17 00:00:00 2001 From: Sorin Stanculeanu Date: Mon, 18 Sep 2023 11:21:22 +0300 Subject: [PATCH 7/8] further fixes after review --- testscommon/evictionNotifierStub.go | 18 ++++++++++++++++++ txcache/baseTxCache.go | 13 ++++++++----- txcache/crossTxCache.go | 2 +- txcache/crossTxCache_test.go | 9 ++++++--- txcache/txCache.go | 2 +- txcache/txCache_test.go | 19 ++++++++++++------- types/interface.go | 8 +++++++- 7 files changed, 53 insertions(+), 18 deletions(-) create mode 100644 testscommon/evictionNotifierStub.go diff --git a/testscommon/evictionNotifierStub.go b/testscommon/evictionNotifierStub.go new file mode 100644 index 00000000..1a9c47fa --- /dev/null +++ b/testscommon/evictionNotifierStub.go @@ -0,0 +1,18 @@ +package testscommon + +// EvictionNotifierStub - +type EvictionNotifierStub struct { + NotifyEvictionCalled func(txHash []byte) +} + +// NotifyEviction - +func (stub *EvictionNotifierStub) NotifyEviction(txHash []byte) { + if stub.NotifyEvictionCalled != nil { + stub.NotifyEvictionCalled(txHash) + } +} + +// IsInterfaceNil - +func (stub *EvictionNotifierStub) IsInterfaceNil() bool { + return stub == nil +} diff --git a/txcache/baseTxCache.go b/txcache/baseTxCache.go index 67321686..e835770c 100644 --- a/txcache/baseTxCache.go +++ b/txcache/baseTxCache.go @@ -3,7 +3,9 @@ package txcache import ( "sync" + "github.com/multiversx/mx-chain-core-go/core/check" "github.com/multiversx/mx-chain-storage-go/common" + "github.com/multiversx/mx-chain-storage-go/types" ) const maxNumOfEvictionWorkers = 5 @@ -15,13 +17,13 @@ type evictionWorkerPool interface { type baseTxCache struct { mutEvictionHandlers sync.RWMutex - evictionHandlers []func(txHash []byte) + evictionHandlers []types.EvictionNotifier evictionWorkerPool evictionWorkerPool } // RegisterEvictionHandler registers a handler which will be called when a tx is evicted from cache -func (cache *baseTxCache) RegisterEvictionHandler(handler func(hash []byte)) error { - if handler == nil { +func (cache *baseTxCache) RegisterEvictionHandler(handler types.EvictionNotifier) error { + if check.IfNil(handler) { return common.ErrNilEvictionHandler } @@ -35,12 +37,13 @@ func (cache *baseTxCache) RegisterEvictionHandler(handler func(hash []byte)) err // notifyEvictionHandlers will be called on a separate go routine func (cache *baseTxCache) notifyEvictionHandlers(txHashes [][]byte) { cache.mutEvictionHandlers.RLock() - handlers := cache.evictionHandlers + handlers := make([]types.EvictionNotifier, len(cache.evictionHandlers)) + copy(handlers, cache.evictionHandlers) cache.mutEvictionHandlers.RUnlock() for _, handler := range handlers { for _, txHash := range txHashes { - handler(txHash) + handler.NotifyEviction(txHash) } } } diff --git a/txcache/crossTxCache.go b/txcache/crossTxCache.go index d0d19233..22ac51e7 100644 --- a/txcache/crossTxCache.go +++ b/txcache/crossTxCache.go @@ -40,7 +40,7 @@ func NewCrossTxCache(config ConfigDestinationMe) (*CrossTxCache, error) { cache := CrossTxCache{ ImmunityCache: immunityCache, baseTxCache: &baseTxCache{ - evictionHandlers: make([]func(txHash []byte), 0), + evictionHandlers: make([]types.EvictionNotifier, 0), evictionWorkerPool: workerpool.New(maxNumOfEvictionWorkers), }, config: config, diff --git a/txcache/crossTxCache_test.go b/txcache/crossTxCache_test.go index 4f6a2133..61aa9423 100644 --- a/txcache/crossTxCache_test.go +++ b/txcache/crossTxCache_test.go @@ -8,6 +8,7 @@ import ( "time" "github.com/multiversx/mx-chain-storage-go/common" + "github.com/multiversx/mx-chain-storage-go/testscommon" "github.com/stretchr/testify/require" ) @@ -74,9 +75,11 @@ func TestCrossTxCache_RegisterEvictionHandler(t *testing.T) { require.Equal(t, common.ErrNilEvictionHandler, err) ch := make(chan struct{}) - err = cache.RegisterEvictionHandler(func(hash []byte) { - require.True(t, bytes.Equal([]byte("hash-1"), hash)) - ch <- struct{}{} + err = cache.RegisterEvictionHandler(&testscommon.EvictionNotifierStub{ + NotifyEvictionCalled: func(hash []byte) { + require.True(t, bytes.Equal([]byte("hash-1"), hash)) + ch <- struct{}{} + }, }) require.NoError(t, err) diff --git a/txcache/txCache.go b/txcache/txCache.go index f139ae91..18d1ccfc 100644 --- a/txcache/txCache.go +++ b/txcache/txCache.go @@ -53,7 +53,7 @@ func NewTxCache(config ConfigSourceMe, txGasHandler TxGasHandler) (*TxCache, err scoreComputerObj := newDefaultScoreComputer(txFeeHelper) txCache := &TxCache{ baseTxCache: &baseTxCache{ - evictionHandlers: make([]func(txHash []byte), 0), + evictionHandlers: make([]types.EvictionNotifier, 0), evictionWorkerPool: workerpool.New(maxNumOfEvictionWorkers), }, name: config.Name, diff --git a/txcache/txCache_test.go b/txcache/txCache_test.go index 5a043d4b..4af75c83 100644 --- a/txcache/txCache_test.go +++ b/txcache/txCache_test.go @@ -14,6 +14,7 @@ import ( "github.com/multiversx/mx-chain-core-go/core" "github.com/multiversx/mx-chain-core-go/core/check" "github.com/multiversx/mx-chain-storage-go/common" + "github.com/multiversx/mx-chain-storage-go/testscommon" "github.com/multiversx/mx-chain-storage-go/types" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -553,9 +554,11 @@ func TestTxCache_NoCriticalInconsistency_WhenConcurrentAdditionsAndRemovals(t *t handlerCalls := uint32(0) evictionHandlerWG := sync.WaitGroup{} - _ = cache.RegisterEvictionHandler(func(hash []byte) { - atomic.AddUint32(&handlerCalls, 1) - evictionHandlerWG.Done() + _ = cache.RegisterEvictionHandler(&testscommon.EvictionNotifierStub{ + NotifyEvictionCalled: func(hash []byte) { + atomic.AddUint32(&handlerCalls, 1) + evictionHandlerWG.Done() + }, }) // A lot of routines concur to add & remove THE FIRST transaction of a sender @@ -659,10 +662,12 @@ func TestTxCache_RegisterEvictionHandler(t *testing.T) { ch := make(chan uint32) cnt := uint32(0) - err = cache.RegisterEvictionHandler(func(hash []byte) { - atomic.AddUint32(&cnt, 1) - require.True(t, bytes.Equal([]byte("hash-1"), hash) || bytes.Equal([]byte("hash-2"), hash)) - ch <- atomic.LoadUint32(&cnt) + err = cache.RegisterEvictionHandler(&testscommon.EvictionNotifierStub{ + NotifyEvictionCalled: func(hash []byte) { + atomic.AddUint32(&cnt, 1) + require.True(t, bytes.Equal([]byte("hash-1"), hash) || bytes.Equal([]byte("hash-2"), hash)) + ch <- atomic.LoadUint32(&cnt) + }, }) require.NoError(t, err) diff --git a/types/interface.go b/types/interface.go index 99dca5f4..4a483f6a 100644 --- a/types/interface.go +++ b/types/interface.go @@ -225,8 +225,14 @@ type ShardIDProvider interface { IsInterfaceNil() bool } -// PersisterCreator defines the behavour of a component which is able to create a persister +// PersisterCreator defines the behaviour of a component which is able to create a persister type PersisterCreator interface { CreateBasePersister(path string) (Persister, error) IsInterfaceNil() bool } + +// EvictionNotifier defines the behaviour of a component which is able to handle an evicted transaction +type EvictionNotifier interface { + NotifyEviction(txHash []byte) + IsInterfaceNil() bool +} From 8bc7cfe4c3ea4b0c67979872fb8264bb4d8ab0b9 Mon Sep 17 00:00:00 2001 From: Sorin Stanculeanu Date: Tue, 19 Sep 2023 12:17:47 +0300 Subject: [PATCH 8/8] moved the worker pool on NotifyEviction calls instead of notifications --- txcache/baseTxCache.go | 8 +++++--- txcache/crossTxCache.go | 4 +--- txcache/eviction.go | 5 ++--- txcache/txCache.go | 8 ++------ 4 files changed, 10 insertions(+), 15 deletions(-) diff --git a/txcache/baseTxCache.go b/txcache/baseTxCache.go index e835770c..bdcbf0e9 100644 --- a/txcache/baseTxCache.go +++ b/txcache/baseTxCache.go @@ -34,8 +34,8 @@ func (cache *baseTxCache) RegisterEvictionHandler(handler types.EvictionNotifier return nil } -// notifyEvictionHandlers will be called on a separate go routine -func (cache *baseTxCache) notifyEvictionHandlers(txHashes [][]byte) { +// enqueueEvictedHashesForNotification will enqueue the provided hashes on the workers pool +func (cache *baseTxCache) enqueueEvictedHashesForNotification(txHashes [][]byte) { cache.mutEvictionHandlers.RLock() handlers := make([]types.EvictionNotifier, len(cache.evictionHandlers)) copy(handlers, cache.evictionHandlers) @@ -43,7 +43,9 @@ func (cache *baseTxCache) notifyEvictionHandlers(txHashes [][]byte) { for _, handler := range handlers { for _, txHash := range txHashes { - handler.NotifyEviction(txHash) + cache.evictionWorkerPool.Submit(func() { + handler.NotifyEviction(txHash) + }) } } } diff --git a/txcache/crossTxCache.go b/txcache/crossTxCache.go index 22ac51e7..7baf4e8e 100644 --- a/txcache/crossTxCache.go +++ b/txcache/crossTxCache.go @@ -101,9 +101,7 @@ func (cache *CrossTxCache) Peek(key []byte) (value interface{}, ok bool) { func (cache *CrossTxCache) RemoveTxByHash(txHash []byte) bool { ok := cache.RemoveWithResult(txHash) if ok { - cache.evictionWorkerPool.Submit(func() { - cache.notifyEvictionHandlers([][]byte{txHash}) - }) + cache.enqueueEvictedHashesForNotification([][]byte{txHash}) } return ok } diff --git a/txcache/eviction.go b/txcache/eviction.go index 5be10f8e..d15b156d 100644 --- a/txcache/eviction.go +++ b/txcache/eviction.go @@ -69,9 +69,8 @@ func (cache *TxCache) areThereTooManyTxs() bool { // This is called concurrently by two goroutines: the eviction one and the sweeping one func (cache *TxCache) doEvictItems(txsToEvict [][]byte, sendersToEvict []string) (countTxs uint32, countSenders uint32) { - cache.evictionWorkerPool.Submit(func() { - cache.notifyEvictionHandlers(txsToEvict) - }) + cache.enqueueEvictedHashesForNotification(txsToEvict) + countTxs = cache.txByHash.RemoveTxsBulk(txsToEvict) countSenders = cache.txListBySender.RemoveSendersBulk(sendersToEvict) return diff --git a/txcache/txCache.go b/txcache/txCache.go index 18d1ccfc..6378297f 100644 --- a/txcache/txCache.go +++ b/txcache/txCache.go @@ -93,9 +93,7 @@ func (cache *TxCache) AddTx(tx *WrappedTransaction) (ok bool, added bool) { if len(evicted) > 0 { cache.monitorEvictionWrtSenderLimit(tx.Tx.GetSndAddr(), evicted) - cache.evictionWorkerPool.Submit(func() { - cache.notifyEvictionHandlers(evicted) - }) + cache.enqueueEvictedHashesForNotification(evicted) cache.txByHash.RemoveTxsBulk(evicted) } @@ -174,9 +172,7 @@ func (cache *TxCache) doAfterSelection() { // RemoveTxByHash removes tx by hash func (cache *TxCache) RemoveTxByHash(txHash []byte) bool { - cache.evictionWorkerPool.Submit(func() { - cache.notifyEvictionHandlers([][]byte{txHash}) - }) + cache.enqueueEvictedHashesForNotification([][]byte{txHash}) cache.mutTxOperation.Lock() defer cache.mutTxOperation.Unlock()