Skip to content

Commit

Permalink
[#59] Fix non-deterministic node insertion
Browse files Browse the repository at this point in the history
  • Loading branch information
maypok86 committed Mar 3, 2024
1 parent 6bb9e32 commit edd28dd
Show file tree
Hide file tree
Showing 7 changed files with 207 additions and 242 deletions.
50 changes: 25 additions & 25 deletions cmd/generator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func (g *generator) printStruct() {

func (g *generator) printConstructors() {
g.p("// New%s creates a new %s.", g.structName, g.structName)
g.p("func New%s[K comparable, V any](key K, value V, expiration, cost uint32) Node[K, V] {", g.structName)
g.p("func New%s[K comparable, V any](key K, value V, expiration, cost uint32) node[K, V] {", g.structName)
g.in()
g.p("return &%s[K, V]{", g.structName)
g.in()
Expand All @@ -210,7 +210,7 @@ func (g *generator) printConstructors() {
g.p("")

g.p("// CastPointerTo%s casts a pointer to %s.", g.structName, g.structName)
g.p("func CastPointerTo%s[K comparable, V any](ptr unsafe.Pointer) Node[K, V] {", g.structName)
g.p("func CastPointerTo%s[K comparable, V any](ptr unsafe.Pointer) node[K, V] {", g.structName)
g.in()
g.p("return (*%s[K, V])(ptr)", g.structName)
g.out()
Expand Down Expand Up @@ -240,14 +240,14 @@ func (g *generator) printFunctions() {
g.p("}")
g.p("")

g.p("func (n *%s[K, V]) Prev() Node[K, V] {", g.structName)
g.p("func (n *%s[K, V]) Prev() node[K, V] {", g.structName)
g.in()
g.p("return n.prev")
g.out()
g.p("}")
g.p("")

g.p("func (n *%s[K, V]) SetPrev(v Node[K, V]) {", g.structName)
g.p("func (n *%s[K, V]) SetPrev(v node[K, V]) {", g.structName)
g.in()
g.p("if v == nil {")
g.in()
Expand All @@ -260,14 +260,14 @@ func (g *generator) printFunctions() {
g.p("}")
g.p("")

g.p("func (n *%s[K, V]) Next() Node[K, V] {", g.structName)
g.p("func (n *%s[K, V]) Next() node[K, V] {", g.structName)
g.in()
g.p("return n.next")
g.out()
g.p("}")
g.p("")

g.p("func (n *%s[K, V]) SetNext(v Node[K, V]) {", g.structName)
g.p("func (n *%s[K, V]) SetNext(v node[K, V]) {", g.structName)
g.in()
g.p("if v == nil {")
g.in()
Expand All @@ -280,7 +280,7 @@ func (g *generator) printFunctions() {
g.p("}")
g.p("")

g.p("func (n *%s[K, V]) PrevExp() Node[K, V] {", g.structName)
g.p("func (n *%s[K, V]) PrevExp() node[K, V] {", g.structName)
g.in()
if g.features[expiration] {
g.p("return n.prevExp")
Expand All @@ -291,7 +291,7 @@ func (g *generator) printFunctions() {
g.p("}")
g.p("")

g.p("func (n *%s[K, V]) SetPrevExp(v Node[K, V]) {", g.structName)
g.p("func (n *%s[K, V]) SetPrevExp(v node[K, V]) {", g.structName)
g.in()
if g.features[expiration] {
g.p("if v == nil {")
Expand All @@ -308,7 +308,7 @@ func (g *generator) printFunctions() {
g.p("}")
g.p("")

g.p("func (n *%s[K, V]) NextExp() Node[K, V] {", g.structName)
g.p("func (n *%s[K, V]) NextExp() node[K, V] {", g.structName)
g.in()
if g.features[expiration] {
g.p("return n.nextExp")
Expand All @@ -319,7 +319,7 @@ func (g *generator) printFunctions() {
g.p("}")
g.p("")

g.p("func (n *%s[K, V]) SetNextExp(v Node[K, V]) {", g.structName)
g.p("func (n *%s[K, V]) SetNextExp(v node[K, V]) {", g.structName)
g.in()
if g.features[expiration] {
g.p("if v == nil {")
Expand Down Expand Up @@ -477,30 +477,30 @@ const (
deadState
)
// Node is a cache entry.
type Node[K comparable, V any] interface {
// node is a cache entry.
type node[K comparable, V any] interface {
// Key returns the key.
Key() K
// Value returns the value.
Value() V
// AsPointer returns the node as a pointer.
AsPointer() unsafe.Pointer
// Prev returns the previous node in the eviction policy.
Prev() Node[K, V]
Prev() node[K, V]
// SetPrev sets the previous node in the eviction policy.
SetPrev(v Node[K, V])
SetPrev(v node[K, V])
// Next returns the next node in the eviction policy.
Next() Node[K, V]
Next() node[K, V]
// SetNext sets the next node in the eviction policy.
SetNext(v Node[K, V])
SetNext(v node[K, V])
// PrevExp returns the previous node in the expiration policy.
PrevExp() Node[K, V]
PrevExp() node[K, V]
// SetPrevExp sets the previous node in the expiration policy.
SetPrevExp(v Node[K, V])
SetPrevExp(v node[K, V])
// NextExp returns the next node in the expiration policy.
NextExp() Node[K, V]
NextExp() node[K, V]
// SetNextExp sets the next node in the expiration policy.
SetNextExp(v Node[K, V])
SetNextExp(v node[K, V])
// IsExpired returns true if node is expired.
IsExpired() bool
// Expiration returns the expiration time.
Expand Down Expand Up @@ -531,7 +531,7 @@ type Node[K comparable, V any] interface {
Unmark()
}
func Equals[K comparable, V any](a, b Node[K, V]) bool {
func Equals[K comparable, V any](a, b node[K, V]) bool {
if a == nil {
return b == nil || b.AsPointer() == nil
}
Expand All @@ -547,8 +547,8 @@ type Config struct {
}
type Manager[K comparable, V any] struct {
create func(key K, value V, expiration, cost uint32) Node[K, V]
fromPointer func(ptr unsafe.Pointer) Node[K, V]
create func(key K, value V, expiration, cost uint32) node[K, V]
fromPointer func(ptr unsafe.Pointer) node[K, V]
}
func NewManager[K comparable, V any](c Config) *Manager[K, V] {
Expand All @@ -567,11 +567,11 @@ func NewManager[K comparable, V any](c Config) *Manager[K, V] {
const nodeFooter = `return m
}
func (m *Manager[K, V]) Create(key K, value V, expiration, cost uint32) Node[K, V] {
func (m *Manager[K, V]) Create(key K, value V, expiration, cost uint32) node[K, V] {
return m.create(key, value, expiration, cost)
}
func (m *Manager[K, V]) FromPointer(ptr unsafe.Pointer) Node[K, V] {
func (m *Manager[K, V]) FromPointer(ptr unsafe.Pointer) node[K, V] {
return m.fromPointer(ptr)
}
Expand Down
50 changes: 28 additions & 22 deletions internal/core/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/maypok86/otter/internal/queue"
"github.com/maypok86/otter/internal/s3fifo"
"github.com/maypok86/otter/internal/stats"
"github.com/maypok86/otter/internal/task"
"github.com/maypok86/otter/internal/unixtime"
"github.com/maypok86/otter/internal/xmath"
"github.com/maypok86/otter/internal/xruntime"
Expand Down Expand Up @@ -68,7 +67,7 @@ type Cache[K comparable, V any] struct {
expirePolicy expirePolicy[K, V]
stats *stats.Stats
readBuffers []*lossy.Buffer[K, V]
writeBuffer *queue.MPSC[task.WriteTask[K, V]]
writeBuffer *queue.MPSC[task[K, V]]
evictionMutex sync.Mutex
closeOnce sync.Once
doneClear chan struct{}
Expand Down Expand Up @@ -120,7 +119,7 @@ func NewCache[K comparable, V any](c Config[K, V]) *Cache[K, V] {
policy: s3fifo.NewPolicy[K, V](uint32(c.Capacity)),
expirePolicy: expPolicy,
readBuffers: readBuffers,
writeBuffer: queue.NewMPSC[task.WriteTask[K, V]](writeBufferCapacity),
writeBuffer: queue.NewMPSC[task[K, V]](writeBufferCapacity),
doneClear: make(chan struct{}),
mask: uint32(readBuffersCount - 1),
costFunc: c.CostFunc,
Expand Down Expand Up @@ -165,7 +164,7 @@ func (c *Cache[K, V]) Get(key K) (V, bool) {
}

if got.IsExpired() {
c.writeBuffer.Insert(task.NewDeleteTask(got))
c.writeBuffer.Insert(newDeleteTask(got))
c.stats.IncMisses()
return zeroValue[V](), false
}
Expand Down Expand Up @@ -240,7 +239,7 @@ func (c *Cache[K, V]) set(key K, value V, expiration uint32, onlyIfAbsent bool)
res := c.hashmap.SetIfAbsent(n)
if res == nil {
// insert
c.writeBuffer.Insert(task.NewAddTask(n))
c.writeBuffer.Insert(newAddTask(n))
return true
}
return false
Expand All @@ -250,10 +249,10 @@ func (c *Cache[K, V]) set(key K, value V, expiration uint32, onlyIfAbsent bool)
if evicted != nil {
// update
evicted.Die()
c.writeBuffer.Insert(task.NewUpdateTask(n, evicted))
c.writeBuffer.Insert(newUpdateTask(n, evicted))
} else {
// insert
c.writeBuffer.Insert(task.NewAddTask(n))
c.writeBuffer.Insert(newAddTask(n))
}

return true
Expand All @@ -271,7 +270,7 @@ func (c *Cache[K, V]) deleteNode(n node.Node[K, V]) {
func (c *Cache[K, V]) afterDelete(deleted node.Node[K, V]) {
if deleted != nil {
deleted.Die()
c.writeBuffer.Insert(task.NewDeleteTask(deleted))
c.writeBuffer.Insert(newDeleteTask(deleted))
}
}

Expand Down Expand Up @@ -301,7 +300,9 @@ func (c *Cache[K, V]) cleanup() {
}

e := c.expirePolicy.RemoveExpired(expired)
c.policy.Delete(e)
for _, n := range e {
c.policy.Delete(n)
}

c.evictionMutex.Unlock()

Expand All @@ -316,26 +317,26 @@ func (c *Cache[K, V]) cleanup() {

func (c *Cache[K, V]) process() {
bufferCapacity := 64
buffer := make([]task.WriteTask[K, V], 0, bufferCapacity)
buffer := make([]task[K, V], 0, bufferCapacity)
deleted := make([]node.Node[K, V], 0, bufferCapacity)
i := 0
for {
t := c.writeBuffer.Remove()

if t.IsClear() || t.IsClose() {
if t.isClear() || t.isClose() {
buffer = clearBuffer(buffer)
c.writeBuffer.Clear()

c.evictionMutex.Lock()
c.policy.Clear()
c.expirePolicy.Clear()
if t.IsClose() {
if t.isClose() {
c.isClosed = true
}
c.evictionMutex.Unlock()

c.doneClear <- struct{}{}
if t.IsClose() {
if t.isClose() {
break
}
continue
Expand All @@ -345,27 +346,32 @@ func (c *Cache[K, V]) process() {
i++
if i >= bufferCapacity {
i -= bufferCapacity
d := deleted

c.evictionMutex.Lock()

for _, t := range buffer {
n := t.Node()
n := t.node()
switch {
case t.IsDelete():
case t.isDelete():
c.expirePolicy.Delete(n)
case t.IsAdd():
c.policy.Delete(n)
case t.isAdd():
if n.IsAlive() {
c.expirePolicy.Add(n)
d = c.policy.Add(d, n)
}
case t.IsUpdate():
c.expirePolicy.Delete(t.OldNode())
case t.isUpdate():
oldNode := t.oldNode()
c.expirePolicy.Delete(oldNode)
c.policy.Delete(oldNode)
if n.IsAlive() {
c.expirePolicy.Add(n)
d = c.policy.Add(d, n)
}
}
}

d := c.policy.Write(deleted, buffer)
for _, n := range d {
c.expirePolicy.Delete(n)
}
Expand Down Expand Up @@ -400,10 +406,10 @@ func (c *Cache[K, V]) Range(f func(key K, value V) bool) {
//
// NOTE: this operation must be performed when no requests are made to the cache otherwise the behavior is undefined.
func (c *Cache[K, V]) Clear() {
c.clear(task.NewClearTask[K, V]())
c.clear(newClearTask[K, V]())
}

func (c *Cache[K, V]) clear(t task.WriteTask[K, V]) {
func (c *Cache[K, V]) clear(t task[K, V]) {
c.hashmap.Clear()
for i := 0; i < len(c.readBuffers); i++ {
c.readBuffers[i].Clear()
Expand All @@ -420,7 +426,7 @@ func (c *Cache[K, V]) clear(t task.WriteTask[K, V]) {
// NOTE: this operation must be performed when no requests are made to the cache otherwise the behavior is undefined.
func (c *Cache[K, V]) Close() {
c.closeOnce.Do(func() {
c.clear(task.NewCloseTask[K, V]())
c.clear(newCloseTask[K, V]())
if c.withExpiration {
unixtime.Stop()
}
Expand Down
Loading

0 comments on commit edd28dd

Please sign in to comment.