Skip to content

Commit

Permalink
[Chore] Add an unbounded version of the cache
Browse files Browse the repository at this point in the history
  • Loading branch information
maypok86 committed Sep 1, 2024
1 parent 8f651b7 commit d2154ae
Show file tree
Hide file tree
Showing 14 changed files with 195 additions and 42 deletions.
92 changes: 74 additions & 18 deletions cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ import (
"time"

"github.com/maypok86/otter/v2/internal/clock"
"github.com/maypok86/otter/v2/internal/eviction"
"github.com/maypok86/otter/v2/internal/eviction/s3fifo"
"github.com/maypok86/otter/v2/internal/expiry"
"github.com/maypok86/otter/v2/internal/generated/node"
"github.com/maypok86/otter/v2/internal/hashtable"
"github.com/maypok86/otter/v2/internal/lossy"
"github.com/maypok86/otter/v2/internal/queue"
"github.com/maypok86/otter/v2/internal/s3fifo"
"github.com/maypok86/otter/v2/internal/xmath"
"github.com/maypok86/otter/v2/internal/xruntime"
)
Expand Down Expand Up @@ -75,6 +76,14 @@ func init() {
maxStripedBufferSize = 4 * roundedParallelism
}

type evictionPolicy[K comparable, V any] interface {
Read(nodes []node.Node[K, V])
Add(n node.Node[K, V], nowNanos int64)
Delete(n node.Node[K, V])
MaxAvailableWeight() uint64
Clear()
}

