diff --git a/cmd/generator/main.go b/cmd/generator/main.go index 2103f57..dd7bb2a 100644 --- a/cmd/generator/main.go +++ b/cmd/generator/main.go @@ -134,8 +134,10 @@ func newGenerator(nodeType string) *generator { func (g *generator) printImports() { g.p("import (") g.in() + g.p("\"sync/atomic\"") g.p("\"unsafe\"") if g.features[expiration] { + g.p("") g.p("\"github.com/maypok86/otter/internal/unixtime\"") } g.out() @@ -178,6 +180,7 @@ func (g *generator) printStruct() { g.p("cost uint32") } + g.p("state uint32") g.p("frequency uint8") g.p("queueType uint8") g.out() @@ -199,6 +202,7 @@ func (g *generator) printConstructors() { if g.features[cost] { g.p("cost: cost,") } + g.p("state: aliveState,") g.out() g.p("}") g.out() @@ -365,6 +369,14 @@ func (g *generator) printFunctions() { g.p("}") const otherFunctions = ` +func (n *%s[K, V]) IsAlive() bool { + return atomic.LoadUint32(&n.state) == aliveState +} + +func (n *%s[K, V]) Die() { + atomic.StoreUint32(&n.state, deadState) +} + func (n *%s[K, V]) Frequency() uint8 { return n.frequency } @@ -460,6 +472,11 @@ const ( maxFrequency uint8 = 3 ) +const ( + aliveState uint32 = iota + deadState +) + // Node is a cache entry. type Node[K comparable, V any] interface { // Key returns the key. @@ -490,6 +507,10 @@ type Node[K comparable, V any] interface { Expiration() uint32 // Cost returns the cost of the node. Cost() uint32 + // IsAlive returns true if the entry is available in the hash-table. + IsAlive() bool + // Die sets the node to the dead state. + Die() // Frequency returns the frequency of the node. Frequency() uint8 // IncrementFrequency increments the frequency of the node. @@ -522,11 +543,11 @@ func Equals[K comparable, V any](a, b Node[K, V]) bool { type Config struct { WithExpiration bool - WithCost bool + WithCost bool } type Manager[K comparable, V any] struct { - create func(key K, value V, expiration, cost uint32) Node[K, V] + create func(key K, value V, expiration, cost uint32) Node[K, V] fromPointer func(ptr unsafe.Pointer) Node[K, V] } diff --git a/internal/core/cache.go b/internal/core/cache.go index 766b59d..df36092 100644 --- a/internal/core/cache.go +++ b/internal/core/cache.go @@ -25,7 +25,6 @@ import ( "github.com/maypok86/otter/internal/queue" "github.com/maypok86/otter/internal/s3fifo" "github.com/maypok86/otter/internal/stats" - "github.com/maypok86/otter/internal/task" "github.com/maypok86/otter/internal/unixtime" "github.com/maypok86/otter/internal/xmath" "github.com/maypok86/otter/internal/xruntime" @@ -68,7 +67,7 @@ type Cache[K comparable, V any] struct { expirePolicy expirePolicy[K, V] stats *stats.Stats readBuffers []*lossy.Buffer[K, V] - writeBuffer *queue.MPSC[task.WriteTask[K, V]] + writeBuffer *queue.MPSC[task[K, V]] evictionMutex sync.Mutex closeOnce sync.Once doneClear chan struct{} @@ -120,7 +119,7 @@ func NewCache[K comparable, V any](c Config[K, V]) *Cache[K, V] { policy: s3fifo.NewPolicy[K, V](uint32(c.Capacity)), expirePolicy: expPolicy, readBuffers: readBuffers, - writeBuffer: queue.NewMPSC[task.WriteTask[K, V]](writeBufferCapacity), + writeBuffer: queue.NewMPSC[task[K, V]](writeBufferCapacity), doneClear: make(chan struct{}), mask: uint32(readBuffersCount - 1), costFunc: c.CostFunc, @@ -159,13 +158,13 @@ func (c *Cache[K, V]) Has(key K) bool { // Get returns the value associated with the key in this cache. func (c *Cache[K, V]) Get(key K) (V, bool) { got, ok := c.hashmap.Get(key) - if !ok { + if !ok || !got.IsAlive() { c.stats.IncMisses() return zeroValue[V](), false } if got.IsExpired() { - c.writeBuffer.Insert(task.NewDeleteTask(got)) + c.writeBuffer.Insert(newDeleteTask(got)) c.stats.IncMisses() return zeroValue[V](), false } @@ -240,7 +239,7 @@ func (c *Cache[K, V]) set(key K, value V, expiration uint32, onlyIfAbsent bool) res := c.hashmap.SetIfAbsent(n) if res == nil { // insert - c.writeBuffer.Insert(task.NewAddTask(n)) + c.writeBuffer.Insert(newAddTask(n)) return true } return false @@ -249,10 +248,11 @@ func (c *Cache[K, V]) set(key K, value V, expiration uint32, onlyIfAbsent bool) evicted := c.hashmap.Set(n) if evicted != nil { // update - c.writeBuffer.Insert(task.NewUpdateTask(n, evicted)) + evicted.Die() + c.writeBuffer.Insert(newUpdateTask(n, evicted)) } else { // insert - c.writeBuffer.Insert(task.NewAddTask(n)) + c.writeBuffer.Insert(newAddTask(n)) } return true @@ -260,23 +260,24 @@ func (c *Cache[K, V]) set(key K, value V, expiration uint32, onlyIfAbsent bool) // Delete removes the association for this key from the cache. func (c *Cache[K, V]) Delete(key K) { - deleted := c.hashmap.Delete(key) - if deleted != nil { - c.writeBuffer.Insert(task.NewDeleteTask(deleted)) - } + c.afterDelete(c.hashmap.Delete(key)) } func (c *Cache[K, V]) deleteNode(n node.Node[K, V]) { - deleted := c.hashmap.DeleteNode(n) + c.afterDelete(c.hashmap.DeleteNode(n)) +} + +func (c *Cache[K, V]) afterDelete(deleted node.Node[K, V]) { if deleted != nil { - c.writeBuffer.Insert(task.NewDeleteTask(deleted)) + deleted.Die() + c.writeBuffer.Insert(newDeleteTask(deleted)) } } // DeleteByFunc removes the association for this key from the cache when the given function returns true. func (c *Cache[K, V]) DeleteByFunc(f func(key K, value V) bool) { c.hashmap.Range(func(n node.Node[K, V]) bool { - if n.IsExpired() { + if !n.IsAlive() || n.IsExpired() { return true } @@ -289,7 +290,8 @@ func (c *Cache[K, V]) DeleteByFunc(f func(key K, value V) bool) { } func (c *Cache[K, V]) cleanup() { - expired := make([]node.Node[K, V], 0, 128) + bufferCapacity := 64 + expired := make([]node.Node[K, V], 0, bufferCapacity) for { time.Sleep(time.Second) @@ -298,41 +300,47 @@ func (c *Cache[K, V]) cleanup() { return } - e := c.expirePolicy.RemoveExpired(expired) - c.policy.Delete(e) + expired = c.expirePolicy.RemoveExpired(expired) + for _, n := range expired { + c.policy.Delete(n) + } c.evictionMutex.Unlock() - for _, n := range e { + for _, n := range expired { c.hashmap.DeleteNode(n) + n.Die() } expired = clearBuffer(expired) + if cap(expired) > 3*bufferCapacity { + expired = make([]node.Node[K, V], 0, bufferCapacity) + } } } func (c *Cache[K, V]) process() { bufferCapacity := 64 - buffer := make([]task.WriteTask[K, V], 0, bufferCapacity) + buffer := make([]task[K, V], 0, bufferCapacity) deleted := make([]node.Node[K, V], 0, bufferCapacity) i := 0 for { t := c.writeBuffer.Remove() - if t.IsClear() || t.IsClose() { + if t.isClear() || t.isClose() { buffer = clearBuffer(buffer) c.writeBuffer.Clear() c.evictionMutex.Lock() c.policy.Clear() c.expirePolicy.Clear() - if t.IsClose() { + if t.isClose() { c.isClosed = true } c.evictionMutex.Unlock() c.doneClear <- struct{}{} - if t.IsClose() { + if t.isClose() { break } continue @@ -346,30 +354,43 @@ func (c *Cache[K, V]) process() { c.evictionMutex.Lock() for _, t := range buffer { + n := t.node() switch { - case t.IsDelete(): - c.expirePolicy.Delete(t.Node()) - case t.IsAdd(): - c.expirePolicy.Add(t.Node()) - case t.IsUpdate(): - c.expirePolicy.Delete(t.OldNode()) - c.expirePolicy.Add(t.Node()) + case t.isDelete(): + c.expirePolicy.Delete(n) + c.policy.Delete(n) + case t.isAdd(): + if n.IsAlive() { + c.expirePolicy.Add(n) + deleted = c.policy.Add(deleted, n) + } + case t.isUpdate(): + oldNode := t.oldNode() + c.expirePolicy.Delete(oldNode) + c.policy.Delete(oldNode) + if n.IsAlive() { + c.expirePolicy.Add(n) + deleted = c.policy.Add(deleted, n) + } } } - d := c.policy.Write(deleted, buffer) - for _, n := range d { + for _, n := range deleted { c.expirePolicy.Delete(n) } c.evictionMutex.Unlock() - for _, n := range d { + for _, n := range deleted { c.hashmap.DeleteNode(n) + n.Die() } buffer = clearBuffer(buffer) deleted = clearBuffer(deleted) + if cap(deleted) > 3*bufferCapacity { + deleted = make([]node.Node[K, V], 0, bufferCapacity) + } } } } @@ -379,7 +400,7 @@ func (c *Cache[K, V]) process() { // Iteration stops early when the given function returns false. func (c *Cache[K, V]) Range(f func(key K, value V) bool) { c.hashmap.Range(func(n node.Node[K, V]) bool { - if n.IsExpired() { + if !n.IsAlive() || n.IsExpired() { return true } @@ -391,10 +412,10 @@ func (c *Cache[K, V]) Range(f func(key K, value V) bool) { // // NOTE: this operation must be performed when no requests are made to the cache otherwise the behavior is undefined. func (c *Cache[K, V]) Clear() { - c.clear(task.NewClearTask[K, V]()) + c.clear(newClearTask[K, V]()) } -func (c *Cache[K, V]) clear(t task.WriteTask[K, V]) { +func (c *Cache[K, V]) clear(t task[K, V]) { c.hashmap.Clear() for i := 0; i < len(c.readBuffers); i++ { c.readBuffers[i].Clear() @@ -411,7 +432,7 @@ func (c *Cache[K, V]) clear(t task.WriteTask[K, V]) { // NOTE: this operation must be performed when no requests are made to the cache otherwise the behavior is undefined. func (c *Cache[K, V]) Close() { c.closeOnce.Do(func() { - c.clear(task.NewCloseTask[K, V]()) + c.clear(newCloseTask[K, V]()) if c.withExpiration { unixtime.Stop() } diff --git a/internal/core/task.go b/internal/core/task.go new file mode 100644 index 0000000..a6e26ee --- /dev/null +++ b/internal/core/task.go @@ -0,0 +1,112 @@ +// Copyright (c) 2023 Alexey Mayshev. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package core + +import ( + "github.com/maypok86/otter/internal/generated/node" +) + +// reason represents the reason for writing the item to the cache. +type reason uint8 + +const ( + addReason reason = iota + 1 + deleteReason + updateReason + clearReason + closeReason +) + +// task is a set of information to update the cache: +// node, reason for write, difference after node cost change, etc. +type task[K comparable, V any] struct { + n node.Node[K, V] + old node.Node[K, V] + writeReason reason +} + +// newAddTask creates a task to add a node to policies. +func newAddTask[K comparable, V any](n node.Node[K, V]) task[K, V] { + return task[K, V]{ + n: n, + writeReason: addReason, + } +} + +// newDeleteTask creates a task to delete a node from policies. +func newDeleteTask[K comparable, V any](n node.Node[K, V]) task[K, V] { + return task[K, V]{ + n: n, + writeReason: deleteReason, + } +} + +// newUpdateTask creates a task to update the node in the policies. +func newUpdateTask[K comparable, V any](n, oldNode node.Node[K, V]) task[K, V] { + return task[K, V]{ + n: n, + old: oldNode, + writeReason: updateReason, + } +} + +// newClearTask creates a task to clear policies. +func newClearTask[K comparable, V any]() task[K, V] { + return task[K, V]{ + writeReason: clearReason, + } +} + +// newCloseTask creates a task to clear policies and stop all goroutines. +func newCloseTask[K comparable, V any]() task[K, V] { + return task[K, V]{ + writeReason: closeReason, + } +} + +// node returns the node contained in the task. If node was not specified, it returns nil. +func (t *task[K, V]) node() node.Node[K, V] { + return t.n +} + +// oldNode returns the old node contained in the task. If old node was not specified, it returns nil. +func (t *task[K, V]) oldNode() node.Node[K, V] { + return t.old +} + +// isAdd returns true if this is an add task. +func (t *task[K, V]) isAdd() bool { + return t.writeReason == addReason +} + +// isDelete returns true if this is a delete task. +func (t *task[K, V]) isDelete() bool { + return t.writeReason == deleteReason +} + +// isUpdate returns true if this is an update task. +func (t *task[K, V]) isUpdate() bool { + return t.writeReason == updateReason +} + +// isClear returns true if this is a clear task. +func (t *task[K, V]) isClear() bool { + return t.writeReason == clearReason +} + +// isClose returns true if this is a close task. +func (t *task[K, V]) isClose() bool { + return t.writeReason == closeReason +} diff --git a/internal/task/task_test.go b/internal/core/task_test.go similarity index 70% rename from internal/task/task_test.go rename to internal/core/task_test.go index d603d5d..a71e8a1 100644 --- a/internal/task/task_test.go +++ b/internal/core/task_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package task +package core import ( "testing" @@ -28,28 +28,28 @@ func TestTask(t *testing.T) { n := nm.Create(1, 2, 6, 4) oldNode := nm.Create(1, 3, 8, 6) - addTask := NewAddTask(n) - if addTask.Node() != n || !addTask.IsAdd() { + addTask := newAddTask(n) + if addTask.node() != n || !addTask.isAdd() { t.Fatalf("not valid add task %+v", addTask) } - deleteTask := NewDeleteTask(n) - if deleteTask.Node() != n || !deleteTask.IsDelete() { + deleteTask := newDeleteTask(n) + if deleteTask.node() != n || !deleteTask.isDelete() { t.Fatalf("not valid delete task %+v", deleteTask) } - updateTask := NewUpdateTask(n, oldNode) - if updateTask.Node() != n || !updateTask.IsUpdate() || updateTask.OldNode() != oldNode { + updateTask := newUpdateTask(n, oldNode) + if updateTask.node() != n || !updateTask.isUpdate() || updateTask.oldNode() != oldNode { t.Fatalf("not valid update task %+v", updateTask) } - clearTask := NewClearTask[int, int]() - if clearTask.Node() != nil || !clearTask.IsClear() { + clearTask := newClearTask[int, int]() + if clearTask.node() != nil || !clearTask.isClear() { t.Fatalf("not valid clear task %+v", clearTask) } - closeTask := NewCloseTask[int, int]() - if closeTask.Node() != nil || !closeTask.IsClose() { + closeTask := newCloseTask[int, int]() + if closeTask.node() != nil || !closeTask.isClose() { t.Fatalf("not valid close task %+v", closeTask) } } diff --git a/internal/generated/node/b.go b/internal/generated/node/b.go index 35a88f5..28190aa 100644 --- a/internal/generated/node/b.go +++ b/internal/generated/node/b.go @@ -4,6 +4,7 @@ package node import ( + "sync/atomic" "unsafe" ) @@ -15,6 +16,7 @@ type B[K comparable, V any] struct { value V prev *B[K, V] next *B[K, V] + state uint32 frequency uint8 queueType uint8 } @@ -24,6 +26,7 @@ func NewB[K comparable, V any](key K, value V, expiration, cost uint32) Node[K, return &B[K, V]{ key: key, value: value, + state: aliveState, } } @@ -96,6 +99,14 @@ func (n *B[K, V]) Cost() uint32 { return 1 } +func (n *B[K, V]) IsAlive() bool { + return atomic.LoadUint32(&n.state) == aliveState +} + +func (n *B[K, V]) Die() { + atomic.StoreUint32(&n.state, deadState) +} + func (n *B[K, V]) Frequency() uint8 { return n.frequency } diff --git a/internal/generated/node/bc.go b/internal/generated/node/bc.go index 4c7e723..7342597 100644 --- a/internal/generated/node/bc.go +++ b/internal/generated/node/bc.go @@ -4,6 +4,7 @@ package node import ( + "sync/atomic" "unsafe" ) @@ -18,6 +19,7 @@ type BC[K comparable, V any] struct { prev *BC[K, V] next *BC[K, V] cost uint32 + state uint32 frequency uint8 queueType uint8 } @@ -28,6 +30,7 @@ func NewBC[K comparable, V any](key K, value V, expiration, cost uint32) Node[K, key: key, value: value, cost: cost, + state: aliveState, } } @@ -100,6 +103,14 @@ func (n *BC[K, V]) Cost() uint32 { return n.cost } +func (n *BC[K, V]) IsAlive() bool { + return atomic.LoadUint32(&n.state) == aliveState +} + +func (n *BC[K, V]) Die() { + atomic.StoreUint32(&n.state, deadState) +} + func (n *BC[K, V]) Frequency() uint8 { return n.frequency } diff --git a/internal/generated/node/be.go b/internal/generated/node/be.go index 2f10b8c..15377d9 100644 --- a/internal/generated/node/be.go +++ b/internal/generated/node/be.go @@ -4,8 +4,10 @@ package node import ( - "github.com/maypok86/otter/internal/unixtime" + "sync/atomic" "unsafe" + + "github.com/maypok86/otter/internal/unixtime" ) // BE is a cache entry that provide the following features: @@ -21,6 +23,7 @@ type BE[K comparable, V any] struct { prevExp *BE[K, V] nextExp *BE[K, V] expiration uint32 + state uint32 frequency uint8 queueType uint8 } @@ -31,6 +34,7 @@ func NewBE[K comparable, V any](key K, value V, expiration, cost uint32) Node[K, key: key, value: value, expiration: expiration, + state: aliveState, } } @@ -111,6 +115,14 @@ func (n *BE[K, V]) Cost() uint32 { return 1 } +func (n *BE[K, V]) IsAlive() bool { + return atomic.LoadUint32(&n.state) == aliveState +} + +func (n *BE[K, V]) Die() { + atomic.StoreUint32(&n.state, deadState) +} + func (n *BE[K, V]) Frequency() uint8 { return n.frequency } diff --git a/internal/generated/node/bec.go b/internal/generated/node/bec.go index bb96da3..e438d13 100644 --- a/internal/generated/node/bec.go +++ b/internal/generated/node/bec.go @@ -4,8 +4,10 @@ package node import ( - "github.com/maypok86/otter/internal/unixtime" + "sync/atomic" "unsafe" + + "github.com/maypok86/otter/internal/unixtime" ) // BEC is a cache entry that provide the following features: @@ -24,6 +26,7 @@ type BEC[K comparable, V any] struct { nextExp *BEC[K, V] expiration uint32 cost uint32 + state uint32 frequency uint8 queueType uint8 } @@ -35,6 +38,7 @@ func NewBEC[K comparable, V any](key K, value V, expiration, cost uint32) Node[K value: value, expiration: expiration, cost: cost, + state: aliveState, } } @@ -115,6 +119,14 @@ func (n *BEC[K, V]) Cost() uint32 { return n.cost } +func (n *BEC[K, V]) IsAlive() bool { + return atomic.LoadUint32(&n.state) == aliveState +} + +func (n *BEC[K, V]) Die() { + atomic.StoreUint32(&n.state, deadState) +} + func (n *BEC[K, V]) Frequency() uint8 { return n.frequency } diff --git a/internal/generated/node/manager.go b/internal/generated/node/manager.go index 21f41d2..8ea396e 100644 --- a/internal/generated/node/manager.go +++ b/internal/generated/node/manager.go @@ -16,6 +16,11 @@ const ( maxFrequency uint8 = 3 ) +const ( + aliveState uint32 = iota + deadState +) + // Node is a cache entry. type Node[K comparable, V any] interface { // Key returns the key. @@ -46,6 +51,10 @@ type Node[K comparable, V any] interface { Expiration() uint32 // Cost returns the cost of the node. Cost() uint32 + // IsAlive returns true if the entry is available in the hash-table. + IsAlive() bool + // Die sets the node to the dead state. + Die() // Frequency returns the frequency of the node. Frequency() uint8 // IncrementFrequency increments the frequency of the node. diff --git a/internal/s3fifo/main.go b/internal/s3fifo/main.go index 66cf8b8..d03bf80 100644 --- a/internal/s3fifo/main.go +++ b/internal/s3fifo/main.go @@ -44,7 +44,7 @@ func (m *main[K, V]) evict(deleted []node.Node[K, V]) []node.Node[K, V] { for m.cost > 0 { n := m.q.pop() - if n.IsExpired() || n.Frequency() == 0 { + if !n.IsAlive() || n.IsExpired() || n.Frequency() == 0 { n.Unmark() m.cost -= n.Cost() return append(deleted, n) diff --git a/internal/s3fifo/policy.go b/internal/s3fifo/policy.go index 4888c65..a0473ea 100644 --- a/internal/s3fifo/policy.go +++ b/internal/s3fifo/policy.go @@ -16,7 +16,6 @@ package s3fifo import ( "github.com/maypok86/otter/internal/generated/node" - "github.com/maypok86/otter/internal/task" ) // Policy is an eviction policy based on S3-FIFO eviction algorithm @@ -55,7 +54,8 @@ func (p *Policy[K, V]) Read(nodes []node.Node[K, V]) { } } -func (p *Policy[K, V]) insert(deleted []node.Node[K, V], n node.Node[K, V]) []node.Node[K, V] { +// Add adds node to the eviction policy. +func (p *Policy[K, V]) Add(deleted []node.Node[K, V], n node.Node[K, V]) []node.Node[K, V] { if p.ghost.isGhost(n) { p.main.insert(n) n.ResetFrequency() @@ -82,40 +82,8 @@ func (p *Policy[K, V]) isFull() bool { return p.small.cost+p.main.cost > p.maxCost } -// Write updates the eviction policy based on node updates. -func (p *Policy[K, V]) Write( - deleted []node.Node[K, V], - tasks []task.WriteTask[K, V], -) []node.Node[K, V] { - for _, t := range tasks { - n := t.Node() - - // already deleted in map - if t.IsDelete() { - p.delete(t.Node()) - continue - } - - if t.IsUpdate() { - // delete old node - p.delete(t.OldNode()) - // insert new node - } - - // add - deleted = p.insert(deleted, n) - } - return deleted -} - -// Delete deletes nodes from the eviction policy. -func (p *Policy[K, V]) Delete(buffer []node.Node[K, V]) { - for _, n := range buffer { - p.delete(n) - } -} - -func (p *Policy[K, V]) delete(n node.Node[K, V]) { +// Delete deletes node from the eviction policy. +func (p *Policy[K, V]) Delete(n node.Node[K, V]) { if n.IsSmall() { p.small.remove(n) return diff --git a/internal/s3fifo/policy_test.go b/internal/s3fifo/policy_test.go index 334cf84..1a40a21 100644 --- a/internal/s3fifo/policy_test.go +++ b/internal/s3fifo/policy_test.go @@ -18,7 +18,6 @@ import ( "testing" "github.com/maypok86/otter/internal/generated/node" - "github.com/maypok86/otter/internal/task" ) func newNode(k int) node.Node[int, int] { @@ -27,26 +26,10 @@ func newNode(k int) node.Node[int, int] { return n } -func nodesToAddTasks(nodes []node.Node[int, int]) []task.WriteTask[int, int] { - tasks := make([]task.WriteTask[int, int], 0, len(nodes)) - for _, n := range nodes { - tasks = append(tasks, task.NewAddTask(n)) - } - return tasks -} - -func nodesToDeleteTasks(nodes []node.Node[int, int]) []task.WriteTask[int, int] { - tasks := make([]task.WriteTask[int, int], 0, len(nodes)) - for _, n := range nodes { - tasks = append(tasks, task.NewDeleteTask(n)) - } - return tasks -} - func TestPolicy_ReadAndWrite(t *testing.T) { n := newNode(2) p := NewPolicy[int, int](10) - p.Write(nil, []task.WriteTask[int, int]{task.NewAddTask(n)}) + p.Add(nil, n) if !n.IsSmall() { t.Fatalf("not valid node state: %+v", n) } @@ -65,8 +48,13 @@ func TestPolicy_OneHitWonders(t *testing.T) { popular = append(popular, newNode(i+3)) } - p.Write(nil, nodesToAddTasks(oneHitWonders)) - p.Write(nil, nodesToAddTasks(popular)) + for _, n := range oneHitWonders { + p.Add(nil, n) + } + + for _, n := range popular { + p.Add(nil, n) + } p.Read(oneHitWonders) for i := 0; i < 3; i++ { @@ -77,7 +65,10 @@ func TestPolicy_OneHitWonders(t *testing.T) { for i := 0; i < cap(newNodes); i++ { newNodes = append(newNodes, newNode(i+12)) } - p.Write(nil, nodesToAddTasks(newNodes)) + + for _, n := range newNodes { + p.Add(nil, n) + } for _, n := range oneHitWonders { if n.IsSmall() || n.IsMain() { @@ -91,9 +82,15 @@ func TestPolicy_OneHitWonders(t *testing.T) { } } - p.Write(nil, nodesToDeleteTasks(oneHitWonders)) - p.Delete(popular) - p.Delete(newNodes) + for _, n := range oneHitWonders { + p.Delete(n) + } + for _, n := range popular { + p.Delete(n) + } + for _, n := range newNodes { + p.Delete(n) + } if p.small.cost+p.main.cost != 0 { t.Fatalf("queues should be empty, but small size: %d, main size: %d", p.small.cost, p.main.cost) @@ -107,17 +104,14 @@ func TestPolicy_Update(t *testing.T) { m := node.NewManager[int, int](node.Config{WithCost: true}) n1 := m.Create(1, 1, 0, n.Cost()+8) - p.Write(nil, []task.WriteTask[int, int]{ - task.NewAddTask(n), - task.NewUpdateTask(n1, n), - }) + p.Add(nil, n) + p.Delete(n) + p.Add(nil, n1) p.Read([]node.Node[int, int]{n1, n1}) n2 := m.Create(2, 1, 0, 92) - deleted := p.Write(nil, []task.WriteTask[int, int]{ - task.NewAddTask(n2), - }) + deleted := p.Add(nil, n2) if !n1.IsMain() { t.Fatalf("updated node should be in main queue: %+v", n1) @@ -128,9 +122,8 @@ func TestPolicy_Update(t *testing.T) { } n3 := m.Create(1, 1, 0, 109) - deleted = p.Write(nil, []task.WriteTask[int, int]{ - task.NewUpdateTask(n3, n1), - }) + p.Delete(n1) + deleted = p.Add(nil, n3) if n3.IsSmall() || n3.IsMain() || len(deleted) != 1 || deleted[0] != n3 { t.Fatalf("updated node should be evicted: %+v", n3) } diff --git a/internal/s3fifo/small.go b/internal/s3fifo/small.go index 1a617ec..cb84528 100644 --- a/internal/s3fifo/small.go +++ b/internal/s3fifo/small.go @@ -53,7 +53,7 @@ func (s *small[K, V]) evict(deleted []node.Node[K, V]) []node.Node[K, V] { n := s.q.pop() s.cost -= n.Cost() n.Unmark() - if n.IsExpired() { + if !n.IsAlive() || n.IsExpired() { return append(deleted, n) } diff --git a/internal/task/task.go b/internal/task/task.go deleted file mode 100644 index e85f070..0000000 --- a/internal/task/task.go +++ /dev/null @@ -1,112 +0,0 @@ -// Copyright (c) 2023 Alexey Mayshev. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package task - -import ( - "github.com/maypok86/otter/internal/generated/node" -) - -// reason represents the reason for writing the item to the cache. -type reason uint8 - -const ( - addReason reason = iota + 1 - deleteReason - updateReason - clearReason - closeReason -) - -// WriteTask is a set of information to update the cache: -// node, reason for write, difference after node cost change, etc. -type WriteTask[K comparable, V any] struct { - n node.Node[K, V] - oldNode node.Node[K, V] - writeReason reason -} - -// NewAddTask creates a task to add a node to policies. -func NewAddTask[K comparable, V any](n node.Node[K, V]) WriteTask[K, V] { - return WriteTask[K, V]{ - n: n, - writeReason: addReason, - } -} - -// NewDeleteTask creates a task to delete a node from policies. -func NewDeleteTask[K comparable, V any](n node.Node[K, V]) WriteTask[K, V] { - return WriteTask[K, V]{ - n: n, - writeReason: deleteReason, - } -} - -// NewUpdateTask creates a task to update the node in the policies. -func NewUpdateTask[K comparable, V any](n, oldNode node.Node[K, V]) WriteTask[K, V] { - return WriteTask[K, V]{ - n: n, - oldNode: oldNode, - writeReason: updateReason, - } -} - -// NewClearTask creates a task to clear policies. -func NewClearTask[K comparable, V any]() WriteTask[K, V] { - return WriteTask[K, V]{ - writeReason: clearReason, - } -} - -// NewCloseTask creates a task to clear policies and stop all goroutines. -func NewCloseTask[K comparable, V any]() WriteTask[K, V] { - return WriteTask[K, V]{ - writeReason: closeReason, - } -} - -// Node returns the node contained in the task. If node was not specified, it returns nil. -func (t *WriteTask[K, V]) Node() node.Node[K, V] { - return t.n -} - -// OldNode returns the old node contained in the task. If old node was not specified, it returns nil. -func (t *WriteTask[K, V]) OldNode() node.Node[K, V] { - return t.oldNode -} - -// IsAdd returns true if this is an add task. -func (t *WriteTask[K, V]) IsAdd() bool { - return t.writeReason == addReason -} - -// IsDelete returns true if this is a delete task. -func (t *WriteTask[K, V]) IsDelete() bool { - return t.writeReason == deleteReason -} - -// IsUpdate returns true if this is an update task. -func (t *WriteTask[K, V]) IsUpdate() bool { - return t.writeReason == updateReason -} - -// IsClear returns true if this is a clear task. -func (t *WriteTask[K, V]) IsClear() bool { - return t.writeReason == clearReason -} - -// IsClose returns true if this is a close task. -func (t *WriteTask[K, V]) IsClose() bool { - return t.writeReason == closeReason -}