diff --git a/common/errors.go b/common/errors.go index 6342eb4f..93e11c80 100644 --- a/common/errors.go +++ b/common/errors.go @@ -2,6 +2,7 @@ package common import ( "errors" + "github.com/multiversx/mx-chain-core-go/core" ) @@ -85,3 +86,6 @@ var ErrInvalidCacheExpiry = errors.New("invalid cache expiry") // ErrDBIsClosed is raised when the DB is closed var ErrDBIsClosed = core.ErrDBIsClosed + +// ErrNilEvictionHandler signals that a nil eviction handler has been provided +var ErrNilEvictionHandler = errors.New("nil eviction handler") diff --git a/go.mod b/go.mod index cfbd8445..5e06e2eb 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/multiversx/mx-chain-storage-go go 1.20 require ( + github.com/gammazero/workerpool v1.1.3 github.com/hashicorp/golang-lru v0.6.0 github.com/multiversx/concurrent-map v0.1.4 github.com/multiversx/mx-chain-core-go v1.2.16 @@ -14,6 +15,7 @@ require ( require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/denisbrodbeck/machineid v1.0.1 // indirect + github.com/gammazero/deque v0.2.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/golang/snappy v0.0.4 // indirect diff --git a/go.sum b/go.sum index d93b610d..051b3a64 100644 --- a/go.sum +++ b/go.sum @@ -11,6 +11,10 @@ github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4 github.com/fsnotify/fsnotify v1.5.4 h1:jRbGcIw6P2Meqdwuo0H1p6JVLbL5DHKAKlYndzMwVZI= github.com/fsnotify/fsnotify v1.5.4/go.mod h1:OVB6XrOHzAwXMpEM7uPOzcehqUV2UqJxmVXmkdnm1bU= github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE= +github.com/gammazero/deque v0.2.0 h1:SkieyNB4bg2/uZZLxvya0Pq6diUlwx7m2TeT7GAIWaA= +github.com/gammazero/deque v0.2.0/go.mod h1:LFroj8x4cMYCukHJDbxFCkT+r9AndaJnFMuZDV34tuU= +github.com/gammazero/workerpool v1.1.3 h1:WixN4xzukFoN0XSeXF6puqEqFTl2mECI9S6W44HWy9Q= +github.com/gammazero/workerpool v1.1.3/go.mod h1:wPjyBLDbyKnUn2XwwyD3EEwo9dHutia9/fwNmSHWACc= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -71,6 +75,7 @@ github.com/syndtr/goleveldb v1.0.1-0.20220721030215-126854af5e6d h1:vfofYNRScrDd github.com/syndtr/goleveldb v1.0.1-0.20220721030215-126854af5e6d/go.mod h1:RRCYJbIwD5jmqPI9XoAFR0OcDxqUctll6zUj/+B4S48= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= diff --git a/testscommon/evictionNotifierStub.go b/testscommon/evictionNotifierStub.go new file mode 100644 index 00000000..1a9c47fa --- /dev/null +++ b/testscommon/evictionNotifierStub.go @@ -0,0 +1,18 @@ +package testscommon + +// EvictionNotifierStub - +type EvictionNotifierStub struct { + NotifyEvictionCalled func(txHash []byte) +} + +// NotifyEviction - +func (stub *EvictionNotifierStub) NotifyEviction(txHash []byte) { + if stub.NotifyEvictionCalled != nil { + stub.NotifyEvictionCalled(txHash) + } +} + +// IsInterfaceNil - +func (stub *EvictionNotifierStub) IsInterfaceNil() bool { + return stub == nil +} diff --git a/txcache/baseTxCache.go b/txcache/baseTxCache.go new file mode 100644 index 00000000..bdcbf0e9 --- /dev/null +++ b/txcache/baseTxCache.go @@ -0,0 +1,51 @@ +package txcache + +import ( + "sync" + + "github.com/multiversx/mx-chain-core-go/core/check" + "github.com/multiversx/mx-chain-storage-go/common" + "github.com/multiversx/mx-chain-storage-go/types" +) + +const maxNumOfEvictionWorkers = 5 + +type evictionWorkerPool interface { + Stop() + Submit(task func()) +} + +type baseTxCache struct { + mutEvictionHandlers sync.RWMutex + evictionHandlers []types.EvictionNotifier + evictionWorkerPool evictionWorkerPool +} + +// RegisterEvictionHandler registers a handler which will be called when a tx is evicted from cache +func (cache *baseTxCache) RegisterEvictionHandler(handler types.EvictionNotifier) error { + if check.IfNil(handler) { + return common.ErrNilEvictionHandler + } + + cache.mutEvictionHandlers.Lock() + cache.evictionHandlers = append(cache.evictionHandlers, handler) + cache.mutEvictionHandlers.Unlock() + + return nil +} + +// enqueueEvictedHashesForNotification will enqueue the provided hashes on the workers pool +func (cache *baseTxCache) enqueueEvictedHashesForNotification(txHashes [][]byte) { + cache.mutEvictionHandlers.RLock() + handlers := make([]types.EvictionNotifier, len(cache.evictionHandlers)) + copy(handlers, cache.evictionHandlers) + cache.mutEvictionHandlers.RUnlock() + + for _, handler := range handlers { + for _, txHash := range txHashes { + cache.evictionWorkerPool.Submit(func() { + handler.NotifyEviction(txHash) + }) + } + } +} diff --git a/txcache/crossTxCache.go b/txcache/crossTxCache.go index 0c21e0d9..7baf4e8e 100644 --- a/txcache/crossTxCache.go +++ b/txcache/crossTxCache.go @@ -1,6 +1,7 @@ package txcache import ( + "github.com/gammazero/workerpool" "github.com/multiversx/mx-chain-storage-go/immunitycache" "github.com/multiversx/mx-chain-storage-go/types" ) @@ -10,6 +11,7 @@ var _ types.Cacher = (*CrossTxCache)(nil) // CrossTxCache holds cross-shard transactions (where destination == me) type CrossTxCache struct { *immunitycache.ImmunityCache + *baseTxCache config ConfigDestinationMe } @@ -37,7 +39,11 @@ func NewCrossTxCache(config ConfigDestinationMe) (*CrossTxCache, error) { cache := CrossTxCache{ ImmunityCache: immunityCache, - config: config, + baseTxCache: &baseTxCache{ + evictionHandlers: make([]types.EvictionNotifier, 0), + evictionWorkerPool: workerpool.New(maxNumOfEvictionWorkers), + }, + config: config, } return &cache, nil @@ -93,7 +99,11 @@ func (cache *CrossTxCache) Peek(key []byte) (value interface{}, ok bool) { // RemoveTxByHash removes tx by hash func (cache *CrossTxCache) RemoveTxByHash(txHash []byte) bool { - return cache.RemoveWithResult(txHash) + ok := cache.RemoveWithResult(txHash) + if ok { + cache.enqueueEvictedHashesForNotification([][]byte{txHash}) + } + return ok } // ForEachTransaction iterates over the transactions in the cache @@ -115,6 +125,12 @@ func (cache *CrossTxCache) GetTransactionsPoolForSender(_ string) []*WrappedTran return make([]*WrappedTransaction, 0) } +// Close closes the eviction worker pool +func (cache *CrossTxCache) Close() error { + cache.evictionWorkerPool.Stop() + return nil +} + // IsInterfaceNil returns true if there is no value under the interface func (cache *CrossTxCache) IsInterfaceNil() bool { return cache == nil diff --git a/txcache/crossTxCache_test.go b/txcache/crossTxCache_test.go index eca4a64b..61aa9423 100644 --- a/txcache/crossTxCache_test.go +++ b/txcache/crossTxCache_test.go @@ -1,15 +1,20 @@ package txcache import ( + "bytes" "fmt" "math" "testing" + "time" + "github.com/multiversx/mx-chain-storage-go/common" + "github.com/multiversx/mx-chain-storage-go/testscommon" "github.com/stretchr/testify/require" ) func TestCrossTxCache_DoImmunizeTxsAgainstEviction(t *testing.T) { cache := newCrossTxCacheToTest(1, 8, math.MaxUint16) + defer func() { require.Nil(t, cache.Close()) }() cache.addTestTxs("a", "b", "c", "d") numNow, numFuture := cache.ImmunizeKeys(hashesAsBytes([]string{"a", "b", "e", "f"})) @@ -26,6 +31,7 @@ func TestCrossTxCache_DoImmunizeTxsAgainstEviction(t *testing.T) { func TestCrossTxCache_Get(t *testing.T) { cache := newCrossTxCacheToTest(1, 8, math.MaxUint16) + defer func() { require.Nil(t, cache.Close()) }() cache.addTestTxs("a", "b", "c", "d") a, ok := cache.GetByTxHash([]byte("a")) @@ -57,6 +63,39 @@ func TestCrossTxCache_Get(t *testing.T) { require.Equal(t, make([]*WrappedTransaction, 0), cache.GetTransactionsPoolForSender("")) } +func TestCrossTxCache_RegisterEvictionHandler(t *testing.T) { + t.Parallel() + + cache := newCrossTxCacheToTest(1, 8, math.MaxUint16) + defer func() { require.Nil(t, cache.Close()) }() + + cache.addTestTx("hash-1") + + err := cache.RegisterEvictionHandler(nil) + require.Equal(t, common.ErrNilEvictionHandler, err) + + ch := make(chan struct{}) + err = cache.RegisterEvictionHandler(&testscommon.EvictionNotifierStub{ + NotifyEvictionCalled: func(hash []byte) { + require.True(t, bytes.Equal([]byte("hash-1"), hash)) + ch <- struct{}{} + }, + }) + require.NoError(t, err) + + removed := cache.RemoveTxByHash([]byte("hash-1")) + require.True(t, removed) + select { + case <-ch: + case <-time.After(time.Second): + require.Fail(t, "timeout") + } + + foundTx, ok := cache.GetByTxHash([]byte("hash-1")) + require.False(t, ok) + require.Nil(t, foundTx) +} + func newCrossTxCacheToTest(numChunks uint32, maxNumItems uint32, numMaxBytes uint32) *CrossTxCache { cache, err := NewCrossTxCache(ConfigDestinationMe{ Name: "test", diff --git a/txcache/eviction.go b/txcache/eviction.go index 985a1986..d15b156d 100644 --- a/txcache/eviction.go +++ b/txcache/eviction.go @@ -69,6 +69,8 @@ func (cache *TxCache) areThereTooManyTxs() bool { // This is called concurrently by two goroutines: the eviction one and the sweeping one func (cache *TxCache) doEvictItems(txsToEvict [][]byte, sendersToEvict []string) (countTxs uint32, countSenders uint32) { + cache.enqueueEvictedHashesForNotification(txsToEvict) + countTxs = cache.txByHash.RemoveTxsBulk(txsToEvict) countSenders = cache.txListBySender.RemoveSendersBulk(sendersToEvict) return diff --git a/txcache/eviction_test.go b/txcache/eviction_test.go index f5555ec0..5df37499 100644 --- a/txcache/eviction_test.go +++ b/txcache/eviction_test.go @@ -42,6 +42,7 @@ func TestEviction_EvictSendersWhileTooManyTxs(t *testing.T) { require.Equal(t, uint32(100), nSenders) require.Equal(t, int64(100), cache.txListBySender.counter.Get()) require.Equal(t, int64(100), cache.txByHash.counter.Get()) + require.Nil(t, cache.Close()) } func TestEviction_EvictSendersWhileTooManyBytes(t *testing.T) { @@ -79,6 +80,7 @@ func TestEviction_EvictSendersWhileTooManyBytes(t *testing.T) { require.Equal(t, uint32(100), nSenders) require.Equal(t, int64(100), cache.txListBySender.counter.Get()) require.Equal(t, int64(100), cache.txByHash.counter.Get()) + require.Nil(t, cache.Close()) } func TestEviction_DoEvictionDoneInPassTwo_BecauseOfCount(t *testing.T) { @@ -110,6 +112,7 @@ func TestEviction_DoEvictionDoneInPassTwo_BecauseOfCount(t *testing.T) { require.True(t, ok) require.Equal(t, uint64(1), cache.CountSenders()) require.Equal(t, uint64(1), cache.CountTx()) + require.Nil(t, cache.Close()) } func TestEviction_DoEvictionDoneInPassTwo_BecauseOfSize(t *testing.T) { @@ -164,6 +167,7 @@ func TestEviction_DoEvictionDoneInPassTwo_BecauseOfSize(t *testing.T) { require.True(t, ok) require.Equal(t, uint64(5), cache.CountSenders()) require.Equal(t, uint64(5), cache.CountTx()) + require.Nil(t, cache.Close()) } func TestEviction_doEvictionDoesNothingWhenAlreadyInProgress(t *testing.T) { @@ -187,6 +191,7 @@ func TestEviction_doEvictionDoesNothingWhenAlreadyInProgress(t *testing.T) { cache.doEviction() require.False(t, cache.evictionJournal.evictionPerformed) + require.Nil(t, cache.Close()) } func TestEviction_evictSendersInLoop_CoverLoopBreak_WhenSmallBatch(t *testing.T) { @@ -212,6 +217,7 @@ func TestEviction_evictSendersInLoop_CoverLoopBreak_WhenSmallBatch(t *testing.T) require.Equal(t, uint32(0), steps) require.Equal(t, uint32(1), nTxs) require.Equal(t, uint32(1), nSenders) + require.Nil(t, cache.Close()) } func TestEviction_evictSendersWhile_ShouldContinueBreak(t *testing.T) { @@ -241,6 +247,7 @@ func TestEviction_evictSendersWhile_ShouldContinueBreak(t *testing.T) { require.Equal(t, uint32(0), steps) require.Equal(t, uint32(0), nTxs) require.Equal(t, uint32(0), nSenders) + require.Nil(t, cache.Close()) } // This seems to be the most reasonable "bad-enough" (not worst) scenario to benchmark: @@ -271,6 +278,7 @@ func Test_AddWithEviction_UniformDistribution_25000x10(t *testing.T) { // Sometimes (due to map iteration non-determinism), more eviction happens - one more step of 100 senders. require.LessOrEqual(t, uint32(cache.CountTx()), config.CountThreshold) require.GreaterOrEqual(t, uint32(cache.CountTx()), config.CountThreshold-config.NumSendersToPreemptivelyEvict*uint32(numTxsPerSender)) + require.Nil(t, cache.Close()) } func Test_EvictSendersAndTheirTxs_Concurrently(t *testing.T) { @@ -304,4 +312,5 @@ func Test_EvictSendersAndTheirTxs_Concurrently(t *testing.T) { } wg.Wait() + require.Nil(t, cache.Close()) } diff --git a/txcache/sweeping_test.go b/txcache/sweeping_test.go index a700f7a8..a188e5b6 100644 --- a/txcache/sweeping_test.go +++ b/txcache/sweeping_test.go @@ -48,6 +48,7 @@ func TestSweeping_CollectSweepable(t *testing.T) { require.Equal(t, 3, cache.getNumFailedSelectionsOfSender("alice")) require.Equal(t, 3, cache.getNumFailedSelectionsOfSender("bob")) require.Equal(t, 0, cache.getNumFailedSelectionsOfSender("carol")) + require.Nil(t, cache.Close()) } func TestSweeping_WhenSendersEscapeCollection(t *testing.T) { @@ -93,6 +94,7 @@ func TestSweeping_WhenSendersEscapeCollection(t *testing.T) { require.Equal(t, 0, cache.getNumFailedSelectionsOfSender("alice")) require.Equal(t, 0, cache.getNumFailedSelectionsOfSender("bob")) require.Equal(t, 0, cache.getNumFailedSelectionsOfSender("carol")) + require.Nil(t, cache.Close()) } func TestSweeping_SweepSweepable(t *testing.T) { @@ -115,4 +117,5 @@ func TestSweeping_SweepSweepable(t *testing.T) { require.Equal(t, uint64(1), cache.CountTx()) require.Equal(t, uint64(1), cache.CountSenders()) + require.Nil(t, cache.Close()) } diff --git a/txcache/txCache.go b/txcache/txCache.go index d938b976..6378297f 100644 --- a/txcache/txCache.go +++ b/txcache/txCache.go @@ -3,6 +3,7 @@ package txcache import ( "sync" + "github.com/gammazero/workerpool" "github.com/multiversx/mx-chain-core-go/core/atomic" "github.com/multiversx/mx-chain-core-go/core/check" "github.com/multiversx/mx-chain-storage-go/common" @@ -14,6 +15,7 @@ var _ types.Cacher = (*TxCache)(nil) // TxCache represents a cache-like structure (it has a fixed capacity and implements an eviction mechanism) for holding transactions type TxCache struct { + *baseTxCache name string txListBySender *txListBySenderMap txByHash *txByHashMap @@ -49,8 +51,11 @@ func NewTxCache(config ConfigSourceMe, txGasHandler TxGasHandler) (*TxCache, err senderConstraintsObj := config.getSenderConstraints() txFeeHelper := newFeeComputationHelper(txGasHandler.MinGasPrice(), txGasHandler.MinGasLimit(), txGasHandler.MinGasPriceForProcessing()) scoreComputerObj := newDefaultScoreComputer(txFeeHelper) - txCache := &TxCache{ + baseTxCache: &baseTxCache{ + evictionHandlers: make([]types.EvictionNotifier, 0), + evictionWorkerPool: workerpool.New(maxNumOfEvictionWorkers), + }, name: config.Name, txListBySender: newTxListBySenderMap(numChunks, senderConstraintsObj, scoreComputerObj, txGasHandler, txFeeHelper), txByHash: newTxByHashMap(numChunks), @@ -88,6 +93,7 @@ func (cache *TxCache) AddTx(tx *WrappedTransaction) (ok bool, added bool) { if len(evicted) > 0 { cache.monitorEvictionWrtSenderLimit(tx.Tx.GetSndAddr(), evicted) + cache.enqueueEvictedHashesForNotification(evicted) cache.txByHash.RemoveTxsBulk(evicted) } @@ -166,6 +172,8 @@ func (cache *TxCache) doAfterSelection() { // RemoveTxByHash removes tx by hash func (cache *TxCache) RemoveTxByHash(txHash []byte) bool { + cache.enqueueEvictedHashesForNotification([][]byte{txHash}) + cache.mutTxOperation.Lock() defer cache.mutTxOperation.Unlock() @@ -318,8 +326,9 @@ func (cache *TxCache) NotifyAccountNonce(accountKey []byte, nonce uint64) { func (cache *TxCache) ImmunizeTxsAgainstEviction(_ [][]byte) { } -// Close does nothing for this cacher implementation +// Close closes the eviction worker pool func (cache *TxCache) Close() error { + cache.evictionWorkerPool.Stop() return nil } diff --git a/txcache/txCache_test.go b/txcache/txCache_test.go index 3a8b41c4..4af75c83 100644 --- a/txcache/txCache_test.go +++ b/txcache/txCache_test.go @@ -1,17 +1,20 @@ package txcache import ( + "bytes" "errors" "fmt" "math" "sort" "sync" + "sync/atomic" "testing" "time" "github.com/multiversx/mx-chain-core-go/core" "github.com/multiversx/mx-chain-core-go/core/check" "github.com/multiversx/mx-chain-storage-go/common" + "github.com/multiversx/mx-chain-storage-go/testscommon" "github.com/multiversx/mx-chain-storage-go/types" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -40,6 +43,7 @@ func Test_NewTxCache(t *testing.T) { cache, err := NewTxCache(config, txGasHandler) require.Nil(t, err) require.NotNil(t, cache) + require.Nil(t, cache.Close()) badConfig := config badConfig.Name = "" @@ -105,6 +109,7 @@ func Test_AddTx(t *testing.T) { func Test_AddNilTx_DoesNothing(t *testing.T) { cache := newUnconstrainedCacheToTest() + defer func() { require.Nil(t, cache.Close()) }() txHash := []byte("hash-1") @@ -133,6 +138,7 @@ func Test_AddTx_AppliesSizeConstraintsPerSenderForNumTransactions(t *testing.T) require.Equal(t, []string{"tx-alice-1", "tx-alice-2", "tx-alice-3"}, cache.getHashesForSender("alice")) require.Equal(t, []string{"tx-bob-1", "tx-bob-2"}, cache.getHashesForSender("bob")) require.True(t, cache.areInternalMapsConsistent()) + require.Nil(t, cache.Close()) } func Test_AddTx_AppliesSizeConstraintsPerSenderForNumBytes(t *testing.T) { @@ -153,6 +159,7 @@ func Test_AddTx_AppliesSizeConstraintsPerSenderForNumBytes(t *testing.T) { require.Equal(t, []string{"tx-alice-1", "tx-alice-2", "tx-alice-3"}, cache.getHashesForSender("alice")) require.Equal(t, []string{"tx-bob-1", "tx-bob-2"}, cache.getHashesForSender("bob")) require.True(t, cache.areInternalMapsConsistent()) + require.Nil(t, cache.Close()) } func Test_RemoveByTxHash(t *testing.T) { @@ -436,6 +443,7 @@ func Test_AddWithEviction_UniformDistributionOfTxsPerSender(t *testing.T) { NumBytesPerSenderThreshold: maxNumBytesPerSenderUpperBound, CountPerSenderThreshold: math.MaxUint32, } + require.Nil(t, cache.Close()) // 100 * 1000 cache, err = NewTxCache(config, txGasHandler) @@ -444,6 +452,7 @@ func Test_AddWithEviction_UniformDistributionOfTxsPerSender(t *testing.T) { addManyTransactionsWithUniformDistribution(cache, 100, 1000) require.LessOrEqual(t, cache.CountTx(), uint64(250000)) + require.Nil(t, cache.Close()) } func Test_NotImplementedFunctions(t *testing.T) { @@ -543,12 +552,22 @@ func TestTxCache_TransactionIsAdded_EvenWhenInternalMapsAreInconsistent(t *testi func TestTxCache_NoCriticalInconsistency_WhenConcurrentAdditionsAndRemovals(t *testing.T) { cache := newUnconstrainedCacheToTest() + handlerCalls := uint32(0) + evictionHandlerWG := sync.WaitGroup{} + _ = cache.RegisterEvictionHandler(&testscommon.EvictionNotifierStub{ + NotifyEvictionCalled: func(hash []byte) { + atomic.AddUint32(&handlerCalls, 1) + evictionHandlerWG.Done() + }, + }) + // A lot of routines concur to add & remove THE FIRST transaction of a sender for try := 0; try < 100; try++ { var wg sync.WaitGroup for i := 0; i < 50; i++ { wg.Add(1) + evictionHandlerWG.Add(1) go func() { cache.AddTx(createTx([]byte("alice-x"), "alice", 42)) _ = cache.RemoveTxByHash([]byte("alice-x")) @@ -590,6 +609,7 @@ func TestTxCache_NoCriticalInconsistency_WhenConcurrentAdditionsAndRemovals(t *t for i := 0; i < 50; i++ { wg.Add(1) + evictionHandlerWG.Add(1) go func() { cache.AddTx(createTx([]byte("alice-x"), "alice", 42)) _ = cache.RemoveTxByHash([]byte("alice-x")) @@ -624,6 +644,55 @@ func TestTxCache_NoCriticalInconsistency_WhenConcurrentAdditionsAndRemovals(t *t } cache.Clear() + + evictionHandlerWG.Wait() + require.Equal(t, uint32(10000), atomic.LoadUint32(&handlerCalls)) +} + +func TestTxCache_RegisterEvictionHandler(t *testing.T) { + t.Parallel() + + cache := newUnconstrainedCacheToTest() + + cache.AddTx(createTx([]byte("hash-1"), "alice", 1)) + cache.AddTx(createTx([]byte("hash-2"), "alice", 2)) + + err := cache.RegisterEvictionHandler(nil) + require.Equal(t, common.ErrNilEvictionHandler, err) + + ch := make(chan uint32) + cnt := uint32(0) + err = cache.RegisterEvictionHandler(&testscommon.EvictionNotifierStub{ + NotifyEvictionCalled: func(hash []byte) { + atomic.AddUint32(&cnt, 1) + require.True(t, bytes.Equal([]byte("hash-1"), hash) || bytes.Equal([]byte("hash-2"), hash)) + ch <- atomic.LoadUint32(&cnt) + }, + }) + require.NoError(t, err) + + removed := cache.RemoveTxByHash([]byte("hash-1")) + require.True(t, removed) + cache.Remove([]byte("hash-2")) + for { + chCnt := uint32(0) + select { + case chCnt = <-ch: + case <-time.After(time.Second): + require.Fail(t, "timeout") + } + if chCnt == 2 { + break + } + } + + foundTx, ok := cache.GetByTxHash([]byte("hash-1")) + require.False(t, ok) + require.Nil(t, foundTx) + + foundTx, ok = cache.GetByTxHash([]byte("hash-2")) + require.False(t, ok) + require.Nil(t, foundTx) } func newUnconstrainedCacheToTest() *TxCache { diff --git a/types/interface.go b/types/interface.go index 99dca5f4..4a483f6a 100644 --- a/types/interface.go +++ b/types/interface.go @@ -225,8 +225,14 @@ type ShardIDProvider interface { IsInterfaceNil() bool } -// PersisterCreator defines the behavour of a component which is able to create a persister +// PersisterCreator defines the behaviour of a component which is able to create a persister type PersisterCreator interface { CreateBasePersister(path string) (Persister, error) IsInterfaceNil() bool } + +// EvictionNotifier defines the behaviour of a component which is able to handle an evicted transaction +type EvictionNotifier interface { + NotifyEviction(txHash []byte) + IsInterfaceNil() bool +}