type expiryPolicy[K comparable, V any] interface {
Add(n node.Node[K, V])
Delete(n node.Node[K, V])
Expand All @@ -87,7 +96,7 @@ type expiryPolicy[K comparable, V any] interface {
type Cache[K comparable, V any] struct {
nodeManager *node.Manager[K, V]
hashmap *hashtable.Map[K, V]
policy *s3fifo.Policy[K, V]
policy evictionPolicy[K, V]
expiryPolicy expiryPolicy[K, V]
stats statsCollector
logger Logger
Expand All @@ -103,6 +112,8 @@ type Cache[K comparable, V any] struct {
mask uint32
ttl time.Duration
withExpiration bool
withEviction bool
withProcess bool
}

// newCache returns a new cache instance based on the settings from Config.
Expand All @@ -113,9 +124,15 @@ func newCache[K comparable, V any](b *Builder[K, V]) *Cache[K, V] {
WithWeight: b.weigher != nil,
})

stripedBuffer := make([]*lossy.Buffer[K, V], 0, maxStripedBufferSize)
for i := 0; i < maxStripedBufferSize; i++ {
stripedBuffer = append(stripedBuffer, lossy.New[K, V](nodeManager))
maximum := b.getMaximum()
withEviction := maximum != nil

var stripedBuffer []*lossy.Buffer[K, V]
if withEviction {
stripedBuffer = make([]*lossy.Buffer[K, V], 0, maxStripedBufferSize)
for i := 0; i < maxStripedBufferSize; i++ {
stripedBuffer = append(stripedBuffer, lossy.New[K, V](nodeManager))
}
}

var hashmap *hashtable.Map[K, V]
Expand All @@ -130,9 +147,7 @@ func newCache[K comparable, V any](b *Builder[K, V]) *Cache[K, V] {
hashmap: hashmap,
stats: newStatsCollector(b.statsCollector),
logger: b.logger,
clock: clock.New(),
stripedBuffer: stripedBuffer,
writeBuffer: queue.NewGrowable[task[K, V]](minWriteBufferSize, maxWriteBufferSize),
doneClear: make(chan struct{}),
doneClose: make(chan struct{}, 1),
//nolint:gosec // there will never be an overflow
Expand All @@ -141,7 +156,11 @@ func newCache[K comparable, V any](b *Builder[K, V]) *Cache[K, V] {
deletionListener: b.deletionListener,
}

cache.policy = s3fifo.NewPolicy(*b.getMaximum(), cache.evictNode)
cache.withEviction = withEviction
cache.policy = eviction.NewDisabled[K, V]()
if cache.withEviction {
cache.policy = s3fifo.NewPolicy(*maximum, cache.evictNode)
}

switch {
case b.ttl != nil:
Expand All @@ -157,12 +176,20 @@ func newCache[K comparable, V any](b *Builder[K, V]) *Cache[K, V] {
}

cache.withExpiration = b.ttl != nil || b.withVariableTTL
cache.withProcess = cache.withEviction || cache.withExpiration

if cache.withProcess {
cache.writeBuffer = queue.NewGrowable[task[K, V]](minWriteBufferSize, maxWriteBufferSize)
}

if cache.withExpiration {
cache.clock = clock.New()
go cache.cleanup()
}

go cache.process()
if cache.withProcess {
go cache.process()
}

return cache
}
Expand Down Expand Up @@ -200,6 +227,7 @@ func (c *Cache[K, V]) GetNode(key K) (node.Node[K, V], bool) {
}

if n.HasExpired(c.clock.Offset()) {
// withProcess = true
// avoid duplicate push
deleted := c.hashmap.DeleteNode(n)
if deleted != nil {
Expand Down Expand Up @@ -230,6 +258,10 @@ func (c *Cache[K, V]) GetNodeQuietly(key K) (node.Node[K, V], bool) {
}

func (c *Cache[K, V]) afterGet(got node.Node[K, V]) {
if !c.withEviction {
return
}

idx := c.getReadBufferIdx()
pb := c.stripedBuffer[idx].Add(got)
if pb != nil {
Expand Down Expand Up @@ -293,14 +325,26 @@ func (c *Cache[K, V]) set(key K, value V, expiration int64, onlyIfAbsent bool) b
if onlyIfAbsent {
res := c.hashmap.SetIfAbsent(n)
if res == nil {
// insert
c.writeBuffer.Push(newAddTask(n))
c.afterWrite(n, nil)
return true
}
return false
}

evicted := c.hashmap.Set(n)
c.afterWrite(n, evicted)

return true
}

func (c *Cache[K, V]) afterWrite(n, evicted node.Node[K, V]) {
if !c.withProcess {
if evicted != nil {
c.notifyDeletion(n.Key(), n.Value(), Replaced)
}
return
}

if evicted != nil {
// update
evicted.Die()
Expand All @@ -309,8 +353,6 @@ func (c *Cache[K, V]) set(key K, value V, expiration int64, onlyIfAbsent bool) b
// insert
c.writeBuffer.Push(newAddTask(n))
}

return true
}

// Delete deletes the association for this key from the cache.
Expand All @@ -323,10 +365,17 @@ func (c *Cache[K, V]) deleteNode(n node.Node[K, V]) {
}

func (c *Cache[K, V]) afterDelete(deleted node.Node[K, V]) {
if deleted != nil {
deleted.Die()
c.writeBuffer.Push(newDeleteTask(deleted))
if deleted == nil {
return
}

if !c.withProcess {
c.notifyDeletion(deleted.Key(), deleted.Value(), Explicit)
return
}

deleted.Die()
c.writeBuffer.Push(newDeleteTask(deleted))
}

// DeleteByFunc deletes the association for this key from the cache when the given function returns true.
Expand Down Expand Up @@ -464,8 +513,15 @@ func (c *Cache[K, V]) Clear() {

func (c *Cache[K, V]) clear(t task[K, V]) {
c.hashmap.Clear()
for i := 0; i < len(c.stripedBuffer); i++ {
c.stripedBuffer[i].Clear()

if !c.withProcess {
return
}

if c.withEviction {
for i := 0; i < len(c.stripedBuffer); i++ {
c.stripedBuffer[i].Clear()
}
}

c.writeBuffer.Push(t)
Expand Down
58 changes: 53 additions & 5 deletions cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,55 @@ func getRandomSize(t *testing.T) int {
return r.Intn(maxSize-minSize) + minSize
}

func TestCache_Unbounded(t *testing.T) {
statsCounter := stats.NewCounter()
m := make(map[DeletionCause]int)
mutex := sync.Mutex{}
c, err := NewBuilder[int, int]().
DeletionListener(func(key int, value int, cause DeletionCause) {
mutex.Lock()
m[cause]++
mutex.Unlock()
}).
CollectStats(statsCounter).
Build()
if err != nil {
t.Fatalf("can not create cache: %v", err)
}

size := getRandomSize(t)
for i := 0; i < size; i++ {
c.Set(i, i)
}
for i := 0; i < size; i++ {
if !c.Has(i) {
t.Fatalf("the key must exist: %d", i)
}
}
for i := size; i < 2*size; i++ {
if c.Has(i) {
t.Fatalf("the key must not exist: %d", i)
}
}

replaced := size / 2
for i := 0; i < replaced; i++ {
c.Set(i, i)
}
for i := replaced; i < size; i++ {
c.Delete(i)
}

mutex.Lock()
defer mutex.Unlock()
if len(m) != 2 || m[Explicit] != size-replaced {
t.Fatalf("cache was supposed to delete %d, but deleted %d entries", size-replaced, m[Explicit])
}
if m[Replaced] != replaced {
t.Fatalf("cache was supposed to replace %d, but replaced %d entries", replaced, m[Replaced])
}
}

func TestCache_SetWithWeight(t *testing.T) {
size := uint64(10)
c, err := NewBuilder[uint32, int]().
Expand Down Expand Up @@ -252,7 +301,7 @@ func TestCache_SetIfAbsent(t *testing.T) {

for i := 0; i < size; i++ {
if !c.Has(i) {
t.Fatalf("key should exists: %d", i)
t.Fatalf("the key must exist: %d", i)
}
}

Expand Down Expand Up @@ -281,7 +330,7 @@ func TestCache_SetIfAbsent(t *testing.T) {

for i := 0; i < size; i++ {
if !cc.Has(i) {
t.Fatalf("key should exists: %d", i)
t.Fatalf("the key must exist: %d", i)
}
}

Expand Down Expand Up @@ -407,7 +456,7 @@ func TestCache_Delete(t *testing.T) {

for i := 0; i < size; i++ {
if !c.Has(i) {
t.Fatalf("key should exists: %d", i)
t.Fatalf("the key must exist: %d", i)
}
}

Expand All @@ -417,7 +466,7 @@ func TestCache_Delete(t *testing.T) {

for i := 0; i < size; i++ {
if c.Has(i) {
t.Fatalf("key should not exists: %d", i)
t.Fatalf("the key must not exist: %d", i)
}
}

Expand Down Expand Up @@ -638,7 +687,6 @@ func (h *optimalHeap) Pop() any {

func Test_GetExpired(t *testing.T) {
c, err := NewBuilder[string, string]().
MaximumSize(1000000).
CollectStats(stats.NewCounter()).
DeletionListener(func(key string, value string, cause DeletionCause) {
fmt.Println(cause)
Expand Down
28 changes: 23 additions & 5 deletions cmd/generator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,10 +177,16 @@ func (g *generator) isBounded() bool {
return g.features[size] || g.features[weight]
}

func (g *generator) withState() bool {
return g.isBounded() || g.features[expiration]
}

func (g *generator) printImports() {
g.p("import (")
g.in()
g.p("\"sync/atomic\"")
if g.withState() {
g.p("\"sync/atomic\"")
}
g.p("\"unsafe\"")
g.out()
g.p(")")
Expand Down Expand Up @@ -225,7 +231,9 @@ func (g *generator) printStruct() {
g.p("weight uint32")
}

g.p("state uint32")
if g.withState() {
g.p("state uint32")
}
if g.isBounded() {
g.p("frequency uint8")
g.p("queueType uint8")
Expand All @@ -249,7 +257,9 @@ func (g *generator) printConstructors() {
if g.features[weight] {
g.p("weight: weight,")
}
g.p("state: aliveState,")
if g.withState() {
g.p("state: aliveState,")
}
g.out()
g.p("}")
g.out()
Expand Down Expand Up @@ -434,14 +444,22 @@ func (g *generator) printFunctions() {

g.p("func (n *%s[K, V]) IsAlive() bool {", g.structName)
g.in()
g.p("return atomic.LoadUint32(&n.state) == aliveState")
if g.withState() {
g.p("return atomic.LoadUint32(&n.state) == aliveState")
} else {
g.p("return true")
}
g.out()
g.p("}")
g.p("")

g.p("func (n *%s[K, V]) Die() {", g.structName)
g.in()
g.p("atomic.StoreUint32(&n.state, deadState)")
if g.withState() {
g.p("atomic.StoreUint32(&n.state, deadState)")
} else {
g.p("panic(\"not implemented\")")
}
g.out()
g.p("}")
g.p("")
Expand Down
4 changes: 4 additions & 0 deletions internal/clock/clock.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ func New() *Clock {
}

func (c *Clock) Offset() int64 {
if c == nil {
// do not use the Clock unless initialized via New.
return 0
}
return time.Since(c.start).Nanoseconds()
}

Expand Down
Loading

0 comments on commit d2154ae

Please sign in to comment.