Skip to content

Commit

Permalink
[#59] Fix non-deterministic node insertion
Browse files Browse the repository at this point in the history
  • Loading branch information
maypok86 committed Mar 3, 2024
1 parent 4ac2bcf commit c66ac1f
Show file tree
Hide file tree
Showing 6 changed files with 193 additions and 222 deletions.
66 changes: 39 additions & 27 deletions internal/core/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/maypok86/otter/internal/queue"
"github.com/maypok86/otter/internal/s3fifo"
"github.com/maypok86/otter/internal/stats"
"github.com/maypok86/otter/internal/task"
"github.com/maypok86/otter/internal/unixtime"
"github.com/maypok86/otter/internal/xmath"
"github.com/maypok86/otter/internal/xruntime"
Expand Down Expand Up @@ -68,7 +67,7 @@ type Cache[K comparable, V any] struct {
expirePolicy expirePolicy[K, V]
stats *stats.Stats
readBuffers []*lossy.Buffer[K, V]
writeBuffer *queue.MPSC[task.WriteTask[K, V]]
writeBuffer *queue.MPSC[task[K, V]]
evictionMutex sync.Mutex
closeOnce sync.Once
doneClear chan struct{}
Expand Down Expand Up @@ -120,7 +119,7 @@ func NewCache[K comparable, V any](c Config[K, V]) *Cache[K, V] {
policy: s3fifo.NewPolicy[K, V](uint32(c.Capacity)),
expirePolicy: expPolicy,
readBuffers: readBuffers,
writeBuffer: queue.NewMPSC[task.WriteTask[K, V]](writeBufferCapacity),
writeBuffer: queue.NewMPSC[task[K, V]](writeBufferCapacity),
doneClear: make(chan struct{}),
mask: uint32(readBuffersCount - 1),
costFunc: c.CostFunc,
Expand Down Expand Up @@ -165,7 +164,7 @@ func (c *Cache[K, V]) Get(key K) (V, bool) {
}

if got.IsExpired() {
c.writeBuffer.Insert(task.NewDeleteTask(got))
c.writeBuffer.Insert(newDeleteTask(got))
c.stats.IncMisses()
return zeroValue[V](), false
}
Expand Down Expand Up @@ -240,7 +239,7 @@ func (c *Cache[K, V]) set(key K, value V, expiration uint32, onlyIfAbsent bool)
res := c.hashmap.SetIfAbsent(n)
if res == nil {
// insert
c.writeBuffer.Insert(task.NewAddTask(n))
c.writeBuffer.Insert(newAddTask(n))
return true
}
return false
Expand All @@ -250,10 +249,10 @@ func (c *Cache[K, V]) set(key K, value V, expiration uint32, onlyIfAbsent bool)
if evicted != nil {
// update
evicted.Die()
c.writeBuffer.Insert(task.NewUpdateTask(n, evicted))
c.writeBuffer.Insert(newUpdateTask(n, evicted))
} else {
// insert
c.writeBuffer.Insert(task.NewAddTask(n))
c.writeBuffer.Insert(newAddTask(n))
}

return true
Expand All @@ -271,7 +270,7 @@ func (c *Cache[K, V]) deleteNode(n node.Node[K, V]) {
func (c *Cache[K, V]) afterDelete(deleted node.Node[K, V]) {
if deleted != nil {
deleted.Die()
c.writeBuffer.Insert(task.NewDeleteTask(deleted))
c.writeBuffer.Insert(newDeleteTask(deleted))
}
}

Expand All @@ -291,7 +290,8 @@ func (c *Cache[K, V]) DeleteByFunc(f func(key K, value V) bool) {
}

func (c *Cache[K, V]) cleanup() {
expired := make([]node.Node[K, V], 0, 128)
bufferCapacity := 64
expired := make([]node.Node[K, V], 0, bufferCapacity)
for {
time.Sleep(time.Second)

Expand All @@ -300,42 +300,47 @@ func (c *Cache[K, V]) cleanup() {
return
}

e := c.expirePolicy.RemoveExpired(expired)
c.policy.Delete(e)
expired = c.expirePolicy.RemoveExpired(expired)
for _, n := range expired {
c.policy.Delete(n)
}

c.evictionMutex.Unlock()

for _, n := range e {
for _, n := range expired {
c.hashmap.DeleteNode(n)
n.Die()
}

expired = clearBuffer(expired)
if cap(expired) > 3*bufferCapacity {
expired = make([]node.Node[K, V], 0, bufferCapacity)
}
}
}

func (c *Cache[K, V]) process() {
bufferCapacity := 64
buffer := make([]task.WriteTask[K, V], 0, bufferCapacity)
buffer := make([]task[K, V], 0, bufferCapacity)
deleted := make([]node.Node[K, V], 0, bufferCapacity)
i := 0
for {
t := c.writeBuffer.Remove()

if t.IsClear() || t.IsClose() {
if t.isClear() || t.isClose() {
buffer = clearBuffer(buffer)
c.writeBuffer.Clear()

c.evictionMutex.Lock()
c.policy.Clear()
c.expirePolicy.Clear()
if t.IsClose() {
if t.isClose() {
c.isClosed = true
}
c.evictionMutex.Unlock()

c.doneClear <- struct{}{}
if t.IsClose() {
if t.isClose() {
break
}
continue
Expand All @@ -349,36 +354,43 @@ func (c *Cache[K, V]) process() {
c.evictionMutex.Lock()

for _, t := range buffer {
n := t.Node()
n := t.node()
switch {
case t.IsDelete():
case t.isDelete():
c.expirePolicy.Delete(n)
case t.IsAdd():
c.policy.Delete(n)
case t.isAdd():
if n.IsAlive() {
c.expirePolicy.Add(n)
deleted = c.policy.Add(deleted, n)
}
case t.IsUpdate():
c.expirePolicy.Delete(t.OldNode())
case t.isUpdate():
oldNode := t.oldNode()
c.expirePolicy.Delete(oldNode)
c.policy.Delete(oldNode)
if n.IsAlive() {
c.expirePolicy.Add(n)
deleted = c.policy.Add(deleted, n)
}
}
}

d := c.policy.Write(deleted, buffer)
for _, n := range d {
for _, n := range deleted {
c.expirePolicy.Delete(n)
}

c.evictionMutex.Unlock()

for _, n := range d {
for _, n := range deleted {
c.hashmap.DeleteNode(n)
n.Die()
}

buffer = clearBuffer(buffer)
deleted = clearBuffer(deleted)
if cap(deleted) > 3*bufferCapacity {
deleted = make([]node.Node[K, V], 0, bufferCapacity)
}
}
}
}
Expand All @@ -400,10 +412,10 @@ func (c *Cache[K, V]) Range(f func(key K, value V) bool) {
//
// NOTE: this operation must be performed when no requests are made to the cache otherwise the behavior is undefined.
func (c *Cache[K, V]) Clear() {
c.clear(task.NewClearTask[K, V]())
c.clear(newClearTask[K, V]())
}

func (c *Cache[K, V]) clear(t task.WriteTask[K, V]) {
func (c *Cache[K, V]) clear(t task[K, V]) {
c.hashmap.Clear()
for i := 0; i < len(c.readBuffers); i++ {
c.readBuffers[i].Clear()
Expand All @@ -420,7 +432,7 @@ func (c *Cache[K, V]) clear(t task.WriteTask[K, V]) {
// NOTE: this operation must be performed when no requests are made to the cache otherwise the behavior is undefined.
func (c *Cache[K, V]) Close() {
c.closeOnce.Do(func() {
c.clear(task.NewCloseTask[K, V]())
c.clear(newCloseTask[K, V]())
if c.withExpiration {
unixtime.Stop()
}
Expand Down
112 changes: 112 additions & 0 deletions internal/core/task.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Copyright (c) 2023 Alexey Mayshev. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package core

import (
"github.com/maypok86/otter/internal/generated/node"
)

// reason represents the reason for writing the item to the cache.
type reason uint8

const (
addReason reason = iota + 1
deleteReason
updateReason
clearReason
closeReason
)

// task is a set of information to update the cache:
// node, reason for write, difference after node cost change, etc.
type task[K comparable, V any] struct {
n node.Node[K, V]
old node.Node[K, V]
writeReason reason
}

// newAddTask creates a task to add a node to policies.
func newAddTask[K comparable, V any](n node.Node[K, V]) task[K, V] {
return task[K, V]{
n: n,
writeReason: addReason,
}
}

// newDeleteTask creates a task to delete a node from policies.
func newDeleteTask[K comparable, V any](n node.Node[K, V]) task[K, V] {
return task[K, V]{
n: n,
writeReason: deleteReason,
}
}

// newUpdateTask creates a task to update the node in the policies.
func newUpdateTask[K comparable, V any](n, oldNode node.Node[K, V]) task[K, V] {
return task[K, V]{
n: n,
old: oldNode,
writeReason: updateReason,
}
}

// newClearTask creates a task to clear policies.
func newClearTask[K comparable, V any]() task[K, V] {
return task[K, V]{
writeReason: clearReason,
}
}

// newCloseTask creates a task to clear policies and stop all goroutines.
func newCloseTask[K comparable, V any]() task[K, V] {
return task[K, V]{
writeReason: closeReason,
}
}

// node returns the node contained in the task. If node was not specified, it returns nil.
func (t *task[K, V]) node() node.Node[K, V] {
return t.n
}

// oldNode returns the old node contained in the task. If old node was not specified, it returns nil.
func (t *task[K, V]) oldNode() node.Node[K, V] {
return t.old
}

// isAdd returns true if this is an add task.
func (t *task[K, V]) isAdd() bool {
return t.writeReason == addReason
}

// isDelete returns true if this is a delete task.
func (t *task[K, V]) isDelete() bool {
return t.writeReason == deleteReason
}

// isUpdate returns true if this is an update task.
func (t *task[K, V]) isUpdate() bool {
return t.writeReason == updateReason
}

// isClear returns true if this is a clear task.
func (t *task[K, V]) isClear() bool {
return t.writeReason == clearReason
}

// isClose returns true if this is a close task.
func (t *task[K, V]) isClose() bool {
return t.writeReason == closeReason
}
22 changes: 11 additions & 11 deletions internal/task/task_test.go → internal/core/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package task
package core

import (
"testing"
Expand All @@ -28,28 +28,28 @@ func TestTask(t *testing.T) {
n := nm.Create(1, 2, 6, 4)
oldNode := nm.Create(1, 3, 8, 6)

addTask := NewAddTask(n)
if addTask.Node() != n || !addTask.IsAdd() {
addTask := newAddTask(n)
if addTask.node() != n || !addTask.isAdd() {
t.Fatalf("not valid add task %+v", addTask)
}

deleteTask := NewDeleteTask(n)
if deleteTask.Node() != n || !deleteTask.IsDelete() {
deleteTask := newDeleteTask(n)
if deleteTask.node() != n || !deleteTask.isDelete() {
t.Fatalf("not valid delete task %+v", deleteTask)
}

updateTask := NewUpdateTask(n, oldNode)
if updateTask.Node() != n || !updateTask.IsUpdate() || updateTask.OldNode() != oldNode {
updateTask := newUpdateTask(n, oldNode)
if updateTask.node() != n || !updateTask.isUpdate() || updateTask.oldNode() != oldNode {
t.Fatalf("not valid update task %+v", updateTask)
}

clearTask := NewClearTask[int, int]()
if clearTask.Node() != nil || !clearTask.IsClear() {
clearTask := newClearTask[int, int]()
if clearTask.node() != nil || !clearTask.isClear() {
t.Fatalf("not valid clear task %+v", clearTask)
}

closeTask := NewCloseTask[int, int]()
if closeTask.Node() != nil || !closeTask.IsClose() {
closeTask := newCloseTask[int, int]()
if closeTask.node() != nil || !closeTask.isClose() {
t.Fatalf("not valid close task %+v", closeTask)
}
}
Loading

0 comments on commit c66ac1f

Please sign in to comment.