Skip to content

Commit

Permalink
feat: make lossy buffers dynamically striped
Browse files Browse the repository at this point in the history
  • Loading branch information
maypok86 committed Sep 15, 2024
1 parent 7929499 commit 9bbe893
Show file tree
Hide file tree
Showing 10 changed files with 700 additions and 182 deletions.
36 changes: 10 additions & 26 deletions cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func init() {
}

type evictionPolicy[K comparable, V any] interface {
Read(nodes []node.Node[K, V])
Read(n node.Node[K, V])
Add(n node.Node[K, V], nowNanos int64)
Delete(n node.Node[K, V])
MaxAvailableWeight() uint64
Expand All @@ -73,15 +73,14 @@ type Cache[K comparable, V any] struct {
stats statsRecorder
logger Logger
clock *clock.Clock
stripedBuffer []*lossy.Buffer[K, V]
stripedBuffer *lossy.Striped[K, V]
writeBuffer *queue.Growable[task[K, V]]
evictionMutex sync.Mutex
closeOnce sync.Once
doneClear chan struct{}
doneClose chan struct{}
weigher func(key K, value V) uint32
onDeletion func(e DeletionEvent[K, V])
mask uint32
ttl time.Duration
withExpiration bool
withEviction bool
Expand All @@ -99,12 +98,9 @@ func newCache[K comparable, V any](b *Builder[K, V]) *Cache[K, V] {
maximum := b.getMaximum()
withEviction := maximum != nil

var stripedBuffer []*lossy.Buffer[K, V]
var stripedBuffer *lossy.Striped[K, V]
if withEviction {
stripedBuffer = make([]*lossy.Buffer[K, V], 0, maxStripedBufferSize)
for i := 0; i < maxStripedBufferSize; i++ {
stripedBuffer = append(stripedBuffer, lossy.New[K, V](nodeManager))
}
stripedBuffer = lossy.NewStriped(maxStripedBufferSize, nodeManager)
}

var hashmap *hashtable.Map[K, V]
Expand All @@ -122,10 +118,8 @@ func newCache[K comparable, V any](b *Builder[K, V]) *Cache[K, V] {
stripedBuffer: stripedBuffer,
doneClear: make(chan struct{}),
doneClose: make(chan struct{}, 1),
//nolint:gosec // there will never be an overflow
mask: uint32(maxStripedBufferSize - 1),
weigher: b.getWeigher(),
onDeletion: b.onDeletion,
weigher: b.getWeigher(),
onDeletion: b.onDeletion,
}

cache.withEviction = withEviction
Expand Down Expand Up @@ -166,10 +160,6 @@ func newCache[K comparable, V any](b *Builder[K, V]) *Cache[K, V] {
return cache
}

func (c *Cache[K, V]) getReadBufferIdx() int {
return int(xruntime.Fastrand() & c.mask)
}

func (c *Cache[K, V]) getExpiration(duration time.Duration) int64 {
return c.clock.Offset() + duration.Nanoseconds()
}
Expand Down Expand Up @@ -234,14 +224,10 @@ func (c *Cache[K, V]) afterGet(got node.Node[K, V]) {
return
}

idx := c.getReadBufferIdx()
pb := c.stripedBuffer[idx].Add(got)
if pb != nil {
c.evictionMutex.Lock()
c.policy.Read(pb.Returned)
result := c.stripedBuffer.Add(got)
if result == lossy.Full && c.evictionMutex.TryLock() {
c.stripedBuffer.DrainTo(c.policy.Read)
c.evictionMutex.Unlock()

c.stripedBuffer[idx].Free()
}
}

Expand Down Expand Up @@ -521,9 +507,7 @@ func (c *Cache[K, V]) clear(t task[K, V]) {
}

if c.withEviction {
for i := 0; i < len(c.stripedBuffer); i++ {
c.stripedBuffer[i].Clear()
}
c.stripedBuffer.Clear()
}

c.writeBuffer.Push(t)
Expand Down
7 changes: 2 additions & 5 deletions cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func TestCache_PinnedWeight(t *testing.T) {
}
return 1
}).
WithTTL(3 * time.Second).
WithTTL(2 * time.Second).
OnDeletion(func(e DeletionEvent[int, int]) {
mutex.Lock()
m[e.Cause]++
Expand All @@ -127,9 +127,6 @@ func TestCache_PinnedWeight(t *testing.T) {
}
for i := size; i < 2*size; i++ {
c.Set(i, i)
}
time.Sleep(time.Second)
for i := size; i < 2*size; i++ {
if !c.Has(i) {
t.Fatalf("the key must exist: %d", i)
}
Expand All @@ -139,7 +136,7 @@ func TestCache_PinnedWeight(t *testing.T) {
t.Fatalf("the key must exist: %d", pinned)
}

time.Sleep(3 * time.Second)
time.Sleep(4 * time.Second)

if c.Has(pinned) {
t.Fatalf("the key must not exist: %d", pinned)
Expand Down
2 changes: 1 addition & 1 deletion internal/eviction/disabled.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ func NewDisabled[K comparable, V any]() Disabled[K, V] {
return Disabled[K, V]{}
}

func (d Disabled[K, V]) Read(nodes []node.Node[K, V]) {
func (d Disabled[K, V]) Read(nodes node.Node[K, V]) {
panic("not implemented")
}

Expand Down
6 changes: 2 additions & 4 deletions internal/eviction/s3fifo/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,8 @@ func NewPolicy[K comparable, V any](maxWeight uint64, evictNode func(node.Node[K
}

// Read updates the eviction policy based on node accesses.
func (p *Policy[K, V]) Read(nodes []node.Node[K, V]) {
for _, n := range nodes {
n.IncrementFrequency()
}
func (p *Policy[K, V]) Read(n node.Node[K, V]) {
n.IncrementFrequency()
}

// Add adds node to the eviction policy.
Expand Down
13 changes: 10 additions & 3 deletions internal/eviction/s3fifo/policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ func newNode(k int) node.Node[int, int] {
return n
}

func read[K comparable, V any](p *Policy[K, V], nodes []node.Node[K, V]) {
for _, n := range nodes {
p.Read(n)
}
}

func TestPolicy_ReadAndWrite(t *testing.T) {
n := newNode(2)
p := NewPolicy[int, int](10, func(n node.Node[int, int]) {
Expand Down Expand Up @@ -58,9 +64,9 @@ func TestPolicy_OneHitWonders(t *testing.T) {
p.Add(n, 1)
}

p.Read(oneHitWonders)
read(p, oneHitWonders)
for i := 0; i < 3; i++ {
p.Read(popular)
read(p, popular)
}

newNodes := make([]node.Node[int, int], 0, 11)
Expand Down Expand Up @@ -117,7 +123,8 @@ func TestPolicy_Update(t *testing.T) {
p.Delete(n)
p.Add(n1, 1)

p.Read([]node.Node[int, int]{n1, n1})
p.Read(n1)
p.Read(n1)

n2 := m.Create(2, 1, 0, 92)
collect = true
Expand Down
143 changes: 0 additions & 143 deletions internal/lossy/buffer.go

This file was deleted.

Loading

0 comments on commit 9bbe893

Please sign in to comment.