diff --git a/.gitignore b/.gitignore index 4956abb..e95bc5e 100644 --- a/.gitignore +++ b/.gitignore @@ -15,6 +15,7 @@ # vendor/ /.idea/ +*.tmp *coverage.txt *lint.txt **/bin/ diff --git a/Makefile b/Makefile index e434b76..bb75bff 100644 --- a/Makefile +++ b/Makefile @@ -23,7 +23,9 @@ test: test.unit ## Run all the tests .PHONY: test.unit test.unit: ## Run all unit tests @echo 'mode: atomic' > coverage.txt - go test -covermode=atomic -coverprofile=coverage.txt -coverpkg=./... -v -race ./... + go test -covermode=atomic -coverprofile=coverage.txt.tmp -coverpkg=./... -v -race ./... + cat coverage.txt.tmp | grep -v "/generated/" > coverage.txt + rm coverage.txt.tmp .PHONY: cover cover: test.unit ## Run all the tests and opens the coverage report diff --git a/builder.go b/builder.go index aba7b6a..dcbb638 100644 --- a/builder.go +++ b/builder.go @@ -40,6 +40,7 @@ type baseOptions[K comparable, V any] struct { capacity int initialCapacity int statsEnabled bool + withCost bool costFunc func(key K, value V) uint32 } @@ -49,6 +50,7 @@ func (o *baseOptions[K, V]) collectStats() { func (o *baseOptions[K, V]) setCostFunc(costFunc func(key K, value V) uint32) { o.costFunc = costFunc + o.withCost = true } func (o *baseOptions[K, V]) setInitialCapacity(initialCapacity int) { @@ -75,6 +77,7 @@ func (o *baseOptions[K, V]) toConfig() core.Config[K, V] { InitialCapacity: initialCapacity, StatsEnabled: o.statsEnabled, CostFunc: o.costFunc, + WithCost: o.withCost, } } diff --git a/cmd/generator/main.go b/cmd/generator/main.go index 2672f77..2103f57 100644 --- a/cmd/generator/main.go +++ b/cmd/generator/main.go @@ -245,6 +245,12 @@ func (g *generator) printFunctions() { g.p("func (n *%s[K, V]) SetPrev(v Node[K, V]) {", g.structName) g.in() + g.p("if v == nil {") + g.in() + g.p("n.prev = nil") + g.p("return") + g.out() + g.p("}") g.p("n.prev = (*%s[K, V])(v.AsPointer())", g.structName) g.out() g.p("}") @@ -259,6 +265,12 @@ func (g *generator) printFunctions() { g.p("func (n *%s[K, V]) SetNext(v Node[K, V]) {", g.structName) g.in() + g.p("if v == nil {") + g.in() + g.p("n.next = nil") + g.p("return") + g.out() + g.p("}") g.p("n.next = (*%s[K, V])(v.AsPointer())", g.structName) g.out() g.p("}") @@ -278,6 +290,12 @@ func (g *generator) printFunctions() { g.p("func (n *%s[K, V]) SetPrevExp(v Node[K, V]) {", g.structName) g.in() if g.features[expiration] { + g.p("if v == nil {") + g.in() + g.p("n.prevExp = nil") + g.p("return") + g.out() + g.p("}") g.p("n.prevExp = (*%s[K, V])(v.AsPointer())", g.structName) } else { g.p("panic(\"not implemented\")") @@ -300,6 +318,12 @@ func (g *generator) printFunctions() { g.p("func (n *%s[K, V]) SetNextExp(v Node[K, V]) {", g.structName) g.in() if g.features[expiration] { + g.p("if v == nil {") + g.in() + g.p("n.nextExp = nil") + g.p("return") + g.out() + g.p("}") g.p("n.nextExp = (*%s[K, V])(v.AsPointer())", g.structName) } else { g.p("panic(\"not implemented\")") @@ -486,6 +510,16 @@ type Node[K comparable, V any] interface { Unmark() } +func Equals[K comparable, V any](a, b Node[K, V]) bool { + if a == nil { + return b == nil || b.AsPointer() == nil + } + if b == nil { + return a.AsPointer() == nil + } + return a.AsPointer() == b.AsPointer() +} + type Config struct { WithExpiration bool WithCost bool diff --git a/internal/core/cache.go b/internal/core/cache.go index cf5cff6..766b59d 100644 --- a/internal/core/cache.go +++ b/internal/core/cache.go @@ -19,12 +19,13 @@ import ( "time" "github.com/maypok86/otter/internal/expire" + "github.com/maypok86/otter/internal/generated/node" "github.com/maypok86/otter/internal/hashtable" "github.com/maypok86/otter/internal/lossy" - "github.com/maypok86/otter/internal/node" "github.com/maypok86/otter/internal/queue" "github.com/maypok86/otter/internal/s3fifo" "github.com/maypok86/otter/internal/stats" + "github.com/maypok86/otter/internal/task" "github.com/maypok86/otter/internal/unixtime" "github.com/maypok86/otter/internal/xmath" "github.com/maypok86/otter/internal/xruntime" @@ -48,17 +49,26 @@ type Config[K comparable, V any] struct { TTL *time.Duration WithVariableTTL bool CostFunc func(key K, value V) uint32 + WithCost bool +} + +type expirePolicy[K comparable, V any] interface { + Add(n node.Node[K, V]) + Delete(n node.Node[K, V]) + RemoveExpired(expired []node.Node[K, V]) []node.Node[K, V] + Clear() } // Cache is a structure performs a best-effort bounding of a hash table using eviction algorithm // to determine which entries to evict when the capacity is exceeded. type Cache[K comparable, V any] struct { + nodeManager *node.Manager[K, V] hashmap *hashtable.Map[K, V] policy *s3fifo.Policy[K, V] - expirePolicy *expire.Policy[K, V] + expirePolicy expirePolicy[K, V] stats *stats.Stats - readBuffers []*lossy.Buffer[node.Node[K, V]] - writeBuffer *queue.MPSC[node.WriteTask[K, V]] + readBuffers []*lossy.Buffer[K, V] + writeBuffer *queue.MPSC[task.WriteTask[K, V]] evictionMutex sync.Mutex closeOnce sync.Once doneClear chan struct{} @@ -77,30 +87,46 @@ func NewCache[K comparable, V any](c Config[K, V]) *Cache[K, V] { writeBufferCapacity := 128 * roundedParallelism readBuffersCount := 4 * roundedParallelism - readBuffers := make([]*lossy.Buffer[node.Node[K, V]], 0, readBuffersCount) + nodeManager := node.NewManager[K, V](node.Config{ + WithExpiration: c.TTL != nil || c.WithVariableTTL, + WithCost: c.WithCost, + }) + + readBuffers := make([]*lossy.Buffer[K, V], 0, readBuffersCount) for i := 0; i < readBuffersCount; i++ { - readBuffers = append(readBuffers, lossy.New[node.Node[K, V]]()) + readBuffers = append(readBuffers, lossy.New[K, V](nodeManager)) } var hashmap *hashtable.Map[K, V] if c.InitialCapacity == nil { - hashmap = hashtable.New[K, V]() + hashmap = hashtable.New[K, V](nodeManager) } else { - hashmap = hashtable.NewWithSize[K, V](*c.InitialCapacity) + hashmap = hashtable.NewWithSize[K, V](nodeManager, *c.InitialCapacity) + } + + var expPolicy expirePolicy[K, V] + switch { + case c.TTL != nil: + expPolicy = expire.NewFixed[K, V]() + case c.WithVariableTTL: + expPolicy = expire.NewVariable[K, V](nodeManager) + default: + expPolicy = expire.NewDisabled[K, V]() } cache := &Cache[K, V]{ - hashmap: hashmap, - policy: s3fifo.NewPolicy[K, V](uint32(c.Capacity)), - readBuffers: readBuffers, - writeBuffer: queue.NewMPSC[node.WriteTask[K, V]](writeBufferCapacity), - doneClear: make(chan struct{}), - mask: uint32(readBuffersCount - 1), - costFunc: c.CostFunc, - capacity: c.Capacity, + nodeManager: nodeManager, + hashmap: hashmap, + policy: s3fifo.NewPolicy[K, V](uint32(c.Capacity)), + expirePolicy: expPolicy, + readBuffers: readBuffers, + writeBuffer: queue.NewMPSC[task.WriteTask[K, V]](writeBufferCapacity), + doneClear: make(chan struct{}), + mask: uint32(readBuffersCount - 1), + costFunc: c.CostFunc, + capacity: c.Capacity, } - cache.expirePolicy = expire.NewPolicy[K, V]() if c.StatsEnabled { cache.stats = stats.New() } @@ -139,7 +165,7 @@ func (c *Cache[K, V]) Get(key K) (V, bool) { } if got.IsExpired() { - c.writeBuffer.Insert(node.NewDeleteTask(got)) + c.writeBuffer.Insert(task.NewDeleteTask(got)) c.stats.IncMisses() return zeroValue[V](), false } @@ -150,7 +176,7 @@ func (c *Cache[K, V]) Get(key K) (V, bool) { return got.Value(), ok } -func (c *Cache[K, V]) afterGet(got *node.Node[K, V]) { +func (c *Cache[K, V]) afterGet(got node.Node[K, V]) { idx := c.getReadBufferIdx() pb := c.readBuffers[idx].Add(got) if pb != nil { @@ -209,12 +235,12 @@ func (c *Cache[K, V]) set(key K, value V, expiration uint32, onlyIfAbsent bool) return false } - n := node.New(key, value, expiration, cost) + n := c.nodeManager.Create(key, value, expiration, cost) if onlyIfAbsent { res := c.hashmap.SetIfAbsent(n) if res == nil { // insert - c.writeBuffer.Insert(node.NewAddTask(n)) + c.writeBuffer.Insert(task.NewAddTask(n)) return true } return false @@ -223,10 +249,10 @@ func (c *Cache[K, V]) set(key K, value V, expiration uint32, onlyIfAbsent bool) evicted := c.hashmap.Set(n) if evicted != nil { // update - c.writeBuffer.Insert(node.NewUpdateTask(n, evicted)) + c.writeBuffer.Insert(task.NewUpdateTask(n, evicted)) } else { // insert - c.writeBuffer.Insert(node.NewAddTask(n)) + c.writeBuffer.Insert(task.NewAddTask(n)) } return true @@ -236,20 +262,20 @@ func (c *Cache[K, V]) set(key K, value V, expiration uint32, onlyIfAbsent bool) func (c *Cache[K, V]) Delete(key K) { deleted := c.hashmap.Delete(key) if deleted != nil { - c.writeBuffer.Insert(node.NewDeleteTask(deleted)) + c.writeBuffer.Insert(task.NewDeleteTask(deleted)) } } -func (c *Cache[K, V]) deleteNode(n *node.Node[K, V]) { +func (c *Cache[K, V]) deleteNode(n node.Node[K, V]) { deleted := c.hashmap.DeleteNode(n) if deleted != nil { - c.writeBuffer.Insert(node.NewDeleteTask(deleted)) + c.writeBuffer.Insert(task.NewDeleteTask(deleted)) } } // DeleteByFunc removes the association for this key from the cache when the given function returns true. func (c *Cache[K, V]) DeleteByFunc(f func(key K, value V) bool) { - c.hashmap.Range(func(n *node.Node[K, V]) bool { + c.hashmap.Range(func(n node.Node[K, V]) bool { if n.IsExpired() { return true } @@ -263,7 +289,7 @@ func (c *Cache[K, V]) DeleteByFunc(f func(key K, value V) bool) { } func (c *Cache[K, V]) cleanup() { - expired := make([]*node.Node[K, V], 0, 128) + expired := make([]node.Node[K, V], 0, 128) for { time.Sleep(time.Second) @@ -287,32 +313,32 @@ func (c *Cache[K, V]) cleanup() { func (c *Cache[K, V]) process() { bufferCapacity := 64 - buffer := make([]node.WriteTask[K, V], 0, bufferCapacity) - deleted := make([]*node.Node[K, V], 0, bufferCapacity) + buffer := make([]task.WriteTask[K, V], 0, bufferCapacity) + deleted := make([]node.Node[K, V], 0, bufferCapacity) i := 0 for { - task := c.writeBuffer.Remove() + t := c.writeBuffer.Remove() - if task.IsClear() || task.IsClose() { + if t.IsClear() || t.IsClose() { buffer = clearBuffer(buffer) c.writeBuffer.Clear() c.evictionMutex.Lock() c.policy.Clear() c.expirePolicy.Clear() - if task.IsClose() { + if t.IsClose() { c.isClosed = true } c.evictionMutex.Unlock() c.doneClear <- struct{}{} - if task.IsClose() { + if t.IsClose() { break } continue } - buffer = append(buffer, task) + buffer = append(buffer, t) i++ if i >= bufferCapacity { i -= bufferCapacity @@ -352,7 +378,7 @@ func (c *Cache[K, V]) process() { // // Iteration stops early when the given function returns false. func (c *Cache[K, V]) Range(f func(key K, value V) bool) { - c.hashmap.Range(func(n *node.Node[K, V]) bool { + c.hashmap.Range(func(n node.Node[K, V]) bool { if n.IsExpired() { return true } @@ -365,16 +391,16 @@ func (c *Cache[K, V]) Range(f func(key K, value V) bool) { // // NOTE: this operation must be performed when no requests are made to the cache otherwise the behavior is undefined. func (c *Cache[K, V]) Clear() { - c.clear(node.NewClearTask[K, V]()) + c.clear(task.NewClearTask[K, V]()) } -func (c *Cache[K, V]) clear(task node.WriteTask[K, V]) { +func (c *Cache[K, V]) clear(t task.WriteTask[K, V]) { c.hashmap.Clear() for i := 0; i < len(c.readBuffers); i++ { c.readBuffers[i].Clear() } - c.writeBuffer.Insert(task) + c.writeBuffer.Insert(t) <-c.doneClear c.stats.Clear() @@ -385,7 +411,7 @@ func (c *Cache[K, V]) clear(task node.WriteTask[K, V]) { // NOTE: this operation must be performed when no requests are made to the cache otherwise the behavior is undefined. func (c *Cache[K, V]) Close() { c.closeOnce.Do(func() { - c.clear(node.NewCloseTask[K, V]()) + c.clear(task.NewCloseTask[K, V]()) if c.withExpiration { unixtime.Stop() } diff --git a/internal/core/cache_test.go b/internal/core/cache_test.go index 3f35350..4e8fe61 100644 --- a/internal/core/cache_test.go +++ b/internal/core/cache_test.go @@ -4,7 +4,7 @@ import ( "testing" "time" - "github.com/maypok86/otter/internal/node" + "github.com/maypok86/otter/internal/generated/node" ) func TestCache_SetWithCost(t *testing.T) { @@ -48,8 +48,13 @@ func TestCache_Range(t *testing.T) { time.Sleep(3 * time.Second) + nm := node.NewManager[int, int](node.Config{ + WithExpiration: true, + WithCost: true, + }) + c.Set(1, 1) - c.hashmap.Set(node.New(2, 2, 1, 1)) + c.hashmap.Set(nm.Create(2, 2, 1, 1)) c.Set(3, 3) aliveNodes := 2 iters := 0 diff --git a/internal/expire/disabled.go b/internal/expire/disabled.go new file mode 100644 index 0000000..fb46cd5 --- /dev/null +++ b/internal/expire/disabled.go @@ -0,0 +1,36 @@ +// Copyright (c) 2024 Alexey Mayshev. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package expire + +import "github.com/maypok86/otter/internal/generated/node" + +type Disabled[K comparable, V any] struct{} + +func NewDisabled[K comparable, V any]() *Disabled[K, V] { + return &Disabled[K, V]{} +} + +func (d *Disabled[K, V]) Add(n node.Node[K, V]) { +} + +func (d *Disabled[K, V]) Delete(n node.Node[K, V]) { +} + +func (d *Disabled[K, V]) RemoveExpired(expired []node.Node[K, V]) []node.Node[K, V] { + return expired +} + +func (d *Disabled[K, V]) Clear() { +} diff --git a/internal/expire/fixed.go b/internal/expire/fixed.go new file mode 100644 index 0000000..bb0f506 --- /dev/null +++ b/internal/expire/fixed.go @@ -0,0 +1,32 @@ +package expire + +import "github.com/maypok86/otter/internal/generated/node" + +type Fixed[K comparable, V any] struct { + q *queue[K, V] +} + +func NewFixed[K comparable, V any]() *Fixed[K, V] { + return &Fixed[K, V]{ + q: newQueue[K, V](), + } +} + +func (f *Fixed[K, V]) Add(n node.Node[K, V]) { + f.q.push(n) +} + +func (f *Fixed[K, V]) Delete(n node.Node[K, V]) { + f.q.remove(n) +} + +func (f *Fixed[K, V]) RemoveExpired(expired []node.Node[K, V]) []node.Node[K, V] { + for !f.q.isEmpty() && f.q.head.IsExpired() { + expired = append(expired, f.q.pop()) + } + return expired +} + +func (f *Fixed[K, V]) Clear() { + f.q.clear() +} diff --git a/internal/expire/policy.go b/internal/expire/policy.go deleted file mode 100644 index 25d4d28..0000000 --- a/internal/expire/policy.go +++ /dev/null @@ -1,201 +0,0 @@ -// Copyright (c) 2023 Alexey Mayshev. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package expire - -import ( - "github.com/dolthub/swiss" - - "github.com/maypok86/otter/internal/node" - "github.com/maypok86/otter/internal/unixtime" -) - -const ( - numberOfBuckets = 128 - mask = uint32(numberOfBuckets - 1) - base = uint32(5) - - // Eliminate probing settings. - maxProbeCount = 100 - maxFailCount = 3 - - mapSize = 100 -) - -func bucketTimeToBucketID(bucketTime uint32) int { - return int(bucketTime & mask) -} - -func timestampToBucketTime(timestamp uint32) uint32 { - return timestamp / base -} - -func nextBucketID(bucketID int) int { - return (bucketID + 1) & int(mask) -} - -type bucket[K comparable, V any] struct { - m *swiss.Map[*node.Node[K, V], struct{}] - lastTime uint32 -} - -func newBucket[K comparable, V any]() bucket[K, V] { - return bucket[K, V]{ - m: swiss.NewMap[*node.Node[K, V], struct{}](mapSize), - lastTime: 0, - } -} - -func (b *bucket[K, V]) add(n *node.Node[K, V], newTime uint32) { - if b.isEmpty() { - b.m.Put(n, struct{}{}) - b.lastTime = newTime - return - } - - if newTime > b.lastTime { - return - } - - b.m.Put(n, struct{}{}) -} - -func (b *bucket[K, V]) delete(n *node.Node[K, V]) { - b.m.Delete(n) - if b.m.Count() == 0 { - b.lastTime = 0 - } -} - -func (b *bucket[K, V]) isEmpty() bool { - return b.lastTime == 0 -} - -func (b *bucket[K, V]) clear() { - b.m.Clear() - b.lastTime = 0 -} - -// Policy is an expiration policy for arbitrary TTL values, -// using a hybrid algorithm combining a deterministic one using buckets and a randomized one inherited from Redis expiration algorithm. -// https://dl.acm.org/doi/fullHtml/10.1145/3422575.3422797 -type Policy[K comparable, V any] struct { - buckets [numberOfBuckets]bucket[K, V] - expires *swiss.Map[*node.Node[K, V], struct{}] - currentBucketID int -} - -// NewPolicy creates a new Policy with 128 buckets. -func NewPolicy[K comparable, V any]() *Policy[K, V] { - p := &Policy[K, V]{ - expires: swiss.NewMap[*node.Node[K, V], struct{}](mapSize), - } - - for i := 0; i < numberOfBuckets; i++ { - p.buckets[i] = newBucket[K, V]() - } - - return p -} - -// Add adds node.Node to Policy if it has a TTL specified. -func (p *Policy[K, V]) Add(n *node.Node[K, V]) { - expiration := n.Expiration() - if expiration == 0 { - return - } - - bucketTime := timestampToBucketTime(expiration) - bucketID := bucketTimeToBucketID(bucketTime) - - p.expires.Put(n, struct{}{}) - p.buckets[bucketID].add(n, bucketTime) -} - -// Delete removes node.Node from Policy if it has a TTL specified. -func (p *Policy[K, V]) Delete(n *node.Node[K, V]) { - expiration := n.Expiration() - if expiration == 0 { - return - } - - bucketTime := timestampToBucketTime(expiration) - bucketID := bucketTimeToBucketID(bucketTime) - - p.expires.Delete(n) - p.buckets[bucketID].delete(n) -} - -// RemoveExpired removes the expired node.Node from Policy. -// Buckets are checked first, and then a redis randomized algorithm is applied to lazily find the remaining expired nodes. -func (p *Policy[K, V]) RemoveExpired(expired []*node.Node[K, V]) []*node.Node[K, V] { - now := unixtime.Now() - for i := 0; i < numberOfBuckets; i++ { - expired = p.expireBucket(expired, p.currentBucketID, now) - p.currentBucketID = nextBucketID(p.currentBucketID) - } - - return p.probingExpire(expired) -} - -func (p *Policy[K, V]) expireBucket(expired []*node.Node[K, V], bucketID int, now uint32) []*node.Node[K, V] { - b := &p.buckets[bucketID] - if b.lastTime == 0 { - return expired - } - - now = timestampToBucketTime(now) - if now > b.lastTime { - b.m.Iter(func(n *node.Node[K, V], _ struct{}) (stop bool) { - p.expires.Delete(n) - expired = append(expired, n) - return false - }) - b.clear() - } - - return expired -} - -func (p *Policy[K, V]) probingExpire(expired []*node.Node[K, V]) []*node.Node[K, V] { - failCount := 0 - probeCount := 0 - p.expires.Iter(func(n *node.Node[K, V], _ struct{}) (stop bool) { - if n.IsExpired() { - p.expires.Delete(n) - expired = append(expired, n) - failCount = 0 - return false - } - - failCount++ - if failCount > maxFailCount { - return true - } - - probeCount++ - return probeCount > maxProbeCount - }) - - return expired -} - -// Clear completely clears Policy and returns it to its default state. -func (p *Policy[K, V]) Clear() { - p.currentBucketID = 0 - p.expires.Clear() - for i := 0; i < numberOfBuckets; i++ { - p.buckets[i].clear() - } -} diff --git a/internal/expire/queue.go b/internal/expire/queue.go new file mode 100644 index 0000000..dc906b9 --- /dev/null +++ b/internal/expire/queue.go @@ -0,0 +1,89 @@ +// Copyright (c) 2024 Alexey Mayshev. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package expire + +import "github.com/maypok86/otter/internal/generated/node" + +type queue[K comparable, V any] struct { + head node.Node[K, V] + tail node.Node[K, V] + len int +} + +func newQueue[K comparable, V any]() *queue[K, V] { + return &queue[K, V]{} +} + +func (q *queue[K, V]) length() int { + return q.len +} + +func (q *queue[K, V]) isEmpty() bool { + return q.length() == 0 +} + +func (q *queue[K, V]) push(n node.Node[K, V]) { + if q.isEmpty() { + q.head = n + q.tail = n + } else { + n.SetPrevExp(q.tail) + q.tail.SetNextExp(n) + q.tail = n + } + + q.len++ +} + +func (q *queue[K, V]) pop() node.Node[K, V] { + if q.isEmpty() { + return nil + } + + result := q.head + q.remove(result) + return result +} + +func (q *queue[K, V]) remove(n node.Node[K, V]) { + next := n.NextExp() + prev := n.PrevExp() + + if node.Equals(prev, nil) { + if node.Equals(next, nil) && !node.Equals(q.head, n) { + return + } + + q.head = next + } else { + prev.SetNextExp(next) + n.SetPrevExp(nil) + } + + if node.Equals(next, nil) { + q.tail = prev + } else { + next.SetPrevExp(prev) + n.SetNextExp(nil) + } + + q.len-- +} + +func (q *queue[K, V]) clear() { + for !q.isEmpty() { + q.pop() + } +} diff --git a/internal/expire/queue_test.go b/internal/expire/queue_test.go new file mode 100644 index 0000000..2b6034f --- /dev/null +++ b/internal/expire/queue_test.go @@ -0,0 +1,150 @@ +// Copyright (c) 2024 Alexey Mayshev. All rights reserved. +// Copyright 2009 The Go Authors. All rights reserved. +// +// Copyright notice. Initial version of the following tests was based on +// the following file from the Go Programming Language core repo: +// https://cs.opensource.google/go/go/+/refs/tags/go1.21.5:src/container/list/list_test.go +// +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. +// That can be found at https://cs.opensource.google/go/go/+/refs/tags/go1.21.5:LICENSE + +package expire + +import ( + "strconv" + "testing" + + "github.com/maypok86/otter/internal/generated/node" +) + +func checkQueueLen[K comparable, V any](t *testing.T, q *queue[K, V], length int) bool { + t.Helper() + + if n := q.length(); n != length { + t.Errorf("q.length() = %d, want %d", n, length) + return false + } + return true +} + +func checkQueuePointers[K comparable, V any](t *testing.T, q *queue[K, V], nodes []node.Node[K, V]) { + t.Helper() + + if !checkQueueLen(t, q, len(nodes)) { + return + } + + // zero length queues must be the zero value + if len(nodes) == 0 { + if !(node.Equals(q.head, nil) && node.Equals(q.tail, nil)) { + t.Errorf("q.head = %p, q.tail = %p; both should be nil", q.head, q.tail) + } + return + } + + // check internal and external prev/next connections + for i, n := range nodes { + var prev node.Node[K, V] + if i > 0 { + prev = nodes[i-1] + } + if p := n.PrevExp(); !node.Equals(p, prev) { + t.Errorf("elt[%d](%p).prev = %p, want %p", i, n, p, prev) + } + + var next node.Node[K, V] + if i < len(nodes)-1 { + next = nodes[i+1] + } + if nn := n.NextExp(); !node.Equals(nn, next) { + t.Errorf("nodes[%d](%p).next = %p, want %p", i, n, nn, next) + } + } +} + +func newNode[K comparable](e K) node.Node[K, K] { + m := node.NewManager[K, K](node.Config{WithCost: true, WithExpiration: true}) + return m.Create(e, e, 0, 0) +} + +func TestQueue(t *testing.T) { + q := newQueue[string, string]() + checkQueuePointers(t, q, []node.Node[string, string]{}) + + // Single element queue + e := newNode("a") + q.push(e) + checkQueuePointers(t, q, []node.Node[string, string]{e}) + q.remove(e) + q.push(e) + checkQueuePointers(t, q, []node.Node[string, string]{e}) + q.remove(e) + checkQueuePointers(t, q, []node.Node[string, string]{}) + + // Bigger queue + e2 := newNode("2") + e1 := newNode("1") + e3 := newNode("3") + e4 := newNode("4") + q.push(e1) + q.push(e2) + q.push(e3) + q.push(e4) + checkQueuePointers(t, q, []node.Node[string, string]{e1, e2, e3, e4}) + + q.remove(e2) + checkQueuePointers(t, q, []node.Node[string, string]{e1, e3, e4}) + + // move from middle + q.remove(e3) + q.push(e3) + checkQueuePointers(t, q, []node.Node[string, string]{e1, e4, e3}) + + q.clear() + q.push(e3) + q.push(e1) + q.push(e4) + checkQueuePointers(t, q, []node.Node[string, string]{e3, e1, e4}) + + // should be no-op + q.remove(e3) + q.push(e3) + checkQueuePointers(t, q, []node.Node[string, string]{e1, e4, e3}) + + // Check standard iteration. + sum := 0 + for e := q.head; !node.Equals(e, nil); e = e.NextExp() { + i, err := strconv.Atoi(e.Value()) + if err != nil { + continue + } + sum += i + } + if sum != 8 { + t.Errorf("sum over l = %d, want 8", sum) + } + + // Clear all elements by iterating + var next node.Node[string, string] + for e := q.head; !node.Equals(e, nil); e = next { + next = e.NextExp() + q.remove(e) + } + checkQueuePointers(t, q, []node.Node[string, string]{}) +} + +func TestQueue_Remove(t *testing.T) { + q := newQueue[int, int]() + + e1 := newNode(1) + e2 := newNode(2) + q.push(e1) + q.push(e2) + checkQueuePointers(t, q, []node.Node[int, int]{e1, e2}) + e := q.head + q.remove(e) + checkQueuePointers(t, q, []node.Node[int, int]{e2}) + q.remove(e) + checkQueuePointers(t, q, []node.Node[int, int]{e2}) +} diff --git a/internal/expire/variable.go b/internal/expire/variable.go new file mode 100644 index 0000000..522a318 --- /dev/null +++ b/internal/expire/variable.go @@ -0,0 +1,183 @@ +// Copyright (c) 2024 Alexey Mayshev. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package expire + +import ( + "math" + "math/bits" + "time" + + "github.com/maypok86/otter/internal/generated/node" + "github.com/maypok86/otter/internal/unixtime" + "github.com/maypok86/otter/internal/xmath" +) + +var ( + buckets = []uint32{64, 64, 32, 4, 1} + spans = []uint32{ + xmath.RoundUpPowerOf2(uint32((1 * time.Second).Seconds())), // 1s + xmath.RoundUpPowerOf2(uint32((1 * time.Minute).Seconds())), // 1.07m + xmath.RoundUpPowerOf2(uint32((1 * time.Hour).Seconds())), // 1.13h + xmath.RoundUpPowerOf2(uint32((24 * time.Hour).Seconds())), // 1.52d + buckets[3] * xmath.RoundUpPowerOf2(uint32((24 * time.Hour).Seconds())), // 6.07d + buckets[3] * xmath.RoundUpPowerOf2(uint32((24 * time.Hour).Seconds())), // 6.07d + } + shift = []uint32{ + uint32(bits.TrailingZeros32(spans[0])), + uint32(bits.TrailingZeros32(spans[1])), + uint32(bits.TrailingZeros32(spans[2])), + uint32(bits.TrailingZeros32(spans[3])), + uint32(bits.TrailingZeros32(spans[4])), + } +) + +type Variable[K comparable, V any] struct { + wheel [][]node.Node[K, V] + time uint32 +} + +func NewVariable[K comparable, V any](nodeManager *node.Manager[K, V]) *Variable[K, V] { + wheel := make([][]node.Node[K, V], len(buckets)) + for i := 0; i < len(wheel); i++ { + wheel[i] = make([]node.Node[K, V], buckets[i]) + for j := 0; j < len(wheel[i]); j++ { + var k K + var v V + fn := nodeManager.Create(k, v, math.MaxUint32, 1) + fn.SetPrevExp(fn) + fn.SetNextExp(fn) + wheel[i][j] = fn + } + } + return &Variable[K, V]{ + wheel: wheel, + } +} + +// findBucket determines the bucket that the timer event should be added to. +func (v *Variable[K, V]) findBucket(expiration uint32) node.Node[K, V] { + duration := expiration - v.time + length := len(v.wheel) - 1 + for i := 0; i < length; i++ { + if duration < spans[i+1] { + ticks := expiration >> shift[i] + index := ticks & (buckets[i] - 1) + return v.wheel[i][index] + } + } + return v.wheel[length][0] +} + +// Add schedules a timer event for the node. +func (v *Variable[K, V]) Add(n node.Node[K, V]) { + root := v.findBucket(n.Expiration()) + link(root, n) +} + +// Delete removes a timer event for this entry if present. +func (v *Variable[K, V]) Delete(n node.Node[K, V]) { + unlink(n) + n.SetNextExp(nil) + n.SetPrevExp(nil) +} + +func (v *Variable[K, V]) RemoveExpired(expired []node.Node[K, V]) []node.Node[K, V] { + currentTime := unixtime.Now() + prevTime := v.time + v.time = currentTime + + for i := 0; i < len(shift); i++ { + previousTicks := prevTime >> shift[i] + currentTicks := currentTime >> shift[i] + delta := currentTicks - previousTicks + if delta == 0 { + break + } + + expired = v.removeExpiredFromBucket(expired, i, previousTicks, delta) + } + + return expired +} + +func (v *Variable[K, V]) removeExpiredFromBucket(expired []node.Node[K, V], index int, prevTicks, delta uint32) []node.Node[K, V] { + mask := buckets[index] - 1 + steps := buckets[index] + if delta < steps { + steps = delta + } + start := prevTicks & mask + end := start + steps + timerWheel := v.wheel[index] + for i := start; i < end; i++ { + root := timerWheel[i&mask] + n := root.NextExp() + root.SetPrevExp(root) + root.SetNextExp(root) + + for !node.Equals(n, root) { + next := n.NextExp() + n.SetPrevExp(nil) + n.SetNextExp(nil) + + if n.Expiration() <= v.time { + expired = append(expired, n) + } else { + v.Add(n) + } + + n = next + } + } + + return expired +} + +func (v *Variable[K, V]) Clear() { + for i := 0; i < len(v.wheel); i++ { + for j := 0; j < len(v.wheel[i]); j++ { + root := v.wheel[i][j] + n := root.NextExp() + // NOTE(maypok86): Maybe we should use the same approach as in RemoveExpired? + + for !node.Equals(n, root) { + next := n.NextExp() + v.Delete(n) + + n = next + } + } + } + v.time = unixtime.Now() +} + +// link adds the entry at the tail of the bucket's list. +func link[K comparable, V any](root, n node.Node[K, V]) { + n.SetPrevExp(root.PrevExp()) + n.SetNextExp(root) + + root.PrevExp().SetNextExp(n) + root.SetPrevExp(n) +} + +// unlink removes the entry from its bucket, if scheduled. +func unlink[K comparable, V any](n node.Node[K, V]) { + next := n.NextExp() + if !node.Equals(next, nil) { + prev := n.PrevExp() + next.SetPrevExp(prev) + prev.SetNextExp(next) + } +} diff --git a/internal/expire/variable_test.go b/internal/expire/variable_test.go new file mode 100644 index 0000000..39174cf --- /dev/null +++ b/internal/expire/variable_test.go @@ -0,0 +1,130 @@ +package expire + +import ( + "testing" + + "github.com/maypok86/otter/internal/generated/node" + "github.com/maypok86/otter/internal/unixtime" +) + +func contains[K comparable, V any](root, f node.Node[K, V]) bool { + n := root.NextExp() + for !node.Equals(n, root) { + if node.Equals(n, f) { + return true + } + + n = n.NextExp() + } + return false +} + +func match[K comparable, V any](t *testing.T, nodes []node.Node[K, V], keys []K) { + t.Helper() + + if len(nodes) != len(keys) { + t.Fatalf("Not equals lengths of nodes (%d) and keys (%d)", len(nodes), len(keys)) + } + + for i, k := range keys { + if k != nodes[i].Key() { + t.Fatalf("Not valid entry found: %+v", nodes[i]) + } + } +} + +func TestVariable_Add(t *testing.T) { + nm := node.NewManager[string, string](node.Config{ + WithExpiration: true, + }) + nodes := []node.Node[string, string]{ + nm.Create("k1", "", 1, 1), + nm.Create("k2", "", 69, 1), + nm.Create("k3", "", 4399, 1), + } + v := NewVariable[string, string](nm) + + for _, n := range nodes { + v.Add(n) + } + + var found bool + for _, root := range v.wheel[0] { + if contains(root, nodes[0]) { + found = true + } + } + if !found { + t.Fatalf("Not found node %+v in timer wheel", nodes[0]) + } + + found = false + for _, root := range v.wheel[1] { + if contains(root, nodes[1]) { + found = true + } + } + if !found { + t.Fatalf("Not found node %+v in timer wheel", nodes[1]) + } + + found = false + for _, root := range v.wheel[2] { + if contains(root, nodes[2]) { + found = true + } + } + if !found { + t.Fatalf("Not found node %+v in timer wheel", nodes[2]) + } +} + +func TestVariable_Delete(t *testing.T) { +} + +func TestVariable_RemoveExpired(t *testing.T) { + nm := node.NewManager[string, string](node.Config{ + WithExpiration: true, + }) + nodes := []node.Node[string, string]{ + nm.Create("k1", "", 1, 1), + nm.Create("k2", "", 10, 1), + nm.Create("k3", "", 30, 1), + nm.Create("k4", "", 120, 1), + nm.Create("k5", "", 6500, 1), + nm.Create("k6", "", 142000, 1), + nm.Create("k7", "", 1420000, 1), + } + v := NewVariable[string, string](nm) + + for _, n := range nodes { + v.Add(n) + } + + var expired []node.Node[string, string] + var keys []string + unixtime.SetNow(64) + expired = v.RemoveExpired(expired) + keys = append(keys, "k1", "k2", "k3") + match(t, expired, keys) + + unixtime.SetNow(200) + expired = v.RemoveExpired(expired) + keys = append(keys, "k4") + match(t, expired, keys) + + unixtime.SetNow(12000) + expired = v.RemoveExpired(expired) + keys = append(keys, "k5") + match(t, expired, keys) + + unixtime.SetNow(350000) + expired = v.RemoveExpired(expired) + keys = append(keys, "k6") + match(t, expired, keys) + + unixtime.SetNow(1520000) + expired = v.RemoveExpired(expired) + keys = append(keys, "k7") + match(t, expired, keys) +} diff --git a/internal/generated/node/b.go b/internal/generated/node/b.go index 9e4b84f..35a88f5 100644 --- a/internal/generated/node/b.go +++ b/internal/generated/node/b.go @@ -49,6 +49,10 @@ func (n *B[K, V]) Prev() Node[K, V] { } func (n *B[K, V]) SetPrev(v Node[K, V]) { + if v == nil { + n.prev = nil + return + } n.prev = (*B[K, V])(v.AsPointer()) } @@ -57,6 +61,10 @@ func (n *B[K, V]) Next() Node[K, V] { } func (n *B[K, V]) SetNext(v Node[K, V]) { + if v == nil { + n.next = nil + return + } n.next = (*B[K, V])(v.AsPointer()) } diff --git a/internal/generated/node/bc.go b/internal/generated/node/bc.go index 1276749..4c7e723 100644 --- a/internal/generated/node/bc.go +++ b/internal/generated/node/bc.go @@ -53,6 +53,10 @@ func (n *BC[K, V]) Prev() Node[K, V] { } func (n *BC[K, V]) SetPrev(v Node[K, V]) { + if v == nil { + n.prev = nil + return + } n.prev = (*BC[K, V])(v.AsPointer()) } @@ -61,6 +65,10 @@ func (n *BC[K, V]) Next() Node[K, V] { } func (n *BC[K, V]) SetNext(v Node[K, V]) { + if v == nil { + n.next = nil + return + } n.next = (*BC[K, V])(v.AsPointer()) } diff --git a/internal/generated/node/be.go b/internal/generated/node/be.go index 3bdb9f5..2f10b8c 100644 --- a/internal/generated/node/be.go +++ b/internal/generated/node/be.go @@ -56,6 +56,10 @@ func (n *BE[K, V]) Prev() Node[K, V] { } func (n *BE[K, V]) SetPrev(v Node[K, V]) { + if v == nil { + n.prev = nil + return + } n.prev = (*BE[K, V])(v.AsPointer()) } @@ -64,6 +68,10 @@ func (n *BE[K, V]) Next() Node[K, V] { } func (n *BE[K, V]) SetNext(v Node[K, V]) { + if v == nil { + n.next = nil + return + } n.next = (*BE[K, V])(v.AsPointer()) } @@ -72,6 +80,10 @@ func (n *BE[K, V]) PrevExp() Node[K, V] { } func (n *BE[K, V]) SetPrevExp(v Node[K, V]) { + if v == nil { + n.prevExp = nil + return + } n.prevExp = (*BE[K, V])(v.AsPointer()) } @@ -80,6 +92,10 @@ func (n *BE[K, V]) NextExp() Node[K, V] { } func (n *BE[K, V]) SetNextExp(v Node[K, V]) { + if v == nil { + n.nextExp = nil + return + } n.nextExp = (*BE[K, V])(v.AsPointer()) } diff --git a/internal/generated/node/bec.go b/internal/generated/node/bec.go index c3f7061..bb96da3 100644 --- a/internal/generated/node/bec.go +++ b/internal/generated/node/bec.go @@ -60,6 +60,10 @@ func (n *BEC[K, V]) Prev() Node[K, V] { } func (n *BEC[K, V]) SetPrev(v Node[K, V]) { + if v == nil { + n.prev = nil + return + } n.prev = (*BEC[K, V])(v.AsPointer()) } @@ -68,6 +72,10 @@ func (n *BEC[K, V]) Next() Node[K, V] { } func (n *BEC[K, V]) SetNext(v Node[K, V]) { + if v == nil { + n.next = nil + return + } n.next = (*BEC[K, V])(v.AsPointer()) } @@ -76,6 +84,10 @@ func (n *BEC[K, V]) PrevExp() Node[K, V] { } func (n *BEC[K, V]) SetPrevExp(v Node[K, V]) { + if v == nil { + n.prevExp = nil + return + } n.prevExp = (*BEC[K, V])(v.AsPointer()) } @@ -84,6 +96,10 @@ func (n *BEC[K, V]) NextExp() Node[K, V] { } func (n *BEC[K, V]) SetNextExp(v Node[K, V]) { + if v == nil { + n.nextExp = nil + return + } n.nextExp = (*BEC[K, V])(v.AsPointer()) } diff --git a/internal/generated/node/manager.go b/internal/generated/node/manager.go index 6a2b2c4..21f41d2 100644 --- a/internal/generated/node/manager.go +++ b/internal/generated/node/manager.go @@ -66,6 +66,16 @@ type Node[K comparable, V any] interface { Unmark() } +func Equals[K comparable, V any](a, b Node[K, V]) bool { + if a == nil { + return b == nil || b.AsPointer() == nil + } + if b == nil { + return a.AsPointer() == nil + } + return a.AsPointer() == b.AsPointer() +} + type Config struct { WithExpiration bool WithCost bool diff --git a/internal/hashtable/map.go b/internal/hashtable/map.go index 839301f..2ee48a2 100644 --- a/internal/hashtable/map.go +++ b/internal/hashtable/map.go @@ -17,7 +17,7 @@ import ( "github.com/dolthub/maphash" - "github.com/maypok86/otter/internal/node" + "github.com/maypok86/otter/internal/generated/node" "github.com/maypok86/otter/internal/xmath" "github.com/maypok86/otter/internal/xruntime" ) @@ -63,6 +63,7 @@ const ( type Map[K comparable, V any] struct { table unsafe.Pointer + nodeManager *node.Manager[K, V] // only used along with resizeCond resizeMutex sync.Mutex // used to wake up resize waiters (concurrent modifications) @@ -123,17 +124,19 @@ type paddedCounter struct { // NewWithSize creates a new Map instance with capacity enough // to hold size nodes. If size is zero or negative, the value // is ignored. -func NewWithSize[K comparable, V any](size int) *Map[K, V] { - return newMap[K, V](size) +func NewWithSize[K comparable, V any](nodeManager *node.Manager[K, V], size int) *Map[K, V] { + return newMap[K, V](nodeManager, size) } // New creates a new Map instance. -func New[K comparable, V any]() *Map[K, V] { - return newMap[K, V](minNodeCount) +func New[K comparable, V any](nodeManager *node.Manager[K, V]) *Map[K, V] { + return newMap[K, V](nodeManager, minNodeCount) } -func newMap[K comparable, V any](size int) *Map[K, V] { - m := &Map[K, V]{} +func newMap[K comparable, V any](nodeManager *node.Manager[K, V], size int) *Map[K, V] { + m := &Map[K, V]{ + nodeManager: nodeManager, + } m.resizeCond = *sync.NewCond(&m.resizeMutex) var t *table[K] if size <= minNodeCount { @@ -165,10 +168,10 @@ func newTable[K comparable](bucketCount int, prevHasher maphash.Hasher[K]) *tabl return t } -// Get returns the *node.Node stored in the map for a key, or nil if no node is present. +// Get returns the node.Node stored in the map for a key, or nil if no node is present. // // The ok result indicates whether node was found in the map. -func (m *Map[K, V]) Get(key K) (got *node.Node[K, V], ok bool) { +func (m *Map[K, V]) Get(key K) (got node.Node[K, V], ok bool) { t := (*table[K])(atomic.LoadPointer(&m.table)) hash := t.calcShiftHash(key) bucketIdx := hash & t.mask @@ -187,7 +190,7 @@ func (m *Map[K, V]) Get(key K) (got *node.Node[K, V], ok bool) { // concurrent write in this node continue } - n := (*node.Node[K, V])(nodePtr) + n := m.nodeManager.FromPointer(nodePtr) if key != n.Key() { continue } @@ -202,20 +205,20 @@ func (m *Map[K, V]) Get(key K) (got *node.Node[K, V], ok bool) { } } -// Set sets the *node.Node for the key. +// Set sets the node.Node for the key. // // Returns the evicted node or nil if the node was inserted. -func (m *Map[K, V]) Set(n *node.Node[K, V]) *node.Node[K, V] { +func (m *Map[K, V]) Set(n node.Node[K, V]) node.Node[K, V] { return m.set(n, false) } -// SetIfAbsent sets the *node.Node if the specified key is not already associated with a value (or is mapped to null) +// SetIfAbsent sets the node.Node if the specified key is not already associated with a value (or is mapped to null) // associates it with the given value and returns null, else returns the current node. -func (m *Map[K, V]) SetIfAbsent(n *node.Node[K, V]) *node.Node[K, V] { +func (m *Map[K, V]) SetIfAbsent(n node.Node[K, V]) node.Node[K, V] { return m.set(n, true) } -func (m *Map[K, V]) set(n *node.Node[K, V], onlyIfAbsent bool) *node.Node[K, V] { +func (m *Map[K, V]) set(n node.Node[K, V], onlyIfAbsent bool) node.Node[K, V] { for { RETRY: var ( @@ -255,7 +258,7 @@ func (m *Map[K, V]) set(n *node.Node[K, V], onlyIfAbsent bool) *node.Node[K, V] if h != hash { continue } - prev := (*node.Node[K, V])(b.nodes[i]) + prev := m.nodeManager.FromPointer(b.nodes[i]) if n.Key() != prev.Key() { continue } @@ -269,7 +272,7 @@ func (m *Map[K, V]) set(n *node.Node[K, V], onlyIfAbsent bool) *node.Node[K, V] // thus the live value pointers are unique. Otherwise atomic // snapshot won't be correct in case of multiple Store calls // using the same value. - atomic.StorePointer(&b.nodes[i], unsafe.Pointer(n)) + atomic.StorePointer(&b.nodes[i], n.AsPointer()) rootBucket.mutex.Unlock() return prev } @@ -278,7 +281,7 @@ func (m *Map[K, V]) set(n *node.Node[K, V], onlyIfAbsent bool) *node.Node[K, V] // insertion into an existing bucket. // first we update the hash, then the entry. atomic.StoreUint64(&emptyBucket.hashes[emptyIdx], hash) - atomic.StorePointer(&emptyBucket.nodes[emptyIdx], unsafe.Pointer(n)) + atomic.StorePointer(&emptyBucket.nodes[emptyIdx], n.AsPointer()) rootBucket.mutex.Unlock() t.addSize(bucketIdx, 1) return nil @@ -294,7 +297,7 @@ func (m *Map[K, V]) set(n *node.Node[K, V], onlyIfAbsent bool) *node.Node[K, V] // create and append the bucket. newBucket := &paddedBucket{} newBucket.hashes[0] = hash - newBucket.nodes[0] = unsafe.Pointer(n) + newBucket.nodes[0] = n.AsPointer() atomic.StorePointer(&b.next, unsafe.Pointer(newBucket)) rootBucket.mutex.Unlock() t.addSize(bucketIdx, 1) @@ -308,8 +311,8 @@ func (m *Map[K, V]) set(n *node.Node[K, V], onlyIfAbsent bool) *node.Node[K, V] // Delete deletes the value for a key. // // Returns the deleted node or nil if the node wasn't deleted. -func (m *Map[K, V]) Delete(key K) *node.Node[K, V] { - return m.delete(key, func(n *node.Node[K, V]) bool { +func (m *Map[K, V]) Delete(key K) node.Node[K, V] { + return m.delete(key, func(n node.Node[K, V]) bool { return key == n.Key() }) } @@ -317,13 +320,13 @@ func (m *Map[K, V]) Delete(key K) *node.Node[K, V] { // DeleteNode evicts the node for a key. // // Returns the evicted node or nil if the node wasn't evicted. -func (m *Map[K, V]) DeleteNode(n *node.Node[K, V]) *node.Node[K, V] { - return m.delete(n.Key(), func(current *node.Node[K, V]) bool { - return n == current +func (m *Map[K, V]) DeleteNode(n node.Node[K, V]) node.Node[K, V] { + return m.delete(n.Key(), func(current node.Node[K, V]) bool { + return node.Equals(n, current) }) } -func (m *Map[K, V]) delete(key K, cmp func(*node.Node[K, V]) bool) *node.Node[K, V] { +func (m *Map[K, V]) delete(key K, cmp func(node.Node[K, V]) bool) node.Node[K, V] { for { RETRY: hintNonEmpty := 0 @@ -356,7 +359,7 @@ func (m *Map[K, V]) delete(key K, cmp func(*node.Node[K, V]) bool) *node.Node[K, hintNonEmpty++ continue } - current := (*node.Node[K, V])(b.nodes[i]) + current := m.nodeManager.FromPointer(b.nodes[i]) if !cmp(current) { hintNonEmpty++ continue @@ -450,7 +453,7 @@ func (m *Map[K, V]) copyBuckets(b *paddedBucket, dest *table[K]) (copied int) { if b.nodes[i] == nil { continue } - n := (*node.Node[K, V])(b.nodes[i]) + n := m.nodeManager.FromPointer(b.nodes[i]) hash := dest.calcShiftHash(n.Key()) bucketIdx := hash & dest.mask dest.buckets[bucketIdx].add(hash, b.nodes[i]) @@ -493,7 +496,7 @@ func (m *Map[K, V]) waitForResize() { // It is safe to modify the map while iterating it. However, the // concurrent modification rule apply, i.e. the changes may be not // reflected in the subsequently iterated nodes. -func (m *Map[K, V]) Range(f func(*node.Node[K, V]) bool) { +func (m *Map[K, V]) Range(f func(node.Node[K, V]) bool) { var zeroPtr unsafe.Pointer // Pre-allocate array big enough to fit nodes for most hash tables. buffer := make([]unsafe.Pointer, 0, 16*bucketSize) @@ -519,7 +522,7 @@ func (m *Map[K, V]) Range(f func(*node.Node[K, V]) bool) { } // Call the function for all copied nodes. for j := range buffer { - n := (*node.Node[K, V])(buffer[j]) + n := m.nodeManager.FromPointer(buffer[j]) if !f(n) { return } diff --git a/internal/hashtable/map_test.go b/internal/hashtable/map_test.go index beefefd..ac1beb4 100644 --- a/internal/hashtable/map_test.go +++ b/internal/hashtable/map_test.go @@ -30,24 +30,21 @@ import ( "time" "unsafe" - "github.com/maypok86/otter/internal/node" + "github.com/maypok86/otter/internal/generated/node" "github.com/maypok86/otter/internal/xruntime" ) -func newNode[K comparable, V any](key K, value V) *node.Node[K, V] { - return node.New[K, V](key, value, 0, 1) -} - func TestMap_PaddedBucketSize(t *testing.T) { size := unsafe.Sizeof(paddedBucket{}) if size != xruntime.CacheLineSize { - t.Fatalf("size of 128B (two cache lines) is expected, got: %d", size) + t.Fatalf("size of 64B (one cache line) is expected, got: %d", size) } } func TestMap_EmptyStringKey(t *testing.T) { - m := New[string, string]() - m.Set(newNode[string, string]("", "foobar")) + nm := node.NewManager[string, string](node.Config{}) + m := New(nm) + m.Set(nm.Create("", "foobar", 0, 1)) n, ok := m.Get("") if !ok { t.Fatal("value was expected") @@ -58,8 +55,9 @@ func TestMap_EmptyStringKey(t *testing.T) { } func TestMap_SetNilValue(t *testing.T) { - m := New[string, *struct{}]() - m.Set(newNode[string, *struct{}]("foo", nil)) + nm := node.NewManager[string, *struct{}](node.Config{}) + m := New(nm) + m.Set(nm.Create("foo", nil, 0, 1)) n, ok := m.Get("foo") if !ok { t.Fatal("nil value was expected") @@ -71,9 +69,10 @@ func TestMap_SetNilValue(t *testing.T) { func TestMap_Set(t *testing.T) { const numberOfNodes = 128 - m := New[string, int]() + nm := node.NewManager[string, int](node.Config{}) + m := New(nm) for i := 0; i < numberOfNodes; i++ { - m.Set(newNode[string, int](strconv.Itoa(i), i)) + m.Set(nm.Create(strconv.Itoa(i), i, 0, 1)) } for i := 0; i < numberOfNodes; i++ { n, ok := m.Get(strconv.Itoa(i)) @@ -88,15 +87,16 @@ func TestMap_Set(t *testing.T) { func TestMap_SetIfAbsent(t *testing.T) { const numberOfNodes = 128 - m := New[string, int]() + nm := node.NewManager[string, int](node.Config{}) + m := New(nm) for i := 0; i < numberOfNodes; i++ { - res := m.SetIfAbsent(newNode[string, int](strconv.Itoa(i), i)) + res := m.SetIfAbsent(nm.Create(strconv.Itoa(i), i, 0, 1)) if res != nil { t.Fatalf("set was dropped. got: %+v", res) } } for i := 0; i < numberOfNodes; i++ { - n := newNode[string, int](strconv.Itoa(i), i) + n := nm.Create(strconv.Itoa(i), i, 0, 1) res := m.SetIfAbsent(n) if res == nil { t.Fatalf("set was not dropped. node that was set: %+v", res) @@ -122,7 +122,8 @@ type hasher struct { func TestMap_SetWithCollisions(t *testing.T) { const numNodes = 1000 - m := NewWithSize[int, int](numNodes) + nm := node.NewManager[int, int](node.Config{}) + m := NewWithSize(nm, numNodes) table := (*table[int])(atomic.LoadPointer(&m.table)) hasher := (*hasher)((unsafe.Pointer)(&table.hasher)) hasher.hash = func(ptr unsafe.Pointer, seed uintptr) uintptr { @@ -131,7 +132,7 @@ func TestMap_SetWithCollisions(t *testing.T) { return 42 } for i := 0; i < numNodes; i++ { - m.Set(newNode(i, i)) + m.Set(nm.Create(i, i, 0, 1)) } for i := 0; i < numNodes; i++ { v, ok := m.Get(i) @@ -146,9 +147,10 @@ func TestMap_SetWithCollisions(t *testing.T) { func TestMap_SetThenDelete(t *testing.T) { const numberOfNodes = 1000 - m := New[string, int]() + nm := node.NewManager[string, int](node.Config{}) + m := New(nm) for i := 0; i < numberOfNodes; i++ { - m.Set(newNode[string, int](strconv.Itoa(i), i)) + m.Set(nm.Create(strconv.Itoa(i), i, 0, 1)) } for i := 0; i < numberOfNodes; i++ { m.Delete(strconv.Itoa(i)) @@ -160,13 +162,14 @@ func TestMap_SetThenDelete(t *testing.T) { func TestMap_Range(t *testing.T) { const numNodes = 1000 - m := New[string, int]() + nm := node.NewManager[string, int](node.Config{}) + m := New(nm) for i := 0; i < numNodes; i++ { - m.Set(newNode(strconv.Itoa(i), i)) + m.Set(nm.Create(strconv.Itoa(i), i, 0, 1)) } iters := 0 met := make(map[string]int) - m.Range(func(n *node.Node[string, int]) bool { + m.Range(func(n node.Node[string, int]) bool { if n.Key() != strconv.Itoa(n.Value()) { t.Fatalf("got unexpected key/value for iteration %d: %v/%v", iters, n.Key(), n.Value()) return false @@ -186,12 +189,13 @@ func TestMap_Range(t *testing.T) { } func TestMap_RangeFalseReturned(t *testing.T) { - m := New[string, int]() + nm := node.NewManager[string, int](node.Config{}) + m := New(nm) for i := 0; i < 100; i++ { - m.Set(newNode(strconv.Itoa(i), i)) + m.Set(nm.Create(strconv.Itoa(i), i, 0, 1)) } iters := 0 - m.Range(func(n *node.Node[string, int]) bool { + m.Range(func(n node.Node[string, int]) bool { iters++ return iters != 13 }) @@ -202,11 +206,12 @@ func TestMap_RangeFalseReturned(t *testing.T) { func TestMap_RangeNestedDelete(t *testing.T) { const numNodes = 256 - m := New[string, int]() + nm := node.NewManager[string, int](node.Config{}) + m := New(nm) for i := 0; i < numNodes; i++ { - m.Set(newNode(strconv.Itoa(i), i)) + m.Set(nm.Create(strconv.Itoa(i), i, 0, 1)) } - m.Range(func(n *node.Node[string, int]) bool { + m.Range(func(n node.Node[string, int]) bool { m.Delete(n.Key()) return true }) @@ -219,14 +224,15 @@ func TestMap_RangeNestedDelete(t *testing.T) { func TestMap_Size(t *testing.T) { const numberOfNodes = 1000 - m := New[string, int]() + nm := node.NewManager[string, int](node.Config{}) + m := New(nm) size := m.Size() if size != 0 { t.Fatalf("zero size expected: %d", size) } expectedSize := 0 for i := 0; i < numberOfNodes; i++ { - m.Set(newNode[string, int](strconv.Itoa(i), i)) + m.Set(nm.Create(strconv.Itoa(i), i, 0, 1)) expectedSize++ size := m.Size() if size != expectedSize { @@ -245,9 +251,10 @@ func TestMap_Size(t *testing.T) { func TestMap_Clear(t *testing.T) { const numberOfNodes = 1000 - m := New[string, int]() + nm := node.NewManager[string, int](node.Config{}) + m := New(nm) for i := 0; i < numberOfNodes; i++ { - m.Set(newNode[string, int](strconv.Itoa(i), i)) + m.Set(nm.Create(strconv.Itoa(i), i, 0, 1)) } size := m.Size() if size != numberOfNodes { @@ -266,7 +273,7 @@ func parallelSeqSetter(t *testing.T, m *Map[string, int], storers, iterations, n for i := 0; i < iterations; i++ { for j := 0; j < nodes; j++ { if storers == 0 || j%storers == 0 { - m.Set(newNode[string, int](strconv.Itoa(j), j)) + m.Set(m.nodeManager.Create(strconv.Itoa(j), j, 0, 1)) n, ok := m.Get(strconv.Itoa(j)) if !ok { t.Errorf("value was not found for %d", j) @@ -286,7 +293,8 @@ func TestMap_ParallelSets(t *testing.T) { const storers = 4 const iterations = 10_000 const nodes = 100 - m := New[string, int]() + nm := node.NewManager[string, int](node.Config{}) + m := New(nm) wg := &sync.WaitGroup{} wg.Add(storers) @@ -312,7 +320,7 @@ func parallelRandSetter(t *testing.T, m *Map[string, int], iteratinos, nodes int r := rand.New(rand.NewSource(time.Now().UnixNano())) for i := 0; i < iteratinos; i++ { j := r.Intn(nodes) - m.Set(newNode[string, int](strconv.Itoa(j), j)) + m.Set(m.nodeManager.Create(strconv.Itoa(j), j, 0, 1)) } wg.Done() } @@ -346,7 +354,8 @@ func parallelGetter(t *testing.T, m *Map[string, int], iterations, nodes int, wg func TestMap_ParallelGet(t *testing.T) { const iterations = 100_000 const nodes = 100 - m := New[string, int]() + nm := node.NewManager[string, int](node.Config{}) + m := New(nm) wg := &sync.WaitGroup{} wg.Add(3) @@ -361,7 +370,8 @@ func TestMap_ParallelSetsAndDeletes(t *testing.T) { const workers = 2 const iterations = 100_000 const nodes = 1000 - m := New[string, int]() + nm := node.NewManager[string, int](node.Config{}) + m := New(nm) wg := &sync.WaitGroup{} wg.Add(2 * workers) for i := 0; i < workers; i++ { @@ -377,7 +387,7 @@ func parallelTypedRangeSetter(t *testing.T, m *Map[int, int], numNodes int, stop for { for i := 0; i < numNodes; i++ { - m.Set(newNode(i, i)) + m.Set(m.nodeManager.Create(i, i, 0, 1)) } if atomic.LoadInt64(stopFlag) != 0 { break @@ -402,9 +412,10 @@ func parallelTypedRangeDeleter(t *testing.T, m *Map[int, int], numNodes int, sto func TestMap_ParallelRange(t *testing.T) { const numNodes = 10_000 - m := New[int, int]() + nm := node.NewManager[int, int](node.Config{}) + m := New(nm) for i := 0; i < numNodes; i++ { - m.Set(newNode(i, i)) + m.Set(nm.Create(i, i, 0, 1)) } // Start goroutines that would be storing and deleting items in parallel. cdone := make(chan bool) @@ -413,7 +424,7 @@ func TestMap_ParallelRange(t *testing.T) { go parallelTypedRangeDeleter(t, m, numNodes, &stopFlag, cdone) // Iterate the map and verify that no duplicate keys were met. met := make(map[int]int) - m.Range(func(n *node.Node[int, int]) bool { + m.Range(func(n node.Node[int, int]) bool { if n.Key() != n.Value() { t.Fatalf("got unexpected value for key %d: %d", n.Key(), n.Value()) return false diff --git a/internal/lossy/buffer.go b/internal/lossy/buffer.go index 0553f0d..6bb35a6 100644 --- a/internal/lossy/buffer.go +++ b/internal/lossy/buffer.go @@ -19,6 +19,7 @@ import ( "sync/atomic" "unsafe" + "github.com/maypok86/otter/internal/generated/node" "github.com/maypok86/otter/internal/xruntime" ) @@ -29,8 +30,8 @@ const ( ) // PolicyBuffers is the set of buffers returned by the lossy buffer. -type PolicyBuffers[T any] struct { - Returned []*T +type PolicyBuffers[K comparable, V any] struct { + Returned []node.Node[K, V] } // Buffer is a circular ring buffer stores the elements being transferred by the producers to the consumer. @@ -46,24 +47,26 @@ type PolicyBuffers[T any] struct { // and the next read count are lazily set. // // This implementation is striped to further increase concurrency. -type Buffer[T any] struct { +type Buffer[K comparable, V any] struct { head atomic.Uint64 headPadding [xruntime.CacheLineSize - unsafe.Sizeof(atomic.Uint64{})]byte tail atomic.Uint64 tailPadding [xruntime.CacheLineSize - unsafe.Sizeof(atomic.Uint64{})]byte + nodeManager *node.Manager[K, V] returned unsafe.Pointer - returnedPadding [xruntime.CacheLineSize - 8]byte + returnedPadding [xruntime.CacheLineSize - 2*8]byte policyBuffers unsafe.Pointer returnedSlicePadding [xruntime.CacheLineSize - 8]byte buffer [capacity]unsafe.Pointer } // New creates a new lossy Buffer. -func New[T any]() *Buffer[T] { - pb := &PolicyBuffers[T]{ - Returned: make([]*T, 0, capacity), +func New[K comparable, V any](nodeManager *node.Manager[K, V]) *Buffer[K, V] { + pb := &PolicyBuffers[K, V]{ + Returned: make([]node.Node[K, V], 0, capacity), } - b := &Buffer[T]{ + b := &Buffer[K, V]{ + nodeManager: nodeManager, policyBuffers: unsafe.Pointer(pb), } b.returned = b.policyBuffers @@ -73,7 +76,7 @@ func New[T any]() *Buffer[T] { // Add lazily publishes the item to the consumer. // // item may be lost due to contention. -func (b *Buffer[T]) Add(item *T) *PolicyBuffers[T] { +func (b *Buffer[K, V]) Add(n node.Node[K, V]) *PolicyBuffers[K, V] { head := b.head.Load() tail := b.tail.Load() size := tail - head @@ -84,7 +87,7 @@ func (b *Buffer[T]) Add(item *T) *PolicyBuffers[T] { if b.tail.CompareAndSwap(tail, tail+1) { // success index := int(tail & mask) - atomic.StorePointer(&b.buffer[index], unsafe.Pointer(item)) + atomic.StorePointer(&b.buffer[index], n.AsPointer()) if size == capacity-1 { // try return new buffer if !atomic.CompareAndSwapPointer(&b.returned, b.policyBuffers, nil) { @@ -92,13 +95,13 @@ func (b *Buffer[T]) Add(item *T) *PolicyBuffers[T] { return nil } - pb := (*PolicyBuffers[T])(b.policyBuffers) + pb := (*PolicyBuffers[K, V])(b.policyBuffers) for i := 0; i < capacity; i++ { index := int(head & mask) - v := (*T)(atomic.LoadPointer(&b.buffer[index])) + v := atomic.LoadPointer(&b.buffer[index]) if v != nil { // published - pb.Returned = append(pb.Returned, v) + pb.Returned = append(pb.Returned, b.nodeManager.FromPointer(v)) // release atomic.StorePointer(&b.buffer[index], nil) } @@ -115,8 +118,8 @@ func (b *Buffer[T]) Add(item *T) *PolicyBuffers[T] { } // Free returns the processed buffer back and also clears it. -func (b *Buffer[T]) Free() { - pb := (*PolicyBuffers[T])(b.policyBuffers) +func (b *Buffer[K, V]) Free() { + pb := (*PolicyBuffers[K, V])(b.policyBuffers) for i := 0; i < len(pb.Returned); i++ { pb.Returned[i] = nil } @@ -125,7 +128,7 @@ func (b *Buffer[T]) Free() { } // Clear clears the lossy Buffer and returns it to the default state. -func (b *Buffer[T]) Clear() { +func (b *Buffer[K, V]) Clear() { for !atomic.CompareAndSwapPointer(&b.returned, b.policyBuffers, nil) { runtime.Gosched() } diff --git a/internal/node/node.go b/internal/node/node.go deleted file mode 100644 index 7d250bf..0000000 --- a/internal/node/node.go +++ /dev/null @@ -1,127 +0,0 @@ -// Copyright (c) 2023 Alexey Mayshev. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package node - -import ( - "github.com/maypok86/otter/internal/unixtime" -) - -const ( - unknownQueueType uint8 = iota - smallQueueType - mainQueueType - - maxFrequency uint8 = 3 -) - -// Node is an entry in the cache containing the key, value, cost, access and write metadata. -type Node[K comparable, V any] struct { - key K - value V - prev *Node[K, V] - next *Node[K, V] - expiration uint32 - cost uint32 - frequency uint8 - queueType uint8 -} - -// New creates a new Node. -func New[K comparable, V any](key K, value V, expiration, cost uint32) *Node[K, V] { - return &Node[K, V]{ - key: key, - value: value, - expiration: expiration, - cost: cost, - } -} - -// Key returns the key. -func (n *Node[K, V]) Key() K { - return n.key -} - -// Value returns the value. -func (n *Node[K, V]) Value() V { - return n.value -} - -// IsExpired returns true if node is expired. -func (n *Node[K, V]) IsExpired() bool { - return n.expiration > 0 && n.expiration < unixtime.Now() -} - -// Expiration returns the expiration time. -func (n *Node[K, V]) Expiration() uint32 { - return n.expiration -} - -// Cost returns the cost of the node. -func (n *Node[K, V]) Cost() uint32 { - return n.cost -} - -// Frequency returns the frequency of the node. -func (n *Node[K, V]) Frequency() uint8 { - return n.frequency -} - -// IncrementFrequency increments the frequency of the node. -func (n *Node[K, V]) IncrementFrequency() { - n.frequency = minUint8(n.frequency+1, maxFrequency) -} - -// DecrementFrequency decrements the frequency of the node. -func (n *Node[K, V]) DecrementFrequency() { - n.frequency-- -} - -// ResetFrequency resets the frequency. -func (n *Node[K, V]) ResetFrequency() { - n.frequency = 0 -} - -// MarkSmall sets the status to the small queue. -func (n *Node[K, V]) MarkSmall() { - n.queueType = smallQueueType -} - -// IsSmall returns true if node is in the small queue. -func (n *Node[K, V]) IsSmall() bool { - return n.queueType == smallQueueType -} - -// MarkMain sets the status to the main queue. -func (n *Node[K, V]) MarkMain() { - n.queueType = mainQueueType -} - -// IsMain returns true if node is in the main queue. -func (n *Node[K, V]) IsMain() bool { - return n.queueType == mainQueueType -} - -// Unmark sets the status to unknown. -func (n *Node[K, V]) Unmark() { - n.queueType = unknownQueueType -} - -func minUint8(a, b uint8) uint8 { - if a < b { - return a - } - - return b -} diff --git a/internal/node/node_test.go b/internal/node/node_test.go deleted file mode 100644 index cb87369..0000000 --- a/internal/node/node_test.go +++ /dev/null @@ -1,103 +0,0 @@ -// Copyright (c) 2023 Alexey Mayshev. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package node - -import "testing" - -func TestNode(t *testing.T) { - key := 1 - value := 2 - expiration := uint32(6) - cost := uint32(4) - n := New[int, int](key, value, expiration, cost) - - // key - if n.Key() != key { - t.Fatalf("n.Key() = %d, want %d", n.Key(), key) - } - - // value - if n.Value() != value { - t.Fatalf("n.Value() = %d, want %d", n.Value(), value) - } - - // expiration - if n.IsExpired() { - t.Fatalf("node shouldn't be expired") - } - - if n.Expiration() != expiration { - t.Fatalf("n.Exiration() = %d, want %d", n.Expiration(), expiration) - } - - // cost - if n.Cost() != cost { - t.Fatalf("n.Cost() = %d, want %d", n.Cost(), cost) - } - - // frequency - for i := uint8(0); i < 10; i++ { - if i < 4 { - if n.Frequency() != i { - t.Fatalf("n.Frequency() = %d, want %d", n.Frequency(), i) - } - } else { - if n.Frequency() != 3 { - t.Fatalf("n.Frequency() = %d, want %d", n.Frequency(), 3) - } - } - n.IncrementFrequency() - } - - n.DecrementFrequency() - n.DecrementFrequency() - if n.Frequency() != 1 { - t.Fatalf("n.Frequency() = %d, want %d", n.Frequency(), 1) - } - - n.IncrementFrequency() - n.ResetFrequency() - if n.Frequency() != 0 { - t.Fatalf("n.Frequency() = %d, want %d", n.Frequency(), 0) - } - - // queueType - if n.IsSmall() || n.IsMain() { - t.Fatalf("queueType should be unknown") - } - - n.MarkSmall() - if !n.IsSmall() || n.IsMain() { - t.Fatalf("queueType should be smallQueue") - } - n.MarkSmall() - if !n.IsSmall() || n.IsMain() { - t.Fatalf("queueType should be smallQueue") - } - - n.MarkMain() - if n.IsSmall() || !n.IsMain() { - t.Fatalf("queueType should be mainQueue") - } - n.MarkMain() - if n.IsSmall() || !n.IsMain() { - t.Fatalf("queueType should be mainQueue") - } - - n.Unmark() - if n.IsSmall() || n.IsMain() { - t.Fatalf("queueType should be unknown") - } -} diff --git a/internal/node/queue.go b/internal/node/queue.go deleted file mode 100644 index 73f88ff..0000000 --- a/internal/node/queue.go +++ /dev/null @@ -1,87 +0,0 @@ -// Copyright (c) 2023 Alexey Mayshev. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package node - -type Queue[K comparable, V any] struct { - head *Node[K, V] - tail *Node[K, V] - len int -} - -func NewQueue[K comparable, V any]() *Queue[K, V] { - return &Queue[K, V]{} -} - -func (q *Queue[K, V]) Len() int { - return q.len -} - -func (q *Queue[K, V]) IsEmpty() bool { - return q.Len() == 0 -} - -func (q *Queue[K, V]) Push(n *Node[K, V]) { - if q.IsEmpty() { - q.head = n - q.tail = n - } else { - n.prev = q.tail - q.tail.next = n - q.tail = n - } - - q.len++ -} - -func (q *Queue[K, V]) Pop() *Node[K, V] { - if q.IsEmpty() { - return nil - } - - result := q.head - q.Remove(result) - return result -} - -func (q *Queue[K, V]) Remove(n *Node[K, V]) { - next := n.next - prev := n.prev - - if prev == nil { - if next == nil && q.head != n { - return - } - - q.head = next - } else { - prev.next = next - n.prev = nil - } - - if next == nil { - q.tail = prev - } else { - next.prev = prev - n.next = nil - } - - q.len-- -} - -func (q *Queue[K, V]) Clear() { - for !q.IsEmpty() { - q.Pop() - } -} diff --git a/internal/node/queue_test.go b/internal/node/queue_test.go deleted file mode 100644 index eb2c105..0000000 --- a/internal/node/queue_test.go +++ /dev/null @@ -1,147 +0,0 @@ -// Copyright (c) 2023 Alexey Mayshev. All rights reserved. -// Copyright 2009 The Go Authors. All rights reserved. -// -// Copyright notice. Initial version of the following tests was based on -// the following file from the Go Programming Language core repo: -// https://cs.opensource.google/go/go/+/refs/tags/go1.21.5:src/container/list/list_test.go -// -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. -// That can be found at https://cs.opensource.google/go/go/+/refs/tags/go1.21.5:LICENSE - -package node - -import ( - "strconv" - "testing" -) - -func checkQueueLen[K comparable, V any](t *testing.T, q *Queue[K, V], length int) bool { - t.Helper() - - if n := q.Len(); n != length { - t.Errorf("q.Len() = %d, want %d", n, length) - return false - } - return true -} - -func checkQueuePointers[K comparable, V any](t *testing.T, q *Queue[K, V], nodes []*Node[K, V]) { - t.Helper() - - if !checkQueueLen(t, q, len(nodes)) { - return - } - - // zero length queues must be the zero value - if len(nodes) == 0 { - if !(q.head == nil && q.tail == nil) { - t.Errorf("q.head = %p, q.tail = %p; both should be nil", q.head, q.tail) - } - return - } - - // check internal and external prev/next connections - for i, n := range nodes { - prev := (*Node[K, V])(nil) - if i > 0 { - prev = nodes[i-1] - } - if p := n.prev; p != prev { - t.Errorf("elt[%d](%p).prev = %p, want %p", i, n, p, prev) - } - - next := (*Node[K, V])(nil) - if i < len(nodes)-1 { - next = nodes[i+1] - } - if nn := n.next; nn != next { - t.Errorf("nodes[%d](%p).next = %p, want %p", i, n, nn, next) - } - } -} - -func newFakeNode[K comparable](e K) *Node[K, K] { - return New(e, e, 0, 0) -} - -func TestQueue(t *testing.T) { - q := NewQueue[string, string]() - checkQueuePointers(t, q, []*Node[string, string]{}) - - // Single element queue - e := newFakeNode("a") - q.Push(e) - checkQueuePointers(t, q, []*Node[string, string]{e}) - q.Remove(e) - q.Push(e) - checkQueuePointers(t, q, []*Node[string, string]{e}) - q.Remove(e) - checkQueuePointers(t, q, []*Node[string, string]{}) - - // Bigger queue - e2 := newFakeNode("2") - e1 := newFakeNode("1") - e3 := newFakeNode("3") - e4 := newFakeNode("4") - q.Push(e1) - q.Push(e2) - q.Push(e3) - q.Push(e4) - checkQueuePointers(t, q, []*Node[string, string]{e1, e2, e3, e4}) - - q.Remove(e2) - checkQueuePointers(t, q, []*Node[string, string]{e1, e3, e4}) - - // move from middle - q.Remove(e3) - q.Push(e3) - checkQueuePointers(t, q, []*Node[string, string]{e1, e4, e3}) - - q.Clear() - q.Push(e3) - q.Push(e1) - q.Push(e4) - checkQueuePointers(t, q, []*Node[string, string]{e3, e1, e4}) - - // should be no-op - q.Remove(e3) - q.Push(e3) - checkQueuePointers(t, q, []*Node[string, string]{e1, e4, e3}) - - // Check standard iteration. - sum := 0 - for e := q.head; e != nil; e = e.next { - i, err := strconv.Atoi(e.value) - if err != nil { - continue - } - sum += i - } - if sum != 8 { - t.Errorf("sum over l = %d, want 8", sum) - } - - // Clear all elements by iterating - var next *Node[string, string] - for e := q.head; e != nil; e = next { - next = e.next - q.Remove(e) - } - checkQueuePointers(t, q, []*Node[string, string]{}) -} - -func TestQueue_Remove(t *testing.T) { - q := NewQueue[int, int]() - - e1 := newFakeNode(1) - e2 := newFakeNode(2) - q.Push(e1) - q.Push(e2) - checkQueuePointers(t, q, []*Node[int, int]{e1, e2}) - e := q.head - q.Remove(e) - checkQueuePointers(t, q, []*Node[int, int]{e2}) - q.Remove(e) - checkQueuePointers(t, q, []*Node[int, int]{e2}) -} diff --git a/internal/s3fifo/ghost.go b/internal/s3fifo/ghost.go index 9c8be80..316a928 100644 --- a/internal/s3fifo/ghost.go +++ b/internal/s3fifo/ghost.go @@ -19,7 +19,7 @@ import ( "github.com/dolthub/swiss" "github.com/gammazero/deque" - "github.com/maypok86/otter/internal/node" + "github.com/maypok86/otter/internal/generated/node" ) type ghost[K comparable, V any] struct { @@ -39,12 +39,12 @@ func newGhost[K comparable, V any](main *main[K, V]) *ghost[K, V] { } } -func (g *ghost[K, V]) isGhost(n *node.Node[K, V]) bool { +func (g *ghost[K, V]) isGhost(n node.Node[K, V]) bool { _, ok := g.m.Get(g.hasher.Hash(n.Key())) return ok } -func (g *ghost[K, V]) insert(deleted []*node.Node[K, V], n *node.Node[K, V]) []*node.Node[K, V] { +func (g *ghost[K, V]) insert(deleted []node.Node[K, V], n node.Node[K, V]) []node.Node[K, V] { deleted = append(deleted, n) h := g.hasher.Hash(n.Key()) diff --git a/internal/s3fifo/main.go b/internal/s3fifo/main.go index e4eeb61..66cf8b8 100644 --- a/internal/s3fifo/main.go +++ b/internal/s3fifo/main.go @@ -15,34 +15,34 @@ package s3fifo import ( - "github.com/maypok86/otter/internal/node" + "github.com/maypok86/otter/internal/generated/node" ) const maxReinsertions = 20 type main[K comparable, V any] struct { - q *node.Queue[K, V] + q *queue[K, V] cost uint32 maxCost uint32 } func newMain[K comparable, V any](maxCost uint32) *main[K, V] { return &main[K, V]{ - q: node.NewQueue[K, V](), + q: newQueue[K, V](), maxCost: maxCost, } } -func (m *main[K, V]) insert(n *node.Node[K, V]) { - m.q.Push(n) +func (m *main[K, V]) insert(n node.Node[K, V]) { + m.q.push(n) n.MarkMain() m.cost += n.Cost() } -func (m *main[K, V]) evict(deleted []*node.Node[K, V]) []*node.Node[K, V] { +func (m *main[K, V]) evict(deleted []node.Node[K, V]) []node.Node[K, V] { reinsertions := 0 for m.cost > 0 { - n := m.q.Pop() + n := m.q.pop() if n.IsExpired() || n.Frequency() == 0 { n.Unmark() @@ -58,24 +58,24 @@ func (m *main[K, V]) evict(deleted []*node.Node[K, V]) []*node.Node[K, V] { return append(deleted, n) } - m.q.Push(n) + m.q.push(n) n.DecrementFrequency() } return deleted } -func (m *main[K, V]) remove(n *node.Node[K, V]) { +func (m *main[K, V]) remove(n node.Node[K, V]) { m.cost -= n.Cost() n.Unmark() - m.q.Remove(n) + m.q.remove(n) } func (m *main[K, V]) length() int { - return m.q.Len() + return m.q.length() } func (m *main[K, V]) clear() { - m.q.Clear() + m.q.clear() m.cost = 0 } diff --git a/internal/s3fifo/policy.go b/internal/s3fifo/policy.go index 3359d69..4888c65 100644 --- a/internal/s3fifo/policy.go +++ b/internal/s3fifo/policy.go @@ -15,7 +15,8 @@ package s3fifo import ( - "github.com/maypok86/otter/internal/node" + "github.com/maypok86/otter/internal/generated/node" + "github.com/maypok86/otter/internal/task" ) // Policy is an eviction policy based on S3-FIFO eviction algorithm @@ -48,13 +49,13 @@ func NewPolicy[K comparable, V any](maxCost uint32) *Policy[K, V] { } // Read updates the eviction policy based on node accesses. -func (p *Policy[K, V]) Read(nodes []*node.Node[K, V]) { +func (p *Policy[K, V]) Read(nodes []node.Node[K, V]) { for _, n := range nodes { n.IncrementFrequency() } } -func (p *Policy[K, V]) insert(deleted []*node.Node[K, V], n *node.Node[K, V]) []*node.Node[K, V] { +func (p *Policy[K, V]) insert(deleted []node.Node[K, V], n node.Node[K, V]) []node.Node[K, V] { if p.ghost.isGhost(n) { p.main.insert(n) n.ResetFrequency() @@ -69,7 +70,7 @@ func (p *Policy[K, V]) insert(deleted []*node.Node[K, V], n *node.Node[K, V]) [] return deleted } -func (p *Policy[K, V]) evict(deleted []*node.Node[K, V]) []*node.Node[K, V] { +func (p *Policy[K, V]) evict(deleted []node.Node[K, V]) []node.Node[K, V] { if p.small.cost >= p.maxCost/10 { return p.small.evict(deleted) } @@ -83,21 +84,21 @@ func (p *Policy[K, V]) isFull() bool { // Write updates the eviction policy based on node updates. func (p *Policy[K, V]) Write( - deleted []*node.Node[K, V], - tasks []node.WriteTask[K, V], -) []*node.Node[K, V] { - for _, task := range tasks { - n := task.Node() + deleted []node.Node[K, V], + tasks []task.WriteTask[K, V], +) []node.Node[K, V] { + for _, t := range tasks { + n := t.Node() // already deleted in map - if task.IsDelete() { - p.delete(task.Node()) + if t.IsDelete() { + p.delete(t.Node()) continue } - if task.IsUpdate() { + if t.IsUpdate() { // delete old node - p.delete(task.OldNode()) + p.delete(t.OldNode()) // insert new node } @@ -108,13 +109,13 @@ func (p *Policy[K, V]) Write( } // Delete deletes nodes from the eviction policy. -func (p *Policy[K, V]) Delete(buffer []*node.Node[K, V]) { +func (p *Policy[K, V]) Delete(buffer []node.Node[K, V]) { for _, n := range buffer { p.delete(n) } } -func (p *Policy[K, V]) delete(n *node.Node[K, V]) { +func (p *Policy[K, V]) delete(n node.Node[K, V]) { if n.IsSmall() { p.small.remove(n) return diff --git a/internal/s3fifo/policy_test.go b/internal/s3fifo/policy_test.go index 804463f..334cf84 100644 --- a/internal/s3fifo/policy_test.go +++ b/internal/s3fifo/policy_test.go @@ -17,26 +17,28 @@ package s3fifo import ( "testing" - "github.com/maypok86/otter/internal/node" + "github.com/maypok86/otter/internal/generated/node" + "github.com/maypok86/otter/internal/task" ) -func newNode(k int) *node.Node[int, int] { - n := node.New[int, int](k, k, 0, 1) +func newNode(k int) node.Node[int, int] { + m := node.NewManager[int, int](node.Config{}) + n := m.Create(k, k, 0, 1) return n } -func nodesToAddTasks(nodes []*node.Node[int, int]) []node.WriteTask[int, int] { - tasks := make([]node.WriteTask[int, int], 0, len(nodes)) +func nodesToAddTasks(nodes []node.Node[int, int]) []task.WriteTask[int, int] { + tasks := make([]task.WriteTask[int, int], 0, len(nodes)) for _, n := range nodes { - tasks = append(tasks, node.NewAddTask(n)) + tasks = append(tasks, task.NewAddTask(n)) } return tasks } -func nodesToDeleteTasks(nodes []*node.Node[int, int]) []node.WriteTask[int, int] { - tasks := make([]node.WriteTask[int, int], 0, len(nodes)) +func nodesToDeleteTasks(nodes []node.Node[int, int]) []task.WriteTask[int, int] { + tasks := make([]task.WriteTask[int, int], 0, len(nodes)) for _, n := range nodes { - tasks = append(tasks, node.NewDeleteTask(n)) + tasks = append(tasks, task.NewDeleteTask(n)) } return tasks } @@ -44,7 +46,7 @@ func nodesToDeleteTasks(nodes []*node.Node[int, int]) []node.WriteTask[int, int] func TestPolicy_ReadAndWrite(t *testing.T) { n := newNode(2) p := NewPolicy[int, int](10) - p.Write(nil, []node.WriteTask[int, int]{node.NewAddTask(n)}) + p.Write(nil, []task.WriteTask[int, int]{task.NewAddTask(n)}) if !n.IsSmall() { t.Fatalf("not valid node state: %+v", n) } @@ -53,12 +55,12 @@ func TestPolicy_ReadAndWrite(t *testing.T) { func TestPolicy_OneHitWonders(t *testing.T) { p := NewPolicy[int, int](10) - oneHitWonders := make([]*node.Node[int, int], 0, 2) + oneHitWonders := make([]node.Node[int, int], 0, 2) for i := 0; i < cap(oneHitWonders); i++ { oneHitWonders = append(oneHitWonders, newNode(i+1)) } - popular := make([]*node.Node[int, int], 0, 8) + popular := make([]node.Node[int, int], 0, 8) for i := 0; i < cap(popular); i++ { popular = append(popular, newNode(i+3)) } @@ -71,7 +73,7 @@ func TestPolicy_OneHitWonders(t *testing.T) { p.Read(popular) } - newNodes := make([]*node.Node[int, int], 0, 11) + newNodes := make([]node.Node[int, int], 0, 11) for i := 0; i < cap(newNodes); i++ { newNodes = append(newNodes, newNode(i+12)) } @@ -102,18 +104,19 @@ func TestPolicy_Update(t *testing.T) { p := NewPolicy[int, int](100) n := newNode(1) - n1 := node.New[int, int](1, 1, 0, n.Cost()+8) + m := node.NewManager[int, int](node.Config{WithCost: true}) + n1 := m.Create(1, 1, 0, n.Cost()+8) - p.Write(nil, []node.WriteTask[int, int]{ - node.NewAddTask(n), - node.NewUpdateTask(n1, n), + p.Write(nil, []task.WriteTask[int, int]{ + task.NewAddTask(n), + task.NewUpdateTask(n1, n), }) - p.Read([]*node.Node[int, int]{n1, n1}) + p.Read([]node.Node[int, int]{n1, n1}) - n2 := node.New[int, int](2, 1, 0, 92) - deleted := p.Write(nil, []node.WriteTask[int, int]{ - node.NewAddTask(n2), + n2 := m.Create(2, 1, 0, 92) + deleted := p.Write(nil, []task.WriteTask[int, int]{ + task.NewAddTask(n2), }) if !n1.IsMain() { @@ -124,9 +127,9 @@ func TestPolicy_Update(t *testing.T) { t.Fatalf("inserted node should be evicted: %+v", n2) } - n3 := node.New[int, int](1, 1, 0, 109) - deleted = p.Write(nil, []node.WriteTask[int, int]{ - node.NewUpdateTask(n3, n1), + n3 := m.Create(1, 1, 0, 109) + deleted = p.Write(nil, []task.WriteTask[int, int]{ + task.NewUpdateTask(n3, n1), }) if n3.IsSmall() || n3.IsMain() || len(deleted) != 1 || deleted[0] != n3 { t.Fatalf("updated node should be evicted: %+v", n3) diff --git a/internal/s3fifo/queue.go b/internal/s3fifo/queue.go new file mode 100644 index 0000000..9cac9e5 --- /dev/null +++ b/internal/s3fifo/queue.go @@ -0,0 +1,75 @@ +package s3fifo + +import "github.com/maypok86/otter/internal/generated/node" + +type queue[K comparable, V any] struct { + head node.Node[K, V] + tail node.Node[K, V] + len int +} + +func newQueue[K comparable, V any]() *queue[K, V] { + return &queue[K, V]{} +} + +func (q *queue[K, V]) length() int { + return q.len +} + +func (q *queue[K, V]) isEmpty() bool { + return q.length() == 0 +} + +func (q *queue[K, V]) push(n node.Node[K, V]) { + if q.isEmpty() { + q.head = n + q.tail = n + } else { + n.SetPrev(q.tail) + q.tail.SetNext(n) + q.tail = n + } + + q.len++ +} + +func (q *queue[K, V]) pop() node.Node[K, V] { + if q.isEmpty() { + return nil + } + + result := q.head + q.remove(result) + return result +} + +func (q *queue[K, V]) remove(n node.Node[K, V]) { + next := n.Next() + prev := n.Prev() + + if node.Equals(prev, nil) { + if node.Equals(next, nil) && !node.Equals(q.head, n) { + return + } + + q.head = next + } else { + prev.SetNext(next) + n.SetPrev(nil) + } + + if node.Equals(next, nil) { + q.tail = prev + } else { + next.SetPrev(prev) + n.SetNext(nil) + } + + q.len-- +} + +func (q *queue[K, V]) clear() { + for !q.isEmpty() { + q.pop() + } +} diff --git a/internal/s3fifo/queue_test.go b/internal/s3fifo/queue_test.go new file mode 100644 index 0000000..481692c --- /dev/null +++ b/internal/s3fifo/queue_test.go @@ -0,0 +1,150 @@ +// Copyright (c) 2023 Alexey Mayshev. All rights reserved. +// Copyright 2009 The Go Authors. All rights reserved. +// +// Copyright notice. Initial version of the following tests was based on +// the following file from the Go Programming Language core repo: +// https://cs.opensource.google/go/go/+/refs/tags/go1.21.5:src/container/list/list_test.go +// +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. +// That can be found at https://cs.opensource.google/go/go/+/refs/tags/go1.21.5:LICENSE + +package s3fifo + +import ( + "strconv" + "testing" + + "github.com/maypok86/otter/internal/generated/node" +) + +func checkQueueLen[K comparable, V any](t *testing.T, q *queue[K, V], length int) bool { + t.Helper() + + if n := q.length(); n != length { + t.Errorf("q.length() = %d, want %d", n, length) + return false + } + return true +} + +func checkQueuePointers[K comparable, V any](t *testing.T, q *queue[K, V], nodes []node.Node[K, V]) { + t.Helper() + + if !checkQueueLen(t, q, len(nodes)) { + return + } + + // zero length queues must be the zero value + if len(nodes) == 0 { + if !(node.Equals(q.head, nil) && node.Equals(q.tail, nil)) { + t.Errorf("q.head = %p, q.tail = %p; both should be nil", q.head, q.tail) + } + return + } + + // check internal and external prev/next connections + for i, n := range nodes { + var prev node.Node[K, V] + if i > 0 { + prev = nodes[i-1] + } + if p := n.Prev(); !node.Equals(p, prev) { + t.Errorf("elt[%d](%p).prev = %p, want %p", i, n, p, prev) + } + + var next node.Node[K, V] + if i < len(nodes)-1 { + next = nodes[i+1] + } + if nn := n.Next(); !node.Equals(nn, next) { + t.Errorf("nodes[%d](%p).next = %p, want %p", i, n, nn, next) + } + } +} + +func newFakeNode[K comparable](e K) node.Node[K, K] { + m := node.NewManager[K, K](node.Config{WithCost: true}) + return m.Create(e, e, 0, 0) +} + +func TestQueue(t *testing.T) { + q := newQueue[string, string]() + checkQueuePointers(t, q, []node.Node[string, string]{}) + + // Single element queue + e := newFakeNode("a") + q.push(e) + checkQueuePointers(t, q, []node.Node[string, string]{e}) + q.remove(e) + q.push(e) + checkQueuePointers(t, q, []node.Node[string, string]{e}) + q.remove(e) + checkQueuePointers(t, q, []node.Node[string, string]{}) + + // Bigger queue + e2 := newFakeNode("2") + e1 := newFakeNode("1") + e3 := newFakeNode("3") + e4 := newFakeNode("4") + q.push(e1) + q.push(e2) + q.push(e3) + q.push(e4) + checkQueuePointers(t, q, []node.Node[string, string]{e1, e2, e3, e4}) + + q.remove(e2) + checkQueuePointers(t, q, []node.Node[string, string]{e1, e3, e4}) + + // move from middle + q.remove(e3) + q.push(e3) + checkQueuePointers(t, q, []node.Node[string, string]{e1, e4, e3}) + + q.clear() + q.push(e3) + q.push(e1) + q.push(e4) + checkQueuePointers(t, q, []node.Node[string, string]{e3, e1, e4}) + + // should be no-op + q.remove(e3) + q.push(e3) + checkQueuePointers(t, q, []node.Node[string, string]{e1, e4, e3}) + + // Check standard iteration. + sum := 0 + for e := q.head; !node.Equals(e, nil); e = e.Next() { + i, err := strconv.Atoi(e.Value()) + if err != nil { + continue + } + sum += i + } + if sum != 8 { + t.Errorf("sum over l = %d, want 8", sum) + } + + // Clear all elements by iterating + var next node.Node[string, string] + for e := q.head; !node.Equals(e, nil); e = next { + next = e.Next() + q.remove(e) + } + checkQueuePointers(t, q, []node.Node[string, string]{}) +} + +func TestQueue_Remove(t *testing.T) { + q := newQueue[int, int]() + + e1 := newFakeNode(1) + e2 := newFakeNode(2) + q.push(e1) + q.push(e2) + checkQueuePointers(t, q, []node.Node[int, int]{e1, e2}) + e := q.head + q.remove(e) + checkQueuePointers(t, q, []node.Node[int, int]{e2}) + q.remove(e) + checkQueuePointers(t, q, []node.Node[int, int]{e2}) +} diff --git a/internal/s3fifo/small.go b/internal/s3fifo/small.go index 3c2ce7d..1a617ec 100644 --- a/internal/s3fifo/small.go +++ b/internal/s3fifo/small.go @@ -15,11 +15,11 @@ package s3fifo import ( - "github.com/maypok86/otter/internal/node" + "github.com/maypok86/otter/internal/generated/node" ) type small[K comparable, V any] struct { - q *node.Queue[K, V] + q *queue[K, V] main *main[K, V] ghost *ghost[K, V] cost uint32 @@ -32,25 +32,25 @@ func newSmall[K comparable, V any]( ghost *ghost[K, V], ) *small[K, V] { return &small[K, V]{ - q: node.NewQueue[K, V](), + q: newQueue[K, V](), main: main, ghost: ghost, maxCost: maxCost, } } -func (s *small[K, V]) insert(n *node.Node[K, V]) { - s.q.Push(n) +func (s *small[K, V]) insert(n node.Node[K, V]) { + s.q.push(n) n.MarkSmall() s.cost += n.Cost() } -func (s *small[K, V]) evict(deleted []*node.Node[K, V]) []*node.Node[K, V] { +func (s *small[K, V]) evict(deleted []node.Node[K, V]) []node.Node[K, V] { if s.cost == 0 { return deleted } - n := s.q.Pop() + n := s.q.pop() s.cost -= n.Cost() n.Unmark() if n.IsExpired() { @@ -69,17 +69,17 @@ func (s *small[K, V]) evict(deleted []*node.Node[K, V]) []*node.Node[K, V] { return s.ghost.insert(deleted, n) } -func (s *small[K, V]) remove(n *node.Node[K, V]) { +func (s *small[K, V]) remove(n node.Node[K, V]) { s.cost -= n.Cost() n.Unmark() - s.q.Remove(n) + s.q.remove(n) } func (s *small[K, V]) length() int { - return s.q.Len() + return s.q.length() } func (s *small[K, V]) clear() { - s.q.Clear() + s.q.clear() s.cost = 0 } diff --git a/internal/node/task.go b/internal/task/task.go similarity index 84% rename from internal/node/task.go rename to internal/task/task.go index 1232d59..e85f070 100644 --- a/internal/node/task.go +++ b/internal/task/task.go @@ -12,7 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -package node +package task + +import ( + "github.com/maypok86/otter/internal/generated/node" +) // reason represents the reason for writing the item to the cache. type reason uint8 @@ -28,13 +32,13 @@ const ( // WriteTask is a set of information to update the cache: // node, reason for write, difference after node cost change, etc. type WriteTask[K comparable, V any] struct { - n *Node[K, V] - oldNode *Node[K, V] + n node.Node[K, V] + oldNode node.Node[K, V] writeReason reason } // NewAddTask creates a task to add a node to policies. -func NewAddTask[K comparable, V any](n *Node[K, V]) WriteTask[K, V] { +func NewAddTask[K comparable, V any](n node.Node[K, V]) WriteTask[K, V] { return WriteTask[K, V]{ n: n, writeReason: addReason, @@ -42,7 +46,7 @@ func NewAddTask[K comparable, V any](n *Node[K, V]) WriteTask[K, V] { } // NewDeleteTask creates a task to delete a node from policies. -func NewDeleteTask[K comparable, V any](n *Node[K, V]) WriteTask[K, V] { +func NewDeleteTask[K comparable, V any](n node.Node[K, V]) WriteTask[K, V] { return WriteTask[K, V]{ n: n, writeReason: deleteReason, @@ -50,7 +54,7 @@ func NewDeleteTask[K comparable, V any](n *Node[K, V]) WriteTask[K, V] { } // NewUpdateTask creates a task to update the node in the policies. -func NewUpdateTask[K comparable, V any](n, oldNode *Node[K, V]) WriteTask[K, V] { +func NewUpdateTask[K comparable, V any](n, oldNode node.Node[K, V]) WriteTask[K, V] { return WriteTask[K, V]{ n: n, oldNode: oldNode, @@ -73,12 +77,12 @@ func NewCloseTask[K comparable, V any]() WriteTask[K, V] { } // Node returns the node contained in the task. If node was not specified, it returns nil. -func (t *WriteTask[K, V]) Node() *Node[K, V] { +func (t *WriteTask[K, V]) Node() node.Node[K, V] { return t.n } // OldNode returns the old node contained in the task. If old node was not specified, it returns nil. -func (t *WriteTask[K, V]) OldNode() *Node[K, V] { +func (t *WriteTask[K, V]) OldNode() node.Node[K, V] { return t.oldNode } diff --git a/internal/node/task_test.go b/internal/task/task_test.go similarity index 84% rename from internal/node/task_test.go rename to internal/task/task_test.go index ec909f8..d603d5d 100644 --- a/internal/node/task_test.go +++ b/internal/task/task_test.go @@ -12,13 +12,21 @@ // See the License for the specific language governing permissions and // limitations under the License. -package node +package task -import "testing" +import ( + "testing" + + "github.com/maypok86/otter/internal/generated/node" +) func TestTask(t *testing.T) { - n := New[int, int](1, 2, 6, 4) - oldNode := New[int, int](1, 3, 8, 6) + nm := node.NewManager[int, int](node.Config{ + WithExpiration: true, + WithCost: true, + }) + n := nm.Create(1, 2, 6, 4) + oldNode := nm.Create(1, 3, 8, 6) addTask := NewAddTask(n) if addTask.Node() != n || !addTask.IsAdd() { diff --git a/internal/unixtime/unixtime.go b/internal/unixtime/unixtime.go index c2c1c23..0737adf 100644 --- a/internal/unixtime/unixtime.go +++ b/internal/unixtime/unixtime.go @@ -77,3 +77,10 @@ func Stop() { func Now() uint32 { return atomic.LoadUint32(&now) } + +// SetNow sets the current time. +// +// NOTE: use only for testing and debugging. +func SetNow(t uint32) { + atomic.StoreUint32(&now, t) +}