From 26de49efa377760855cfe31e3b680811f9dbf709 Mon Sep 17 00:00:00 2001 From: maypok86 Date: Sun, 28 Jul 2024 17:38:05 +0300 Subject: [PATCH] [#98] Fixed inconsistent listener behavior --- cache_test.go | 29 ++-- internal/core/cache.go | 237 ++++++++++++++----------------- internal/expiry/disabled.go | 3 +- internal/expiry/fixed.go | 15 +- internal/expiry/queue.go | 4 +- internal/expiry/queue_test.go | 16 +-- internal/expiry/variable.go | 24 ++-- internal/expiry/variable_test.go | 21 +-- internal/queue/growable.go | 12 ++ internal/s3fifo/ghost.go | 32 ++--- internal/s3fifo/main.go | 27 ++-- internal/s3fifo/policy.go | 25 ++-- internal/s3fifo/policy_test.go | 32 +++-- internal/s3fifo/queue.go | 4 +- internal/s3fifo/queue_test.go | 16 +-- internal/s3fifo/small.go | 38 ++--- 16 files changed, 278 insertions(+), 257 deletions(-) diff --git a/cache_test.go b/cache_test.go index d573f45..655d46d 100644 --- a/cache_test.go +++ b/cache_test.go @@ -25,8 +25,21 @@ import ( "github.com/maypok86/otter/internal/xruntime" ) +func getRandomSize(t *testing.T) int { + t.Helper() + + const ( + minSize = 10 + maxSize = 1000 + ) + + r := rand.New(rand.NewSource(time.Now().UnixNano())) + + return r.Intn(maxSize-minSize) + minSize +} + func TestCache_Set(t *testing.T) { - const size = 256 + size := getRandomSize(t) var mutex sync.Mutex m := make(map[DeletionCause]int) c, err := MustBuilder[int, int](size). @@ -60,7 +73,7 @@ func TestCache_Set(t *testing.T) { r := rand.New(rand.NewSource(time.Now().UnixNano())) for a := 0; a < 10000; a++ { - k := r.Int() % 100 + k := r.Int() % 2 val, ok := c.Get(k) if !ok { err = fmt.Errorf("expected %d but got nil", k) @@ -91,7 +104,7 @@ func TestCache_Set(t *testing.T) { } func TestCache_SetIfAbsent(t *testing.T) { - const size = 100 + size := getRandomSize(t) c, err := MustBuilder[int, int](size).WithTTL(time.Minute).CollectStats().Build() if err != nil { t.Fatalf("can not create cache: %v", err) @@ -140,7 +153,7 @@ func TestCache_SetIfAbsent(t *testing.T) { } } - if hits := cc.Stats().Hits(); hits != size { + if hits := cc.Stats().Hits(); hits != int64(size) { t.Fatalf("hit ratio should be 100%%. Hits: %d", hits) } @@ -148,7 +161,7 @@ func TestCache_SetIfAbsent(t *testing.T) { } func TestCache_SetWithTTL(t *testing.T) { - size := 256 + size := getRandomSize(t) var mutex sync.Mutex m := make(map[DeletionCause]int) c, err := MustBuilder[int, int](size). @@ -230,7 +243,7 @@ func TestCache_SetWithTTL(t *testing.T) { } func TestCache_Delete(t *testing.T) { - size := 256 + size := getRandomSize(t) var mutex sync.Mutex m := make(map[DeletionCause]int) c, err := MustBuilder[int, int](size). @@ -276,7 +289,7 @@ func TestCache_Delete(t *testing.T) { } func TestCache_DeleteByFunc(t *testing.T) { - size := 256 + size := getRandomSize(t) var mutex sync.Mutex m := make(map[DeletionCause]int) c, err := MustBuilder[int, int](size). @@ -318,7 +331,7 @@ func TestCache_DeleteByFunc(t *testing.T) { } func TestCache_Advanced(t *testing.T) { - size := 256 + size := getRandomSize(t) defaultTTL := time.Hour c, err := MustBuilder[int, int](size). WithTTL(defaultTTL). diff --git a/internal/core/cache.go b/internal/core/cache.go index 75f7874..8078627 100644 --- a/internal/core/cache.go +++ b/internal/core/cache.go @@ -45,9 +45,21 @@ const ( ) const ( - minWriteBufferCapacity uint32 = 4 + minWriteBufferSize uint32 = 4 ) +var ( + maxWriteBufferSize uint32 + maxStripedBufferSize int +) + +func init() { + parallelism := xruntime.Parallelism() + roundedParallelism := int(xmath.RoundUpPowerOf2(parallelism)) + maxWriteBufferSize = uint32(128 * roundedParallelism) + maxStripedBufferSize = 4 * roundedParallelism +} + func zeroValue[V any]() V { var zero V return zero @@ -76,7 +88,7 @@ type Config[K comparable, V any] struct { type expiryPolicy[K comparable, V any] interface { Add(n node.Node[K, V]) Delete(n node.Node[K, V]) - RemoveExpired(expired []node.Node[K, V]) []node.Node[K, V] + DeleteExpired() Clear() } @@ -88,7 +100,7 @@ type Cache[K comparable, V any] struct { policy *s3fifo.Policy[K, V] expiryPolicy expiryPolicy[K, V] stats *stats.Stats - readBuffers []*lossy.Buffer[K, V] + stripedBuffer []*lossy.Buffer[K, V] writeBuffer *queue.Growable[task[K, V]] evictionMutex sync.Mutex closeOnce sync.Once @@ -104,19 +116,14 @@ type Cache[K comparable, V any] struct { // NewCache returns a new cache instance based on the settings from Config. func NewCache[K comparable, V any](c Config[K, V]) *Cache[K, V] { - parallelism := xruntime.Parallelism() - roundedParallelism := int(xmath.RoundUpPowerOf2(parallelism)) - maxWriteBufferCapacity := uint32(128 * roundedParallelism) - readBuffersCount := 4 * roundedParallelism - nodeManager := node.NewManager[K, V](node.Config{ WithExpiration: c.TTL != nil || c.WithVariableTTL, WithCost: c.WithCost, }) - readBuffers := make([]*lossy.Buffer[K, V], 0, readBuffersCount) - for i := 0; i < readBuffersCount; i++ { - readBuffers = append(readBuffers, lossy.New[K, V](nodeManager)) + stripedBuffer := make([]*lossy.Buffer[K, V], 0, maxStripedBufferSize) + for i := 0; i < maxStripedBufferSize; i++ { + stripedBuffer = append(stripedBuffer, lossy.New[K, V](nodeManager)) } var hashmap *hashtable.Map[K, V] @@ -126,30 +133,29 @@ func NewCache[K comparable, V any](c Config[K, V]) *Cache[K, V] { hashmap = hashtable.NewWithSize[K, V](nodeManager, *c.InitialCapacity) } - var expPolicy expiryPolicy[K, V] - switch { - case c.TTL != nil: - expPolicy = expiry.NewFixed[K, V]() - case c.WithVariableTTL: - expPolicy = expiry.NewVariable[K, V](nodeManager) - default: - expPolicy = expiry.NewDisabled[K, V]() - } - cache := &Cache[K, V]{ nodeManager: nodeManager, hashmap: hashmap, - policy: s3fifo.NewPolicy[K, V](c.Capacity), - expiryPolicy: expPolicy, - readBuffers: readBuffers, - writeBuffer: queue.NewGrowable[task[K, V]](minWriteBufferCapacity, maxWriteBufferCapacity), + stripedBuffer: stripedBuffer, + writeBuffer: queue.NewGrowable[task[K, V]](minWriteBufferSize, maxWriteBufferSize), doneClear: make(chan struct{}), - mask: uint32(readBuffersCount - 1), + mask: uint32(maxStripedBufferSize - 1), costFunc: c.CostFunc, deletionListener: c.DeletionListener, capacity: c.Capacity, } + cache.policy = s3fifo.NewPolicy(c.Capacity, cache.evictNode) + + switch { + case c.TTL != nil: + cache.expiryPolicy = expiry.NewFixed[K, V](cache.deleteExpiredNode) + case c.WithVariableTTL: + cache.expiryPolicy = expiry.NewVariable[K, V](nodeManager, cache.deleteExpiredNode) + default: + cache.expiryPolicy = expiry.NewDisabled[K, V]() + } + if c.StatsEnabled { cache.stats = stats.New() } @@ -224,13 +230,13 @@ func (c *Cache[K, V]) GetNodeQuietly(key K) (node.Node[K, V], bool) { func (c *Cache[K, V]) afterGet(got node.Node[K, V]) { idx := c.getReadBufferIdx() - pb := c.readBuffers[idx].Add(got) + pb := c.stripedBuffer[idx].Add(got) if pb != nil { c.evictionMutex.Lock() c.policy.Read(pb.Returned) c.evictionMutex.Unlock() - c.readBuffers[idx].Free() + c.stripedBuffer[idx].Free() } } @@ -346,124 +352,105 @@ func (c *Cache[K, V]) notifyDeletion(key K, value V, cause DeletionCause) { c.deletionListener(key, value, cause) } +func (c *Cache[K, V]) deleteExpiredNode(n node.Node[K, V]) { + c.policy.Delete(n) + deleted := c.hashmap.DeleteNode(n) + if deleted != nil { + n.Die() + c.notifyDeletion(n.Key(), n.Value(), Expired) + } +} + func (c *Cache[K, V]) cleanup() { - bufferCapacity := 64 - expired := make([]node.Node[K, V], 0, bufferCapacity) for { time.Sleep(time.Second) c.evictionMutex.Lock() if c.isClosed { + c.evictionMutex.Unlock() return } - expired = c.expiryPolicy.RemoveExpired(expired) - for _, n := range expired { - c.policy.Delete(n) - } + c.expiryPolicy.DeleteExpired() c.evictionMutex.Unlock() + } +} + +func (c *Cache[K, V]) evictNode(n node.Node[K, V]) { + c.expiryPolicy.Delete(n) + deleted := c.hashmap.DeleteNode(n) + if deleted != nil { + n.Die() + c.notifyDeletion(n.Key(), n.Value(), Size) + c.stats.IncEvictedCount() + c.stats.AddEvictedCost(n.Cost()) + } +} - for _, n := range expired { - c.hashmap.DeleteNode(n) - n.Die() - c.notifyDeletion(n.Key(), n.Value(), Expired) +func (c *Cache[K, V]) onWrite(t task[K, V]) { + if t.isClear() || t.isClose() { + c.writeBuffer.Clear() + + c.policy.Clear() + c.expiryPolicy.Clear() + if t.isClose() { + c.isClosed = true } - expired = clearBuffer(expired) - if cap(expired) > 3*bufferCapacity { - expired = make([]node.Node[K, V], 0, bufferCapacity) + c.doneClear <- struct{}{} + return + } + + n := t.node() + switch { + case t.isAdd(): + if n.IsAlive() { + c.expiryPolicy.Add(n) + c.policy.Add(n) } + case t.isUpdate(): + oldNode := t.oldNode() + c.expiryPolicy.Delete(oldNode) + c.policy.Delete(oldNode) + if n.IsAlive() { + c.expiryPolicy.Add(n) + c.policy.Add(n) + } + c.notifyDeletion(oldNode.Key(), oldNode.Value(), Replaced) + case t.isDelete(): + c.expiryPolicy.Delete(n) + c.policy.Delete(n) + c.notifyDeletion(n.Key(), n.Value(), Explicit) } } func (c *Cache[K, V]) process() { - bufferCapacity := 64 - buffer := make([]task[K, V], 0, bufferCapacity) - deleted := make([]node.Node[K, V], 0, bufferCapacity) - i := 0 for { t := c.writeBuffer.Pop() - - if t.isClear() || t.isClose() { - buffer = clearBuffer(buffer) - c.writeBuffer.Clear() - - c.evictionMutex.Lock() - c.policy.Clear() - c.expiryPolicy.Clear() - if t.isClose() { - c.isClosed = true - } + c.evictionMutex.Lock() + c.onWrite(t) + if t.isClose() { + c.evictionMutex.Unlock() + break + } else if t.isClear() { c.evictionMutex.Unlock() - - c.doneClear <- struct{}{} - if t.isClose() { - break - } continue } - - buffer = append(buffer, t) - i++ - if i >= bufferCapacity { - i -= bufferCapacity - - c.evictionMutex.Lock() - - for _, t := range buffer { - n := t.node() - switch { - case t.isDelete(): - c.expiryPolicy.Delete(n) - c.policy.Delete(n) - case t.isAdd(): - if n.IsAlive() { - c.expiryPolicy.Add(n) - deleted = c.policy.Add(deleted, n) - } - case t.isUpdate(): - oldNode := t.oldNode() - c.expiryPolicy.Delete(oldNode) - c.policy.Delete(oldNode) - if n.IsAlive() { - c.expiryPolicy.Add(n) - deleted = c.policy.Add(deleted, n) - } - } - } - - for _, n := range deleted { - c.expiryPolicy.Delete(n) - } - - c.evictionMutex.Unlock() - - for _, t := range buffer { - switch { - case t.isDelete(): - n := t.node() - c.notifyDeletion(n.Key(), n.Value(), Explicit) - case t.isUpdate(): - n := t.oldNode() - c.notifyDeletion(n.Key(), n.Value(), Replaced) - } - } - - for _, n := range deleted { - c.hashmap.DeleteNode(n) - n.Die() - c.notifyDeletion(n.Key(), n.Value(), Size) - c.stats.IncEvictedCount() - c.stats.AddEvictedCost(n.Cost()) + for i := uint32(0); i < maxWriteBufferSize; i++ { + t, ok := c.writeBuffer.TryPop() + if !ok { + break } - - buffer = clearBuffer(buffer) - deleted = clearBuffer(deleted) - if cap(deleted) > 3*bufferCapacity { - deleted = make([]node.Node[K, V], 0, bufferCapacity) + c.onWrite(t) + if t.isClose() { + c.evictionMutex.Unlock() + return + } else if t.isClear() { + break } } + c.evictionMutex.Unlock() } } @@ -489,8 +476,8 @@ func (c *Cache[K, V]) Clear() { func (c *Cache[K, V]) clear(t task[K, V]) { c.hashmap.Clear() - for i := 0; i < len(c.readBuffers); i++ { - c.readBuffers[i].Clear() + for i := 0; i < len(c.stripedBuffer); i++ { + c.stripedBuffer[i].Clear() } c.writeBuffer.Push(t) @@ -530,11 +517,3 @@ func (c *Cache[K, V]) Stats() *stats.Stats { func (c *Cache[K, V]) WithExpiration() bool { return c.withExpiration } - -func clearBuffer[T any](buffer []T) []T { - var zero T - for i := 0; i < len(buffer); i++ { - buffer[i] = zero - } - return buffer[:0] -} diff --git a/internal/expiry/disabled.go b/internal/expiry/disabled.go index 61271f8..e75494a 100644 --- a/internal/expiry/disabled.go +++ b/internal/expiry/disabled.go @@ -28,8 +28,7 @@ func (d *Disabled[K, V]) Add(n node.Node[K, V]) { func (d *Disabled[K, V]) Delete(n node.Node[K, V]) { } -func (d *Disabled[K, V]) RemoveExpired(expired []node.Node[K, V]) []node.Node[K, V] { - return expired +func (d *Disabled[K, V]) DeleteExpired() { } func (d *Disabled[K, V]) Clear() { diff --git a/internal/expiry/fixed.go b/internal/expiry/fixed.go index 6ca7f9b..35792aa 100644 --- a/internal/expiry/fixed.go +++ b/internal/expiry/fixed.go @@ -17,12 +17,14 @@ package expiry import "github.com/maypok86/otter/internal/generated/node" type Fixed[K comparable, V any] struct { - q *queue[K, V] + q *queue[K, V] + deleteNode func(node.Node[K, V]) } -func NewFixed[K comparable, V any]() *Fixed[K, V] { +func NewFixed[K comparable, V any](deleteNode func(node.Node[K, V])) *Fixed[K, V] { return &Fixed[K, V]{ - q: newQueue[K, V](), + q: newQueue[K, V](), + deleteNode: deleteNode, } } @@ -31,14 +33,13 @@ func (f *Fixed[K, V]) Add(n node.Node[K, V]) { } func (f *Fixed[K, V]) Delete(n node.Node[K, V]) { - f.q.remove(n) + f.q.delete(n) } -func (f *Fixed[K, V]) RemoveExpired(expired []node.Node[K, V]) []node.Node[K, V] { +func (f *Fixed[K, V]) DeleteExpired() { for !f.q.isEmpty() && f.q.head.HasExpired() { - expired = append(expired, f.q.pop()) + f.deleteNode(f.q.pop()) } - return expired } func (f *Fixed[K, V]) Clear() { diff --git a/internal/expiry/queue.go b/internal/expiry/queue.go index 18c5a85..ce4bdf1 100644 --- a/internal/expiry/queue.go +++ b/internal/expiry/queue.go @@ -53,11 +53,11 @@ func (q *queue[K, V]) pop() node.Node[K, V] { } result := q.head - q.remove(result) + q.delete(result) return result } -func (q *queue[K, V]) remove(n node.Node[K, V]) { +func (q *queue[K, V]) delete(n node.Node[K, V]) { next := n.NextExp() prev := n.PrevExp() diff --git a/internal/expiry/queue_test.go b/internal/expiry/queue_test.go index 9a31856..c6e3963 100644 --- a/internal/expiry/queue_test.go +++ b/internal/expiry/queue_test.go @@ -76,10 +76,10 @@ func TestQueue(t *testing.T) { e := newNode("a") q.push(e) checkQueuePointers(t, q, []node.Node[string, string]{e}) - q.remove(e) + q.delete(e) q.push(e) checkQueuePointers(t, q, []node.Node[string, string]{e}) - q.remove(e) + q.delete(e) checkQueuePointers(t, q, []node.Node[string, string]{}) // Bigger queue @@ -93,11 +93,11 @@ func TestQueue(t *testing.T) { q.push(e4) checkQueuePointers(t, q, []node.Node[string, string]{e1, e2, e3, e4}) - q.remove(e2) + q.delete(e2) checkQueuePointers(t, q, []node.Node[string, string]{e1, e3, e4}) // move from middle - q.remove(e3) + q.delete(e3) q.push(e3) checkQueuePointers(t, q, []node.Node[string, string]{e1, e4, e3}) @@ -108,7 +108,7 @@ func TestQueue(t *testing.T) { checkQueuePointers(t, q, []node.Node[string, string]{e3, e1, e4}) // should be no-op - q.remove(e3) + q.delete(e3) q.push(e3) checkQueuePointers(t, q, []node.Node[string, string]{e1, e4, e3}) @@ -129,7 +129,7 @@ func TestQueue(t *testing.T) { var next node.Node[string, string] for e := q.head; !node.Equals(e, nil); e = next { next = e.NextExp() - q.remove(e) + q.delete(e) } checkQueuePointers(t, q, []node.Node[string, string]{}) } @@ -143,8 +143,8 @@ func TestQueue_Remove(t *testing.T) { q.push(e2) checkQueuePointers(t, q, []node.Node[int, int]{e1, e2}) e := q.head - q.remove(e) + q.delete(e) checkQueuePointers(t, q, []node.Node[int, int]{e2}) - q.remove(e) + q.delete(e) checkQueuePointers(t, q, []node.Node[int, int]{e2}) } diff --git a/internal/expiry/variable.go b/internal/expiry/variable.go index b5791a1..cecf3f1 100644 --- a/internal/expiry/variable.go +++ b/internal/expiry/variable.go @@ -44,11 +44,12 @@ var ( ) type Variable[K comparable, V any] struct { - wheel [][]node.Node[K, V] - time uint32 + wheel [][]node.Node[K, V] + time uint32 + deleteNode func(node.Node[K, V]) } -func NewVariable[K comparable, V any](nodeManager *node.Manager[K, V]) *Variable[K, V] { +func NewVariable[K comparable, V any](nodeManager *node.Manager[K, V], deleteNode func(node.Node[K, V])) *Variable[K, V] { wheel := make([][]node.Node[K, V], len(buckets)) for i := 0; i < len(wheel); i++ { wheel[i] = make([]node.Node[K, V], buckets[i]) @@ -62,7 +63,8 @@ func NewVariable[K comparable, V any](nodeManager *node.Manager[K, V]) *Variable } } return &Variable[K, V]{ - wheel: wheel, + wheel: wheel, + deleteNode: deleteNode, } } @@ -93,7 +95,7 @@ func (v *Variable[K, V]) Delete(n node.Node[K, V]) { n.SetPrevExp(nil) } -func (v *Variable[K, V]) RemoveExpired(expired []node.Node[K, V]) []node.Node[K, V] { +func (v *Variable[K, V]) DeleteExpired() { currentTime := unixtime.Now() prevTime := v.time v.time = currentTime @@ -106,13 +108,11 @@ func (v *Variable[K, V]) RemoveExpired(expired []node.Node[K, V]) []node.Node[K, break } - expired = v.removeExpiredFromBucket(expired, i, previousTicks, delta) + v.deleteExpiredFromBucket(i, previousTicks, delta) } - - return expired } -func (v *Variable[K, V]) removeExpiredFromBucket(expired []node.Node[K, V], index int, prevTicks, delta uint32) []node.Node[K, V] { +func (v *Variable[K, V]) deleteExpiredFromBucket(index int, prevTicks, delta uint32) { mask := buckets[index] - 1 steps := buckets[index] if delta < steps { @@ -133,7 +133,7 @@ func (v *Variable[K, V]) removeExpiredFromBucket(expired []node.Node[K, V], inde n.SetNextExp(nil) if n.Expiration() <= v.time { - expired = append(expired, n) + v.deleteNode(n) } else { v.Add(n) } @@ -141,8 +141,6 @@ func (v *Variable[K, V]) removeExpiredFromBucket(expired []node.Node[K, V], inde n = next } } - - return expired } func (v *Variable[K, V]) Clear() { @@ -150,7 +148,7 @@ func (v *Variable[K, V]) Clear() { for j := 0; j < len(v.wheel[i]); j++ { root := v.wheel[i][j] n := root.NextExp() - // NOTE(maypok86): Maybe we should use the same approach as in RemoveExpired? + // NOTE(maypok86): Maybe we should use the same approach as in DeleteExpired? for !node.Equals(n, root) { next := n.NextExp() diff --git a/internal/expiry/variable_test.go b/internal/expiry/variable_test.go index f041e7e..8e74e88 100644 --- a/internal/expiry/variable_test.go +++ b/internal/expiry/variable_test.go @@ -56,7 +56,8 @@ func TestVariable_Add(t *testing.T) { nm.Create("k2", "", 69, 1), nm.Create("k3", "", 4399, 1), } - v := NewVariable[string, string](nm) + v := NewVariable[string, string](nm, func(n node.Node[string, string]) { + }) for _, n := range nodes { v.Add(n) @@ -93,7 +94,7 @@ func TestVariable_Add(t *testing.T) { } } -func TestVariable_RemoveExpired(t *testing.T) { +func TestVariable_DeleteExpired(t *testing.T) { nm := node.NewManager[string, string](node.Config{ WithExpiration: true, }) @@ -106,36 +107,38 @@ func TestVariable_RemoveExpired(t *testing.T) { nm.Create("k6", "", 142000, 1), nm.Create("k7", "", 1420000, 1), } - v := NewVariable[string, string](nm) + var expired []node.Node[string, string] + v := NewVariable[string, string](nm, func(n node.Node[string, string]) { + expired = append(expired, n) + }) for _, n := range nodes { v.Add(n) } - var expired []node.Node[string, string] var keys []string unixtime.SetNow(64) - expired = v.RemoveExpired(expired) + v.DeleteExpired() keys = append(keys, "k1", "k2", "k3") match(t, expired, keys) unixtime.SetNow(200) - expired = v.RemoveExpired(expired) + v.DeleteExpired() keys = append(keys, "k4") match(t, expired, keys) unixtime.SetNow(12000) - expired = v.RemoveExpired(expired) + v.DeleteExpired() keys = append(keys, "k5") match(t, expired, keys) unixtime.SetNow(350000) - expired = v.RemoveExpired(expired) + v.DeleteExpired() keys = append(keys, "k6") match(t, expired, keys) unixtime.SetNow(1520000) - expired = v.RemoveExpired(expired) + v.DeleteExpired() keys = append(keys, "k7") match(t, expired, keys) } diff --git a/internal/queue/growable.go b/internal/queue/growable.go index a3a5b03..80e8065 100644 --- a/internal/queue/growable.go +++ b/internal/queue/growable.go @@ -75,6 +75,18 @@ func (g *Growable[T]) Pop() T { return item } +func (g *Growable[T]) TryPop() (T, bool) { + var zero T + g.mutex.Lock() + if g.count == 0 { + g.mutex.Unlock() + return zero, false + } + item := g.pop() + g.mutex.Unlock() + return item, true +} + func (g *Growable[T]) pop() T { var zero T diff --git a/internal/s3fifo/ghost.go b/internal/s3fifo/ghost.go index 9ced930..55e2f04 100644 --- a/internal/s3fifo/ghost.go +++ b/internal/s3fifo/ghost.go @@ -22,19 +22,21 @@ import ( ) type ghost[K comparable, V any] struct { - q *deque.Deque[uint64] - m map[uint64]struct{} - main *main[K, V] - small *small[K, V] - hasher maphash.Hasher[K] + q *deque.Deque[uint64] + m map[uint64]struct{} + main *main[K, V] + small *small[K, V] + hasher maphash.Hasher[K] + evictNode func(node.Node[K, V]) } -func newGhost[K comparable, V any](main *main[K, V]) *ghost[K, V] { +func newGhost[K comparable, V any](main *main[K, V], evictNode func(node.Node[K, V])) *ghost[K, V] { return &ghost[K, V]{ - q: deque.New[uint64](), - m: make(map[uint64]struct{}), - main: main, - hasher: maphash.NewHasher[K](), + q: deque.New[uint64](), + m: make(map[uint64]struct{}), + main: main, + hasher: maphash.NewHasher[K](), + evictNode: evictNode, } } @@ -44,18 +46,18 @@ func (g *ghost[K, V]) isGhost(n node.Node[K, V]) bool { return ok } -func (g *ghost[K, V]) insert(deleted []node.Node[K, V], n node.Node[K, V]) []node.Node[K, V] { - deleted = append(deleted, n) +func (g *ghost[K, V]) insert(n node.Node[K, V]) { + g.evictNode(n) h := g.hasher.Hash(n.Key()) if _, ok := g.m[h]; ok { - return deleted + return } maxLength := g.small.length() + g.main.length() if maxLength == 0 { - return deleted + return } for g.q.Len() >= maxLength { @@ -65,8 +67,6 @@ func (g *ghost[K, V]) insert(deleted []node.Node[K, V], n node.Node[K, V]) []nod g.q.PushBack(h) g.m[h] = struct{}{} - - return deleted } func (g *ghost[K, V]) clear() { diff --git a/internal/s3fifo/main.go b/internal/s3fifo/main.go index a2eeabb..e57120c 100644 --- a/internal/s3fifo/main.go +++ b/internal/s3fifo/main.go @@ -21,15 +21,17 @@ import ( const maxReinsertions = 20 type main[K comparable, V any] struct { - q *queue[K, V] - cost int - maxCost int + q *queue[K, V] + cost int + maxCost int + evictNode func(node.Node[K, V]) } -func newMain[K comparable, V any](maxCost int) *main[K, V] { +func newMain[K comparable, V any](maxCost int, evictNode func(node.Node[K, V])) *main[K, V] { return &main[K, V]{ - q: newQueue[K, V](), - maxCost: maxCost, + q: newQueue[K, V](), + maxCost: maxCost, + evictNode: evictNode, } } @@ -39,7 +41,7 @@ func (m *main[K, V]) insert(n node.Node[K, V]) { m.cost += int(n.Cost()) } -func (m *main[K, V]) evict(deleted []node.Node[K, V]) []node.Node[K, V] { +func (m *main[K, V]) evict() { reinsertions := 0 for m.cost > 0 { n := m.q.pop() @@ -47,7 +49,8 @@ func (m *main[K, V]) evict(deleted []node.Node[K, V]) []node.Node[K, V] { if !n.IsAlive() || n.HasExpired() || n.Frequency() == 0 { n.Unmark() m.cost -= int(n.Cost()) - return append(deleted, n) + m.evictNode(n) + return } // to avoid the worst case O(n), we remove the 20th reinserted consecutive element. @@ -55,19 +58,19 @@ func (m *main[K, V]) evict(deleted []node.Node[K, V]) []node.Node[K, V] { if reinsertions >= maxReinsertions { n.Unmark() m.cost -= int(n.Cost()) - return append(deleted, n) + m.evictNode(n) + return } m.q.push(n) n.DecrementFrequency() } - return deleted } -func (m *main[K, V]) remove(n node.Node[K, V]) { +func (m *main[K, V]) delete(n node.Node[K, V]) { m.cost -= int(n.Cost()) n.Unmark() - m.q.remove(n) + m.q.delete(n) } func (m *main[K, V]) length() int { diff --git a/internal/s3fifo/policy.go b/internal/s3fifo/policy.go index c077955..dd69817 100644 --- a/internal/s3fifo/policy.go +++ b/internal/s3fifo/policy.go @@ -29,13 +29,13 @@ type Policy[K comparable, V any] struct { } // NewPolicy creates a new Policy. -func NewPolicy[K comparable, V any](maxCost int) *Policy[K, V] { +func NewPolicy[K comparable, V any](maxCost int, evictNode func(node.Node[K, V])) *Policy[K, V] { smallMaxCost := maxCost / 10 mainMaxCost := maxCost - smallMaxCost - main := newMain[K, V](mainMaxCost) - ghost := newGhost(main) - small := newSmall(smallMaxCost, main, ghost) + main := newMain[K, V](mainMaxCost, evictNode) + ghost := newGhost(main, evictNode) + small := newSmall(smallMaxCost, main, ghost, evictNode) ghost.small = small return &Policy[K, V]{ @@ -55,7 +55,7 @@ func (p *Policy[K, V]) Read(nodes []node.Node[K, V]) { } // Add adds node to the eviction policy. -func (p *Policy[K, V]) Add(deleted []node.Node[K, V], n node.Node[K, V]) []node.Node[K, V] { +func (p *Policy[K, V]) Add(n node.Node[K, V]) { if p.ghost.isGhost(n) { p.main.insert(n) n.ResetFrequency() @@ -64,18 +64,17 @@ func (p *Policy[K, V]) Add(deleted []node.Node[K, V], n node.Node[K, V]) []node. } for p.isFull() { - deleted = p.evict(deleted) + p.evict() } - - return deleted } -func (p *Policy[K, V]) evict(deleted []node.Node[K, V]) []node.Node[K, V] { +func (p *Policy[K, V]) evict() { if p.small.cost >= p.maxCost/10 { - return p.small.evict(deleted) + p.small.evict() + return } - return p.main.evict(deleted) + p.main.evict() } func (p *Policy[K, V]) isFull() bool { @@ -85,12 +84,12 @@ func (p *Policy[K, V]) isFull() bool { // Delete deletes node from the eviction policy. func (p *Policy[K, V]) Delete(n node.Node[K, V]) { if n.IsSmall() { - p.small.remove(n) + p.small.delete(n) return } if n.IsMain() { - p.main.remove(n) + p.main.delete(n) } } diff --git a/internal/s3fifo/policy_test.go b/internal/s3fifo/policy_test.go index 1a40a21..e27cf6c 100644 --- a/internal/s3fifo/policy_test.go +++ b/internal/s3fifo/policy_test.go @@ -28,15 +28,17 @@ func newNode(k int) node.Node[int, int] { func TestPolicy_ReadAndWrite(t *testing.T) { n := newNode(2) - p := NewPolicy[int, int](10) - p.Add(nil, n) + p := NewPolicy[int, int](10, func(n node.Node[int, int]) { + }) + p.Add(n) if !n.IsSmall() { t.Fatalf("not valid node state: %+v", n) } } func TestPolicy_OneHitWonders(t *testing.T) { - p := NewPolicy[int, int](10) + p := NewPolicy[int, int](10, func(n node.Node[int, int]) { + }) oneHitWonders := make([]node.Node[int, int], 0, 2) for i := 0; i < cap(oneHitWonders); i++ { @@ -49,11 +51,11 @@ func TestPolicy_OneHitWonders(t *testing.T) { } for _, n := range oneHitWonders { - p.Add(nil, n) + p.Add(n) } for _, n := range popular { - p.Add(nil, n) + p.Add(n) } p.Read(oneHitWonders) @@ -67,7 +69,7 @@ func TestPolicy_OneHitWonders(t *testing.T) { } for _, n := range newNodes { - p.Add(nil, n) + p.Add(n) } for _, n := range oneHitWonders { @@ -98,20 +100,28 @@ func TestPolicy_OneHitWonders(t *testing.T) { } func TestPolicy_Update(t *testing.T) { - p := NewPolicy[int, int](100) + collect := false + var deleted []node.Node[int, int] + p := NewPolicy[int, int](100, func(n node.Node[int, int]) { + if collect { + deleted = deleted[:0] + deleted = append(deleted, n) + } + }) n := newNode(1) m := node.NewManager[int, int](node.Config{WithCost: true}) n1 := m.Create(1, 1, 0, n.Cost()+8) - p.Add(nil, n) + p.Add(n) p.Delete(n) - p.Add(nil, n1) + p.Add(n1) p.Read([]node.Node[int, int]{n1, n1}) n2 := m.Create(2, 1, 0, 92) - deleted := p.Add(nil, n2) + collect = true + p.Add(n2) if !n1.IsMain() { t.Fatalf("updated node should be in main queue: %+v", n1) @@ -123,7 +133,7 @@ func TestPolicy_Update(t *testing.T) { n3 := m.Create(1, 1, 0, 109) p.Delete(n1) - deleted = p.Add(nil, n3) + p.Add(n3) if n3.IsSmall() || n3.IsMain() || len(deleted) != 1 || deleted[0] != n3 { t.Fatalf("updated node should be evicted: %+v", n3) } diff --git a/internal/s3fifo/queue.go b/internal/s3fifo/queue.go index 9cac9e5..4f8e76d 100644 --- a/internal/s3fifo/queue.go +++ b/internal/s3fifo/queue.go @@ -39,11 +39,11 @@ func (q *queue[K, V]) pop() node.Node[K, V] { } result := q.head - q.remove(result) + q.delete(result) return result } -func (q *queue[K, V]) remove(n node.Node[K, V]) { +func (q *queue[K, V]) delete(n node.Node[K, V]) { next := n.Next() prev := n.Prev() diff --git a/internal/s3fifo/queue_test.go b/internal/s3fifo/queue_test.go index 481692c..24bfc81 100644 --- a/internal/s3fifo/queue_test.go +++ b/internal/s3fifo/queue_test.go @@ -76,10 +76,10 @@ func TestQueue(t *testing.T) { e := newFakeNode("a") q.push(e) checkQueuePointers(t, q, []node.Node[string, string]{e}) - q.remove(e) + q.delete(e) q.push(e) checkQueuePointers(t, q, []node.Node[string, string]{e}) - q.remove(e) + q.delete(e) checkQueuePointers(t, q, []node.Node[string, string]{}) // Bigger queue @@ -93,11 +93,11 @@ func TestQueue(t *testing.T) { q.push(e4) checkQueuePointers(t, q, []node.Node[string, string]{e1, e2, e3, e4}) - q.remove(e2) + q.delete(e2) checkQueuePointers(t, q, []node.Node[string, string]{e1, e3, e4}) // move from middle - q.remove(e3) + q.delete(e3) q.push(e3) checkQueuePointers(t, q, []node.Node[string, string]{e1, e4, e3}) @@ -108,7 +108,7 @@ func TestQueue(t *testing.T) { checkQueuePointers(t, q, []node.Node[string, string]{e3, e1, e4}) // should be no-op - q.remove(e3) + q.delete(e3) q.push(e3) checkQueuePointers(t, q, []node.Node[string, string]{e1, e4, e3}) @@ -129,7 +129,7 @@ func TestQueue(t *testing.T) { var next node.Node[string, string] for e := q.head; !node.Equals(e, nil); e = next { next = e.Next() - q.remove(e) + q.delete(e) } checkQueuePointers(t, q, []node.Node[string, string]{}) } @@ -143,8 +143,8 @@ func TestQueue_Remove(t *testing.T) { q.push(e2) checkQueuePointers(t, q, []node.Node[int, int]{e1, e2}) e := q.head - q.remove(e) + q.delete(e) checkQueuePointers(t, q, []node.Node[int, int]{e2}) - q.remove(e) + q.delete(e) checkQueuePointers(t, q, []node.Node[int, int]{e2}) } diff --git a/internal/s3fifo/small.go b/internal/s3fifo/small.go index 144bdc2..75e3977 100644 --- a/internal/s3fifo/small.go +++ b/internal/s3fifo/small.go @@ -19,23 +19,26 @@ import ( ) type small[K comparable, V any] struct { - q *queue[K, V] - main *main[K, V] - ghost *ghost[K, V] - cost int - maxCost int + q *queue[K, V] + main *main[K, V] + ghost *ghost[K, V] + cost int + maxCost int + evictNode func(node.Node[K, V]) } func newSmall[K comparable, V any]( maxCost int, main *main[K, V], ghost *ghost[K, V], + evictNode func(node.Node[K, V]), ) *small[K, V] { return &small[K, V]{ - q: newQueue[K, V](), - main: main, - ghost: ghost, - maxCost: maxCost, + q: newQueue[K, V](), + main: main, + ghost: ghost, + maxCost: maxCost, + evictNode: evictNode, } } @@ -45,34 +48,35 @@ func (s *small[K, V]) insert(n node.Node[K, V]) { s.cost += int(n.Cost()) } -func (s *small[K, V]) evict(deleted []node.Node[K, V]) []node.Node[K, V] { +func (s *small[K, V]) evict() { if s.cost == 0 { - return deleted + return } n := s.q.pop() s.cost -= int(n.Cost()) n.Unmark() if !n.IsAlive() || n.HasExpired() { - return append(deleted, n) + s.evictNode(n) + return } if n.Frequency() > 1 { s.main.insert(n) for s.main.isFull() { - deleted = s.main.evict(deleted) + s.main.evict() } n.ResetFrequency() - return deleted + return } - return s.ghost.insert(deleted, n) + s.ghost.insert(n) } -func (s *small[K, V]) remove(n node.Node[K, V]) { +func (s *small[K, V]) delete(n node.Node[K, V]) { s.cost -= int(n.Cost()) n.Unmark() - s.q.remove(n) + s.q.delete(n) } func (s *small[K, V]) length() int {