From f2b403bcdc0200b7efd278e1c7127e9b72a6c399 Mon Sep 17 00:00:00 2001 From: maypok86 Date: Thu, 7 Mar 2024 19:18:43 +0300 Subject: [PATCH] [#63] Add deletion listener --- builder.go | 50 ++++++++++++---- builder_test.go | 4 ++ cache.go | 14 +++++ cache_test.go | 130 +++++++++++++++++++++++++++++++++++++++-- internal/core/cache.go | 108 +++++++++++++++++++++++----------- 5 files changed, 257 insertions(+), 49 deletions(-) diff --git a/builder.go b/builder.go index dcbb638..2498c90 100644 --- a/builder.go +++ b/builder.go @@ -37,11 +37,12 @@ var ( ) type baseOptions[K comparable, V any] struct { - capacity int - initialCapacity int - statsEnabled bool - withCost bool - costFunc func(key K, value V) uint32 + capacity int + initialCapacity int + statsEnabled bool + withCost bool + costFunc func(key K, value V) uint32 + deletionListener func(key K, value V, cause DeletionCause) } func (o *baseOptions[K, V]) collectStats() { @@ -57,6 +58,10 @@ func (o *baseOptions[K, V]) setInitialCapacity(initialCapacity int) { o.initialCapacity = initialCapacity } +func (o *baseOptions[K, V]) setDeletionListener(deletionListener func(key K, value V, cause DeletionCause)) { + o.deletionListener = deletionListener +} + func (o *baseOptions[K, V]) validate() error { if o.initialCapacity <= 0 && o.initialCapacity != unsetCapacity { return ErrIllegalInitialCapacity @@ -73,11 +78,12 @@ func (o *baseOptions[K, V]) toConfig() core.Config[K, V] { initialCapacity = &o.initialCapacity } return core.Config[K, V]{ - Capacity: o.capacity, - InitialCapacity: initialCapacity, - StatsEnabled: o.statsEnabled, - CostFunc: o.costFunc, - WithCost: o.withCost, + Capacity: o.capacity, + InitialCapacity: initialCapacity, + StatsEnabled: o.statsEnabled, + CostFunc: o.costFunc, + WithCost: o.withCost, + DeletionListener: o.deletionListener, } } @@ -169,6 +175,14 @@ func (b *Builder[K, V]) Cost(costFunc func(key K, value V) uint32) *Builder[K, V return b } +// DeletionListener specifies a listener instance that caches should notify each time an entry is deleted for any +// DeletionCause cause. The cache will invoke this listener in the background goroutine +// after the entry's deletion operation has completed. +func (b *Builder[K, V]) DeletionListener(deletionListener func(key K, value V, cause DeletionCause)) *Builder[K, V] { + b.setDeletionListener(deletionListener) + return b +} + // WithTTL specifies that each item should be automatically removed from the cache once a fixed duration // has elapsed after the item's creation. func (b *Builder[K, V]) WithTTL(ttl time.Duration) *ConstTTLBuilder[K, V] { @@ -231,6 +245,14 @@ func (b *ConstTTLBuilder[K, V]) Cost(costFunc func(key K, value V) uint32) *Cons return b } +// DeletionListener specifies a listener instance that caches should notify each time an entry is deleted for any +// DeletionCause cause. The cache will invoke this listener in the background goroutine +// after the entry's deletion operation has completed. +func (b *ConstTTLBuilder[K, V]) DeletionListener(deletionListener func(key K, value V, cause DeletionCause)) *ConstTTLBuilder[K, V] { + b.setDeletionListener(deletionListener) + return b +} + // Build creates a configured cache or // returns an error if invalid parameters were passed to the builder. func (b *ConstTTLBuilder[K, V]) Build() (Cache[K, V], error) { @@ -270,6 +292,14 @@ func (b *VariableTTLBuilder[K, V]) Cost(costFunc func(key K, value V) uint32) *V return b } +// DeletionListener specifies a listener instance that caches should notify each time an entry is deleted for any +// DeletionCause cause. The cache will invoke this listener in the background goroutine +// after the entry's deletion operation has completed. +func (b *VariableTTLBuilder[K, V]) DeletionListener(deletionListener func(key K, value V, cause DeletionCause)) *VariableTTLBuilder[K, V] { + b.setDeletionListener(deletionListener) + return b +} + // Build creates a configured cache or // returns an error if invalid parameters were passed to the builder. func (b *VariableTTLBuilder[K, V]) Build() (CacheWithVariableTTL[K, V], error) { diff --git a/builder_test.go b/builder_test.go index 32b34dd..1ab3602 100644 --- a/builder_test.go +++ b/builder_test.go @@ -92,6 +92,8 @@ func TestBuilder_BuildSuccess(t *testing.T) { cc, err := b.WithTTL(time.Minute).CollectStats().Cost(func(key int, value int) uint32 { return 2 + }).DeletionListener(func(key int, value int, cause DeletionCause) { + fmt.Println("const ttl") }).Build() if err != nil { t.Fatalf("builded cache with error: %v", err) @@ -103,6 +105,8 @@ func TestBuilder_BuildSuccess(t *testing.T) { cv, err := b.WithVariableTTL().CollectStats().Cost(func(key int, value int) uint32 { return 2 + }).DeletionListener(func(key int, value int, cause DeletionCause) { + fmt.Println("variable ttl") }).Build() if err != nil { t.Fatalf("builded cache with error: %v", err) diff --git a/cache.go b/cache.go index 05403cf..b2704d8 100644 --- a/cache.go +++ b/cache.go @@ -20,6 +20,20 @@ import ( "github.com/maypok86/otter/internal/core" ) +// DeletionCause the cause why a cached entry was deleted. +type DeletionCause = core.DeletionCause + +const ( + // Explicit the entry was manually deleted by the user. + Explicit = core.Explicit + // Replaced the entry itself was not actually deleted, but its value was replaced by the user. + Replaced = core.Replaced + // Size the entry was evicted due to size constraints. + Size = core.Size + // Expired the entry's expiration timestamp has passed. + Expired = core.Expired +) + type baseCache[K comparable, V any] struct { cache *core.Cache[K, V] } diff --git a/cache_test.go b/cache_test.go index 442e42a..39969dc 100644 --- a/cache_test.go +++ b/cache_test.go @@ -26,8 +26,18 @@ import ( ) func TestCache_Set(t *testing.T) { - const size = 100 - c, err := MustBuilder[int, int](size).WithTTL(time.Minute).CollectStats().Build() + const size = 256 + var mutex sync.Mutex + m := make(map[DeletionCause]int) + c, err := MustBuilder[int, int](size). + WithTTL(time.Minute). + CollectStats(). + DeletionListener(func(key int, value int, cause DeletionCause) { + mutex.Lock() + m[cause]++ + mutex.Unlock() + }). + Build() if err != nil { t.Fatalf("can not create cache: %v", err) } @@ -72,6 +82,12 @@ func TestCache_Set(t *testing.T) { if ratio != 1.0 { t.Fatalf("cache hit ratio should be 1.0, but got %v", ratio) } + + mutex.Lock() + defer mutex.Unlock() + if len(m) != 1 || m[Replaced] != size { + t.Fatalf("cache was supposed to replace %d, but replaced %d entries", size, m[Replaced]) + } } func TestCache_SetIfAbsent(t *testing.T) { @@ -133,9 +149,16 @@ func TestCache_SetIfAbsent(t *testing.T) { func TestCache_SetWithTTL(t *testing.T) { size := 256 + var mutex sync.Mutex + m := make(map[DeletionCause]int) c, err := MustBuilder[int, int](size). InitialCapacity(size). WithTTL(time.Second). + DeletionListener(func(key int, value int, cause DeletionCause) { + mutex.Lock() + m[cause]++ + mutex.Unlock() + }). Build() if err != nil { t.Fatalf("can not create builder: %v", err) @@ -158,7 +181,23 @@ func TestCache_SetWithTTL(t *testing.T) { t.Fatalf("c.Size() = %d, want = %d", cacheSize, 0) } - cc, err := MustBuilder[int, int](size).WithVariableTTL().CollectStats().Build() + mutex.Lock() + if e := m[Expired]; len(m) != 1 || e != size { + mutex.Unlock() + t.Fatalf("cache was supposed to expire %d, but expired %d entries", size, e) + } + mutex.Unlock() + + m = make(map[DeletionCause]int) + cc, err := MustBuilder[int, int](size). + WithVariableTTL(). + CollectStats(). + DeletionListener(func(key int, value int, cause DeletionCause) { + mutex.Lock() + m[cause]++ + mutex.Unlock() + }). + Build() if err != nil { t.Fatalf("can not create builder: %v", err) } @@ -183,13 +222,71 @@ func TestCache_SetWithTTL(t *testing.T) { if misses := cc.Stats().Misses(); misses != int64(size) { t.Fatalf("c.Stats().Misses() = %d, want = %d", misses, size) } + mutex.Lock() + defer mutex.Unlock() + if len(m) != 1 || m[Expired] != size { + t.Fatalf("cache was supposed to expire %d, but expired %d entries", size, m[Expired]) + } +} + +func TestCache_Delete(t *testing.T) { + size := 256 + var mutex sync.Mutex + m := make(map[DeletionCause]int) + c, err := MustBuilder[int, int](size). + InitialCapacity(size). + WithTTL(time.Hour). + DeletionListener(func(key int, value int, cause DeletionCause) { + mutex.Lock() + m[cause]++ + mutex.Unlock() + }). + Build() + if err != nil { + t.Fatalf("can not create builder: %v", err) + } + + for i := 0; i < size; i++ { + c.Set(i, i) + } + + for i := 0; i < size; i++ { + if !c.Has(i) { + t.Fatalf("key should exists: %d", i) + } + } + + for i := 0; i < size; i++ { + c.Delete(i) + } + + for i := 0; i < size; i++ { + if c.Has(i) { + t.Fatalf("key should not exists: %d", i) + } + } + + time.Sleep(time.Second) + + mutex.Lock() + defer mutex.Unlock() + if len(m) != 1 || m[Explicit] != size { + t.Fatalf("cache was supposed to delete %d, but deleted %d entries", size, m[Explicit]) + } } func TestCache_DeleteByFunc(t *testing.T) { size := 256 + var mutex sync.Mutex + m := make(map[DeletionCause]int) c, err := MustBuilder[int, int](size). InitialCapacity(size). WithTTL(time.Hour). + DeletionListener(func(key int, value int, cause DeletionCause) { + mutex.Lock() + m[cause]++ + mutex.Unlock() + }). Build() if err != nil { t.Fatalf("can not create builder: %v", err) @@ -209,10 +306,28 @@ func TestCache_DeleteByFunc(t *testing.T) { } return true }) + + time.Sleep(time.Second) + + expected := size / 2 + mutex.Lock() + defer mutex.Unlock() + if len(m) != 1 || m[Explicit] != expected { + t.Fatalf("cache was supposed to delete %d, but deleted %d entries", expected, m[Explicit]) + } } func TestCache_Ratio(t *testing.T) { - c, err := MustBuilder[uint64, uint64](100).CollectStats().Build() + var mutex sync.Mutex + m := make(map[DeletionCause]int) + c, err := MustBuilder[uint64, uint64](100). + CollectStats(). + DeletionListener(func(key uint64, value uint64, cause DeletionCause) { + mutex.Lock() + m[cause]++ + mutex.Unlock() + }). + Build() if err != nil { t.Fatalf("can not create cache: %v", err) } @@ -231,6 +346,13 @@ func TestCache_Ratio(t *testing.T) { t.Logf("actual size: %d, capacity: %d", c.Size(), c.Capacity()) t.Logf("actual: %.2f, optimal: %.2f", c.Stats().Ratio(), o.Ratio()) + + mutex.Lock() + defer mutex.Unlock() + t.Logf("evicted: %d", m[Size]) + if len(m) != 1 || m[Size] <= 0 || m[Size] > 5000 { + t.Fatalf("cache was supposed to evict positive number of entries, but evicted %d entries", m[Size]) + } } type optimal struct { diff --git a/internal/core/cache.go b/internal/core/cache.go index e460fe8..74c89f1 100644 --- a/internal/core/cache.go +++ b/internal/core/cache.go @@ -30,6 +30,20 @@ import ( "github.com/maypok86/otter/internal/xruntime" ) +// DeletionCause the cause why a cached entry was deleted. +type DeletionCause uint8 + +const ( + // Explicit the entry was manually deleted by the user. + Explicit DeletionCause = iota + // Replaced the entry itself was not actually deleted, but its value was replaced by the user. + Replaced + // Size the entry was evicted due to size constraints. + Size + // Expired the entry's expiration timestamp has passed. + Expired +) + func zeroValue[V any]() V { var zero V return zero @@ -42,13 +56,14 @@ func getExpiration(ttl time.Duration) uint32 { // Config is a set of cache settings. type Config[K comparable, V any] struct { - Capacity int - InitialCapacity *int - StatsEnabled bool - TTL *time.Duration - WithVariableTTL bool - CostFunc func(key K, value V) uint32 - WithCost bool + Capacity int + InitialCapacity *int + StatsEnabled bool + TTL *time.Duration + WithVariableTTL bool + CostFunc func(key K, value V) uint32 + WithCost bool + DeletionListener func(key K, value V, cause DeletionCause) } type expirePolicy[K comparable, V any] interface { @@ -61,22 +76,23 @@ type expirePolicy[K comparable, V any] interface { // Cache is a structure performs a best-effort bounding of a hash table using eviction algorithm // to determine which entries to evict when the capacity is exceeded. type Cache[K comparable, V any] struct { - nodeManager *node.Manager[K, V] - hashmap *hashtable.Map[K, V] - policy *s3fifo.Policy[K, V] - expirePolicy expirePolicy[K, V] - stats *stats.Stats - readBuffers []*lossy.Buffer[K, V] - writeBuffer *queue.MPSC[task[K, V]] - evictionMutex sync.Mutex - closeOnce sync.Once - doneClear chan struct{} - costFunc func(key K, value V) uint32 - capacity int - mask uint32 - ttl uint32 - withExpiration bool - isClosed bool + nodeManager *node.Manager[K, V] + hashmap *hashtable.Map[K, V] + policy *s3fifo.Policy[K, V] + expirePolicy expirePolicy[K, V] + stats *stats.Stats + readBuffers []*lossy.Buffer[K, V] + writeBuffer *queue.MPSC[task[K, V]] + evictionMutex sync.Mutex + closeOnce sync.Once + doneClear chan struct{} + costFunc func(key K, value V) uint32 + deletionListener func(key K, value V, cause DeletionCause) + capacity int + mask uint32 + ttl uint32 + withExpiration bool + isClosed bool } // NewCache returns a new cache instance based on the settings from Config. @@ -114,16 +130,17 @@ func NewCache[K comparable, V any](c Config[K, V]) *Cache[K, V] { } cache := &Cache[K, V]{ - nodeManager: nodeManager, - hashmap: hashmap, - policy: s3fifo.NewPolicy[K, V](uint32(c.Capacity)), - expirePolicy: expPolicy, - readBuffers: readBuffers, - writeBuffer: queue.NewMPSC[task[K, V]](writeBufferCapacity), - doneClear: make(chan struct{}), - mask: uint32(readBuffersCount - 1), - costFunc: c.CostFunc, - capacity: c.Capacity, + nodeManager: nodeManager, + hashmap: hashmap, + policy: s3fifo.NewPolicy[K, V](uint32(c.Capacity)), + expirePolicy: expPolicy, + readBuffers: readBuffers, + writeBuffer: queue.NewMPSC[task[K, V]](writeBufferCapacity), + doneClear: make(chan struct{}), + mask: uint32(readBuffersCount - 1), + costFunc: c.CostFunc, + deletionListener: c.DeletionListener, + capacity: c.Capacity, } if c.StatsEnabled { @@ -260,7 +277,7 @@ func (c *Cache[K, V]) set(key K, value V, expiration uint32, onlyIfAbsent bool) return true } -// Delete removes the association for this key from the cache. +// Delete deletes the association for this key from the cache. func (c *Cache[K, V]) Delete(key K) { c.afterDelete(c.hashmap.Delete(key)) } @@ -276,7 +293,7 @@ func (c *Cache[K, V]) afterDelete(deleted node.Node[K, V]) { } } -// DeleteByFunc removes the association for this key from the cache when the given function returns true. +// DeleteByFunc deletes the association for this key from the cache when the given function returns true. func (c *Cache[K, V]) DeleteByFunc(f func(key K, value V) bool) { c.hashmap.Range(func(n node.Node[K, V]) bool { if !n.IsAlive() || n.IsExpired() { @@ -291,6 +308,14 @@ func (c *Cache[K, V]) DeleteByFunc(f func(key K, value V) bool) { }) } +func (c *Cache[K, V]) notifyDeletion(key K, value V, cause DeletionCause) { + if c.deletionListener == nil { + return + } + + c.deletionListener(key, value, cause) +} + func (c *Cache[K, V]) cleanup() { bufferCapacity := 64 expired := make([]node.Node[K, V], 0, bufferCapacity) @@ -312,6 +337,7 @@ func (c *Cache[K, V]) cleanup() { for _, n := range expired { c.hashmap.DeleteNode(n) n.Die() + c.notifyDeletion(n.Key(), n.Value(), Expired) } expired = clearBuffer(expired) @@ -383,9 +409,21 @@ func (c *Cache[K, V]) process() { c.evictionMutex.Unlock() + for _, t := range buffer { + switch { + case t.isDelete(): + n := t.node() + c.notifyDeletion(n.Key(), n.Value(), Explicit) + case t.isUpdate(): + n := t.oldNode() + c.notifyDeletion(n.Key(), n.Value(), Replaced) + } + } + for _, n := range deleted { c.hashmap.DeleteNode(n) n.Die() + c.notifyDeletion(n.Key(), n.Value(), Size) c.stats.IncEvictedCount() c.stats.AddEvictedCost(n.Cost()) }