Skip to content

Commit

Permalink
feat: make lossy buffers dynamically striped
Browse files Browse the repository at this point in the history
  • Loading branch information
maypok86 committed Sep 15, 2024
1 parent 7929499 commit 7133b0c
Show file tree
Hide file tree
Showing 18 changed files with 721 additions and 186 deletions.
1 change: 1 addition & 0 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,4 @@ jobs:
uses: golangci/golangci-lint-action@v6
with:
version: latest
args: --timeout=4m -v --out-${NO_FUTURE}format colored-line-number
6 changes: 4 additions & 2 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ run:
modules-download-mode: readonly
go: '1.17'
output:
format: tab:lint.txt
formats:
- format: tab
path: lint.txt
print-issued-lines: false
uniq-by-line: false
sort-results: true
Expand All @@ -17,7 +19,7 @@ linters:
- bidichk
- bodyclose
- contextcheck
- copyloopvar
#- copyloopvar
- durationcheck
- errcheck
- errname
Expand Down
1 change: 1 addition & 0 deletions builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ func (b *Builder[K, V]) Logger(logger Logger) *Builder[K, V] {

func (b *Builder[K, V]) getMaximum() *uint64 {
if b.maximumSize != nil {
//nolint:gosec // there is no overflow
ms := uint64(*b.maximumSize)
return &ms
}
Expand Down
36 changes: 10 additions & 26 deletions cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func init() {
}

type evictionPolicy[K comparable, V any] interface {
Read(nodes []node.Node[K, V])
Read(n node.Node[K, V])
Add(n node.Node[K, V], nowNanos int64)
Delete(n node.Node[K, V])
MaxAvailableWeight() uint64
Expand All @@ -73,15 +73,14 @@ type Cache[K comparable, V any] struct {
stats statsRecorder
logger Logger
clock *clock.Clock
stripedBuffer []*lossy.Buffer[K, V]
stripedBuffer *lossy.Striped[K, V]
writeBuffer *queue.Growable[task[K, V]]
evictionMutex sync.Mutex
closeOnce sync.Once
doneClear chan struct{}
doneClose chan struct{}
weigher func(key K, value V) uint32
onDeletion func(e DeletionEvent[K, V])
mask uint32
ttl time.Duration
withExpiration bool
withEviction bool
Expand All @@ -99,12 +98,9 @@ func newCache[K comparable, V any](b *Builder[K, V]) *Cache[K, V] {
maximum := b.getMaximum()
withEviction := maximum != nil

var stripedBuffer []*lossy.Buffer[K, V]
var stripedBuffer *lossy.Striped[K, V]
if withEviction {
stripedBuffer = make([]*lossy.Buffer[K, V], 0, maxStripedBufferSize)
for i := 0; i < maxStripedBufferSize; i++ {
stripedBuffer = append(stripedBuffer, lossy.New[K, V](nodeManager))
}
stripedBuffer = lossy.NewStriped(maxStripedBufferSize, nodeManager)
}

var hashmap *hashtable.Map[K, V]
Expand All @@ -122,10 +118,8 @@ func newCache[K comparable, V any](b *Builder[K, V]) *Cache[K, V] {
stripedBuffer: stripedBuffer,
doneClear: make(chan struct{}),
doneClose: make(chan struct{}, 1),
//nolint:gosec // there will never be an overflow
mask: uint32(maxStripedBufferSize - 1),
weigher: b.getWeigher(),
onDeletion: b.onDeletion,
weigher: b.getWeigher(),
onDeletion: b.onDeletion,
}

cache.withEviction = withEviction
Expand Down Expand Up @@ -166,10 +160,6 @@ func newCache[K comparable, V any](b *Builder[K, V]) *Cache[K, V] {
return cache
}

// getReadBufferIdx picks a pseudo-random stripe index for the read buffer.
// c.mask is presumably a power-of-two-minus-one, so the AND keeps the
// result in range without a modulo.
func (c *Cache[K, V]) getReadBufferIdx() int {
	idx := xruntime.Fastrand() & c.mask
	return int(idx)
}

// getExpiration converts a TTL duration into an absolute expiration
// timestamp (in nanoseconds) relative to the cache clock's offset.
func (c *Cache[K, V]) getExpiration(duration time.Duration) int64 {
	offset := c.clock.Offset()
	return offset + duration.Nanoseconds()
}
Expand Down Expand Up @@ -234,14 +224,10 @@ func (c *Cache[K, V]) afterGet(got node.Node[K, V]) {
return
}

idx := c.getReadBufferIdx()
pb := c.stripedBuffer[idx].Add(got)
if pb != nil {
c.evictionMutex.Lock()
c.policy.Read(pb.Returned)
result := c.stripedBuffer.Add(got)
if result == lossy.Full && c.evictionMutex.TryLock() {
c.stripedBuffer.DrainTo(c.policy.Read)
c.evictionMutex.Unlock()

c.stripedBuffer[idx].Free()
}
}

Expand Down Expand Up @@ -521,9 +507,7 @@ func (c *Cache[K, V]) clear(t task[K, V]) {
}

if c.withEviction {
for i := 0; i < len(c.stripedBuffer); i++ {
c.stripedBuffer[i].Clear()
}
c.stripedBuffer.Clear()
}

c.writeBuffer.Push(t)
Expand Down
7 changes: 2 additions & 5 deletions cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func TestCache_PinnedWeight(t *testing.T) {
}
return 1
}).
WithTTL(3 * time.Second).
WithTTL(2 * time.Second).
OnDeletion(func(e DeletionEvent[int, int]) {
mutex.Lock()
m[e.Cause]++
Expand All @@ -127,9 +127,6 @@ func TestCache_PinnedWeight(t *testing.T) {
}
for i := size; i < 2*size; i++ {
c.Set(i, i)
}
time.Sleep(time.Second)
for i := size; i < 2*size; i++ {
if !c.Has(i) {
t.Fatalf("the key must exist: %d", i)
}
Expand All @@ -139,7 +136,7 @@ func TestCache_PinnedWeight(t *testing.T) {
t.Fatalf("the key must exist: %d", pinned)
}

time.Sleep(3 * time.Second)
time.Sleep(4 * time.Second)

if c.Has(pinned) {
t.Fatalf("the key must not exist: %d", pinned)
Expand Down
2 changes: 1 addition & 1 deletion internal/eviction/disabled.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ func NewDisabled[K comparable, V any]() Disabled[K, V] {
return Disabled[K, V]{}
}

func (d Disabled[K, V]) Read(nodes []node.Node[K, V]) {
func (d Disabled[K, V]) Read(nodes node.Node[K, V]) {
panic("not implemented")
}

Expand Down
6 changes: 2 additions & 4 deletions internal/eviction/s3fifo/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,8 @@ func NewPolicy[K comparable, V any](maxWeight uint64, evictNode func(node.Node[K
}

// Read updates the eviction policy based on node accesses.
func (p *Policy[K, V]) Read(nodes []node.Node[K, V]) {
for _, n := range nodes {
n.IncrementFrequency()
}
func (p *Policy[K, V]) Read(accessed node.Node[K, V]) {
	// Record the access by bumping the node's frequency counter; the
	// S3-FIFO queues consult this counter on eviction.
	accessed.IncrementFrequency()
}

// Add adds node to the eviction policy.
Expand Down
13 changes: 10 additions & 3 deletions internal/eviction/s3fifo/policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ func newNode(k int) node.Node[int, int] {
return n
}

// read feeds every node in the slice through the policy's per-node Read
// hook, emulating the batch-read behavior the policy had before the
// single-node signature change.
func read[K comparable, V any](p *Policy[K, V], nodes []node.Node[K, V]) {
	for i := range nodes {
		p.Read(nodes[i])
	}
}

func TestPolicy_ReadAndWrite(t *testing.T) {
n := newNode(2)
p := NewPolicy[int, int](10, func(n node.Node[int, int]) {
Expand Down Expand Up @@ -58,9 +64,9 @@ func TestPolicy_OneHitWonders(t *testing.T) {
p.Add(n, 1)
}

p.Read(oneHitWonders)
read(p, oneHitWonders)
for i := 0; i < 3; i++ {
p.Read(popular)
read(p, popular)
}

newNodes := make([]node.Node[int, int], 0, 11)
Expand Down Expand Up @@ -117,7 +123,8 @@ func TestPolicy_Update(t *testing.T) {
p.Delete(n)
p.Add(n1, 1)

p.Read([]node.Node[int, int]{n1, n1})
p.Read(n1)
p.Read(n1)

n2 := m.Create(2, 1, 0, 92)
collect = true
Expand Down
3 changes: 3 additions & 0 deletions internal/expiry/variable.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func (v *Variable[K, V]) findBucket(expiration uint64) node.Node[K, V] {

// Add schedules a timer event for the node.
func (v *Variable[K, V]) Add(n node.Node[K, V]) {
	//nolint:gosec // there is no overflow
	expiration := uint64(n.Expiration())
	link(v.findBucket(expiration), n)
}
Expand All @@ -94,6 +95,7 @@ func (v *Variable[K, V]) Delete(n node.Node[K, V]) {
}

func (v *Variable[K, V]) DeleteExpired(nowNanos int64) {
//nolint:gosec // there is no overflow
currentTime := uint64(nowNanos)
prevTime := v.time
v.time = currentTime
Expand Down Expand Up @@ -130,6 +132,7 @@ func (v *Variable[K, V]) deleteExpiredFromBucket(index int, prevTicks, delta uin
n.SetPrevExp(nil)
n.SetNextExp(nil)

//nolint:gosec // there is no overflow
if uint64(n.Expiration()) <= v.time {
v.deleteNode(n)
} else {
Expand Down
4 changes: 4 additions & 0 deletions internal/hashtable/map.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,13 @@ type table[K comparable] struct {
}

// addSize atomically adjusts the striped size counter associated with
// the given bucket. len(t.size) is presumably a power of two, so masking
// maps the bucket index onto a counter stripe.
func (t *table[K]) addSize(bucketIdx uint64, delta int) {
	//nolint:gosec // there is no overflow
	mask := uint64(len(t.size) - 1)
	atomic.AddInt64(&t.size[bucketIdx&mask].c, int64(delta))
}

// addSizePlain adjusts the striped size counter without atomics; only
// safe when the caller holds exclusive access to the table (e.g. during
// resize/copy).
func (t *table[K]) addSizePlain(bucketIdx uint64, delta int) {
	//nolint:gosec // there is no overflow
	mask := uint64(len(t.size) - 1)
	t.size[bucketIdx&mask].c += int64(delta)
}
Expand Down Expand Up @@ -159,6 +161,7 @@ func newTable[K comparable](bucketCount int, prevHasher maphash.Hasher[K]) *tabl
counterLength = maxCounterLength
}
counter := make([]paddedCounter, counterLength)
//nolint:gosec // there is no overflow
mask := uint64(len(buckets) - 1)
t := &table[K]{
buckets: buckets,
Expand Down Expand Up @@ -435,6 +438,7 @@ func (m *Map[K, V]) resize(known *table[K], hint resizeHint) {
if hint != clearHint {
for i := 0; i < tableLen; i++ {
copied := m.copyBuckets(&t.buckets[i], nt)
//nolint:gosec // there is no overflow
nt.addSizePlain(uint64(i), copied)
}
}
Expand Down
Loading

0 comments on commit 7133b0c

Please sign in to comment.