Skip to content

Commit

Permalink
[#55] Add proactive expiration policies
Browse files Browse the repository at this point in the history
  • Loading branch information
maypok86 committed Mar 1, 2024
1 parent d802e33 commit 8fc4145
Show file tree
Hide file tree
Showing 35 changed files with 1,220 additions and 871 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# vendor/

/.idea/
*.tmp
*coverage.txt
*lint.txt
**/bin/
Expand Down
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ test: test.unit ## Run all the tests
.PHONY: test.unit
test.unit: ## Run all unit tests
	@echo 'mode: atomic' > coverage.txt
	go test -covermode=atomic -coverprofile=coverage.txt.tmp -coverpkg=./... -v -race ./...
	# Strip generated code from the profile; grep reads the file directly
	# (no useless "cat | grep") and rewrites coverage.txt with the filtered lines.
	grep -v "/generated/" coverage.txt.tmp > coverage.txt
	rm -f coverage.txt.tmp

.PHONY: cover
cover: test.unit ## Run all the tests and opens the coverage report
Expand Down
3 changes: 3 additions & 0 deletions builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type baseOptions[K comparable, V any] struct {
capacity int
initialCapacity int
statsEnabled bool
withCost bool
costFunc func(key K, value V) uint32
}

Expand All @@ -49,6 +50,7 @@ func (o *baseOptions[K, V]) collectStats() {

func (o *baseOptions[K, V]) setCostFunc(costFunc func(key K, value V) uint32) {
o.costFunc = costFunc
o.withCost = true
}

func (o *baseOptions[K, V]) setInitialCapacity(initialCapacity int) {
Expand All @@ -75,6 +77,7 @@ func (o *baseOptions[K, V]) toConfig() core.Config[K, V] {
InitialCapacity: initialCapacity,
StatsEnabled: o.statsEnabled,
CostFunc: o.costFunc,
WithCost: o.withCost,
}
}

Expand Down
34 changes: 34 additions & 0 deletions cmd/generator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,12 @@ func (g *generator) printFunctions() {

g.p("func (n *%s[K, V]) SetPrev(v Node[K, V]) {", g.structName)
g.in()
g.p("if v == nil {")
g.in()
g.p("n.prev = nil")
g.p("return")
g.out()
g.p("}")
g.p("n.prev = (*%s[K, V])(v.AsPointer())", g.structName)
g.out()
g.p("}")
Expand All @@ -259,6 +265,12 @@ func (g *generator) printFunctions() {

g.p("func (n *%s[K, V]) SetNext(v Node[K, V]) {", g.structName)
g.in()
g.p("if v == nil {")
g.in()
g.p("n.next = nil")
g.p("return")
g.out()
g.p("}")
g.p("n.next = (*%s[K, V])(v.AsPointer())", g.structName)
g.out()
g.p("}")
Expand All @@ -278,6 +290,12 @@ func (g *generator) printFunctions() {
g.p("func (n *%s[K, V]) SetPrevExp(v Node[K, V]) {", g.structName)
g.in()
if g.features[expiration] {
g.p("if v == nil {")
g.in()
g.p("n.prevExp = nil")
g.p("return")
g.out()
g.p("}")
g.p("n.prevExp = (*%s[K, V])(v.AsPointer())", g.structName)
} else {
g.p("panic(\"not implemented\")")
Expand All @@ -300,6 +318,12 @@ func (g *generator) printFunctions() {
g.p("func (n *%s[K, V]) SetNextExp(v Node[K, V]) {", g.structName)
g.in()
if g.features[expiration] {
g.p("if v == nil {")
g.in()
g.p("n.nextExp = nil")
g.p("return")
g.out()
g.p("}")
g.p("n.nextExp = (*%s[K, V])(v.AsPointer())", g.structName)
} else {
g.p("panic(\"not implemented\")")
Expand Down Expand Up @@ -486,6 +510,16 @@ type Node[K comparable, V any] interface {
Unmark()
}
// Equals reports whether a and b refer to the same underlying node.
// A nil interface and a non-nil interface wrapping a nil pointer are
// both treated as "no node", so they compare equal to each other.
func Equals[K comparable, V any](a, b Node[K, V]) bool {
	aEmpty := a == nil || a.AsPointer() == nil
	bEmpty := b == nil || b.AsPointer() == nil
	if aEmpty || bEmpty {
		return aEmpty == bEmpty
	}
	return a.AsPointer() == b.AsPointer()
}
type Config struct {
WithExpiration bool
WithCost bool
Expand Down
106 changes: 66 additions & 40 deletions internal/core/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ import (
"time"

"github.com/maypok86/otter/internal/expire"
"github.com/maypok86/otter/internal/generated/node"
"github.com/maypok86/otter/internal/hashtable"
"github.com/maypok86/otter/internal/lossy"
"github.com/maypok86/otter/internal/node"
"github.com/maypok86/otter/internal/queue"
"github.com/maypok86/otter/internal/s3fifo"
"github.com/maypok86/otter/internal/stats"
"github.com/maypok86/otter/internal/task"
"github.com/maypok86/otter/internal/unixtime"
"github.com/maypok86/otter/internal/xmath"
"github.com/maypok86/otter/internal/xruntime"
Expand All @@ -48,17 +49,26 @@ type Config[K comparable, V any] struct {
TTL *time.Duration
WithVariableTTL bool
CostFunc func(key K, value V) uint32
WithCost bool
}

// expirePolicy abstracts the expiration strategy used by Cache
// (a concrete policy is chosen in NewCache based on the configured TTL mode).
type expirePolicy[K comparable, V any] interface {
	// Add registers a node with the policy.
	Add(n node.Node[K, V])
	// Delete removes a node from the policy.
	Delete(n node.Node[K, V])
	// RemoveExpired collects expired nodes into the provided slice and
	// returns it — presumably appending to reuse its capacity; confirm
	// against the expire package implementations.
	RemoveExpired(expired []node.Node[K, V]) []node.Node[K, V]
	// Clear drops all nodes tracked by the policy.
	Clear()
}

// Cache is a structure that performs best-effort bounding of a hash table, using an eviction algorithm
// to determine which entries to evict when the capacity is exceeded.
type Cache[K comparable, V any] struct {
nodeManager *node.Manager[K, V]
hashmap *hashtable.Map[K, V]
policy *s3fifo.Policy[K, V]
expirePolicy *expire.Policy[K, V]
expirePolicy expirePolicy[K, V]
stats *stats.Stats
readBuffers []*lossy.Buffer[node.Node[K, V]]
writeBuffer *queue.MPSC[node.WriteTask[K, V]]
readBuffers []*lossy.Buffer[K, V]
writeBuffer *queue.MPSC[task.WriteTask[K, V]]
evictionMutex sync.Mutex
closeOnce sync.Once
doneClear chan struct{}
Expand All @@ -77,30 +87,46 @@ func NewCache[K comparable, V any](c Config[K, V]) *Cache[K, V] {
writeBufferCapacity := 128 * roundedParallelism
readBuffersCount := 4 * roundedParallelism

readBuffers := make([]*lossy.Buffer[node.Node[K, V]], 0, readBuffersCount)
nodeManager := node.NewManager[K, V](node.Config{
WithExpiration: c.TTL != nil || c.WithVariableTTL,
WithCost: c.WithCost,
})

readBuffers := make([]*lossy.Buffer[K, V], 0, readBuffersCount)
for i := 0; i < readBuffersCount; i++ {
readBuffers = append(readBuffers, lossy.New[node.Node[K, V]]())
readBuffers = append(readBuffers, lossy.New[K, V](nodeManager))
}

var hashmap *hashtable.Map[K, V]
if c.InitialCapacity == nil {
hashmap = hashtable.New[K, V]()
hashmap = hashtable.New[K, V](nodeManager)
} else {
hashmap = hashtable.NewWithSize[K, V](*c.InitialCapacity)
hashmap = hashtable.NewWithSize[K, V](nodeManager, *c.InitialCapacity)
}

var expPolicy expirePolicy[K, V]
switch {
case c.TTL != nil:
expPolicy = expire.NewFixed[K, V]()
case c.WithVariableTTL:
expPolicy = expire.NewVariable[K, V](nodeManager)
default:
expPolicy = expire.NewDisabled[K, V]()
}

cache := &Cache[K, V]{
hashmap: hashmap,
policy: s3fifo.NewPolicy[K, V](uint32(c.Capacity)),
readBuffers: readBuffers,
writeBuffer: queue.NewMPSC[node.WriteTask[K, V]](writeBufferCapacity),
doneClear: make(chan struct{}),
mask: uint32(readBuffersCount - 1),
costFunc: c.CostFunc,
capacity: c.Capacity,
nodeManager: nodeManager,
hashmap: hashmap,
policy: s3fifo.NewPolicy[K, V](uint32(c.Capacity)),
expirePolicy: expPolicy,
readBuffers: readBuffers,
writeBuffer: queue.NewMPSC[task.WriteTask[K, V]](writeBufferCapacity),
doneClear: make(chan struct{}),
mask: uint32(readBuffersCount - 1),
costFunc: c.CostFunc,
capacity: c.Capacity,
}

cache.expirePolicy = expire.NewPolicy[K, V]()
if c.StatsEnabled {
cache.stats = stats.New()
}
Expand Down Expand Up @@ -139,7 +165,7 @@ func (c *Cache[K, V]) Get(key K) (V, bool) {
}

if got.IsExpired() {
c.writeBuffer.Insert(node.NewDeleteTask(got))
c.writeBuffer.Insert(task.NewDeleteTask(got))
c.stats.IncMisses()
return zeroValue[V](), false
}
Expand All @@ -150,7 +176,7 @@ func (c *Cache[K, V]) Get(key K) (V, bool) {
return got.Value(), ok
}

func (c *Cache[K, V]) afterGet(got *node.Node[K, V]) {
func (c *Cache[K, V]) afterGet(got node.Node[K, V]) {
idx := c.getReadBufferIdx()
pb := c.readBuffers[idx].Add(got)
if pb != nil {
Expand Down Expand Up @@ -209,12 +235,12 @@ func (c *Cache[K, V]) set(key K, value V, expiration uint32, onlyIfAbsent bool)
return false
}

n := node.New(key, value, expiration, cost)
n := c.nodeManager.Create(key, value, expiration, cost)
if onlyIfAbsent {
res := c.hashmap.SetIfAbsent(n)
if res == nil {
// insert
c.writeBuffer.Insert(node.NewAddTask(n))
c.writeBuffer.Insert(task.NewAddTask(n))
return true
}
return false
Expand All @@ -223,10 +249,10 @@ func (c *Cache[K, V]) set(key K, value V, expiration uint32, onlyIfAbsent bool)
evicted := c.hashmap.Set(n)
if evicted != nil {
// update
c.writeBuffer.Insert(node.NewUpdateTask(n, evicted))
c.writeBuffer.Insert(task.NewUpdateTask(n, evicted))
} else {
// insert
c.writeBuffer.Insert(node.NewAddTask(n))
c.writeBuffer.Insert(task.NewAddTask(n))
}

return true
Expand All @@ -236,20 +262,20 @@ func (c *Cache[K, V]) set(key K, value V, expiration uint32, onlyIfAbsent bool)
func (c *Cache[K, V]) Delete(key K) {
deleted := c.hashmap.Delete(key)
if deleted != nil {
c.writeBuffer.Insert(node.NewDeleteTask(deleted))
c.writeBuffer.Insert(task.NewDeleteTask(deleted))
}
}

func (c *Cache[K, V]) deleteNode(n *node.Node[K, V]) {
func (c *Cache[K, V]) deleteNode(n node.Node[K, V]) {
deleted := c.hashmap.DeleteNode(n)
if deleted != nil {
c.writeBuffer.Insert(node.NewDeleteTask(deleted))
c.writeBuffer.Insert(task.NewDeleteTask(deleted))
}
}

// DeleteByFunc removes the association for this key from the cache when the given function returns true.
func (c *Cache[K, V]) DeleteByFunc(f func(key K, value V) bool) {
c.hashmap.Range(func(n *node.Node[K, V]) bool {
c.hashmap.Range(func(n node.Node[K, V]) bool {
if n.IsExpired() {
return true
}
Expand All @@ -263,7 +289,7 @@ func (c *Cache[K, V]) DeleteByFunc(f func(key K, value V) bool) {
}

func (c *Cache[K, V]) cleanup() {
expired := make([]*node.Node[K, V], 0, 128)
expired := make([]node.Node[K, V], 0, 128)
for {
time.Sleep(time.Second)

Expand All @@ -287,32 +313,32 @@ func (c *Cache[K, V]) cleanup() {

func (c *Cache[K, V]) process() {
bufferCapacity := 64
buffer := make([]node.WriteTask[K, V], 0, bufferCapacity)
deleted := make([]*node.Node[K, V], 0, bufferCapacity)
buffer := make([]task.WriteTask[K, V], 0, bufferCapacity)
deleted := make([]node.Node[K, V], 0, bufferCapacity)
i := 0
for {
task := c.writeBuffer.Remove()
t := c.writeBuffer.Remove()

if task.IsClear() || task.IsClose() {
if t.IsClear() || t.IsClose() {
buffer = clearBuffer(buffer)
c.writeBuffer.Clear()

c.evictionMutex.Lock()
c.policy.Clear()
c.expirePolicy.Clear()
if task.IsClose() {
if t.IsClose() {
c.isClosed = true
}
c.evictionMutex.Unlock()

c.doneClear <- struct{}{}
if task.IsClose() {
if t.IsClose() {
break
}
continue
}

buffer = append(buffer, task)
buffer = append(buffer, t)
i++
if i >= bufferCapacity {
i -= bufferCapacity
Expand Down Expand Up @@ -352,7 +378,7 @@ func (c *Cache[K, V]) process() {
//
// Iteration stops early when the given function returns false.
func (c *Cache[K, V]) Range(f func(key K, value V) bool) {
c.hashmap.Range(func(n *node.Node[K, V]) bool {
c.hashmap.Range(func(n node.Node[K, V]) bool {
if n.IsExpired() {
return true
}
Expand All @@ -365,16 +391,16 @@ func (c *Cache[K, V]) Range(f func(key K, value V) bool) {
//
// NOTE: this operation must be performed when no requests are made to the cache otherwise the behavior is undefined.
func (c *Cache[K, V]) Clear() {
c.clear(node.NewClearTask[K, V]())
c.clear(task.NewClearTask[K, V]())
}

func (c *Cache[K, V]) clear(task node.WriteTask[K, V]) {
func (c *Cache[K, V]) clear(t task.WriteTask[K, V]) {
c.hashmap.Clear()
for i := 0; i < len(c.readBuffers); i++ {
c.readBuffers[i].Clear()
}

c.writeBuffer.Insert(task)
c.writeBuffer.Insert(t)
<-c.doneClear

c.stats.Clear()
Expand All @@ -385,7 +411,7 @@ func (c *Cache[K, V]) clear(task node.WriteTask[K, V]) {
// NOTE: this operation must be performed when no requests are made to the cache otherwise the behavior is undefined.
func (c *Cache[K, V]) Close() {
c.closeOnce.Do(func() {
c.clear(node.NewCloseTask[K, V]())
c.clear(task.NewCloseTask[K, V]())
if c.withExpiration {
unixtime.Stop()
}
Expand Down
9 changes: 7 additions & 2 deletions internal/core/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"testing"
"time"

"github.com/maypok86/otter/internal/node"
"github.com/maypok86/otter/internal/generated/node"
)

func TestCache_SetWithCost(t *testing.T) {
Expand Down Expand Up @@ -48,8 +48,13 @@ func TestCache_Range(t *testing.T) {

time.Sleep(3 * time.Second)

nm := node.NewManager[int, int](node.Config{
WithExpiration: true,
WithCost: true,
})

c.Set(1, 1)
c.hashmap.Set(node.New(2, 2, 1, 1))
c.hashmap.Set(nm.Create(2, 2, 1, 1))
c.Set(3, 3)
aliveNodes := 2
iters := 0
Expand Down
Loading

0 comments on commit 8fc4145

Please sign in to comment.