Skip to content

Commit

Permalink
[#66] Add growable write buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
maypok86 committed Mar 9, 2024
1 parent f2b403b commit 89a77b0
Show file tree
Hide file tree
Showing 6 changed files with 426 additions and 295 deletions.
24 changes: 14 additions & 10 deletions internal/core/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ const (
Expired
)

const (
minWriteBufferCapacity uint32 = 4
)

func zeroValue[V any]() V {
var zero V
return zero
Expand Down Expand Up @@ -82,7 +86,7 @@ type Cache[K comparable, V any] struct {
expirePolicy expirePolicy[K, V]
stats *stats.Stats
readBuffers []*lossy.Buffer[K, V]
writeBuffer *queue.MPSC[task[K, V]]
writeBuffer *queue.Growable[task[K, V]]
evictionMutex sync.Mutex
closeOnce sync.Once
doneClear chan struct{}
Expand All @@ -99,7 +103,7 @@ type Cache[K comparable, V any] struct {
func NewCache[K comparable, V any](c Config[K, V]) *Cache[K, V] {
parallelism := xruntime.Parallelism()
roundedParallelism := int(xmath.RoundUpPowerOf2(parallelism))
writeBufferCapacity := 128 * roundedParallelism
maxWriteBufferCapacity := uint32(128 * roundedParallelism)
readBuffersCount := 4 * roundedParallelism

nodeManager := node.NewManager[K, V](node.Config{
Expand Down Expand Up @@ -135,7 +139,7 @@ func NewCache[K comparable, V any](c Config[K, V]) *Cache[K, V] {
policy: s3fifo.NewPolicy[K, V](uint32(c.Capacity)),
expirePolicy: expPolicy,
readBuffers: readBuffers,
writeBuffer: queue.NewMPSC[task[K, V]](writeBufferCapacity),
writeBuffer: queue.NewGrowable[task[K, V]](minWriteBufferCapacity, maxWriteBufferCapacity),
doneClear: make(chan struct{}),
mask: uint32(readBuffersCount - 1),
costFunc: c.CostFunc,
Expand Down Expand Up @@ -181,7 +185,7 @@ func (c *Cache[K, V]) Get(key K) (V, bool) {
}

if got.IsExpired() {
c.writeBuffer.Insert(newDeleteTask(got))
c.writeBuffer.Push(newDeleteTask(got))
c.stats.IncMisses()
return zeroValue[V](), false
}
Expand Down Expand Up @@ -257,7 +261,7 @@ func (c *Cache[K, V]) set(key K, value V, expiration uint32, onlyIfAbsent bool)
res := c.hashmap.SetIfAbsent(n)
if res == nil {
// insert
c.writeBuffer.Insert(newAddTask(n))
c.writeBuffer.Push(newAddTask(n))
return true
}
c.stats.IncRejectedSets()
Expand All @@ -268,10 +272,10 @@ func (c *Cache[K, V]) set(key K, value V, expiration uint32, onlyIfAbsent bool)
if evicted != nil {
// update
evicted.Die()
c.writeBuffer.Insert(newUpdateTask(n, evicted))
c.writeBuffer.Push(newUpdateTask(n, evicted))
} else {
// insert
c.writeBuffer.Insert(newAddTask(n))
c.writeBuffer.Push(newAddTask(n))
}

return true
Expand All @@ -289,7 +293,7 @@ func (c *Cache[K, V]) deleteNode(n node.Node[K, V]) {
func (c *Cache[K, V]) afterDelete(deleted node.Node[K, V]) {
if deleted != nil {
deleted.Die()
c.writeBuffer.Insert(newDeleteTask(deleted))
c.writeBuffer.Push(newDeleteTask(deleted))
}
}

Expand Down Expand Up @@ -353,7 +357,7 @@ func (c *Cache[K, V]) process() {
deleted := make([]node.Node[K, V], 0, bufferCapacity)
i := 0
for {
t := c.writeBuffer.Remove()
t := c.writeBuffer.Pop()

if t.isClear() || t.isClose() {
buffer = clearBuffer(buffer)
Expand Down Expand Up @@ -463,7 +467,7 @@ func (c *Cache[K, V]) clear(t task[K, V]) {
c.readBuffers[i].Clear()
}

c.writeBuffer.Insert(t)
c.writeBuffer.Push(t)
<-c.doneClear

c.stats.Clear()
Expand Down
123 changes: 123 additions & 0 deletions internal/queue/growable.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Copyright (c) 2024 Alexey Mayshev. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package queue

import (
"sync"

"github.com/maypok86/otter/internal/xmath"
)

type Growable[T any] struct {
mutex sync.Mutex
notEmpty sync.Cond
notFull sync.Cond
buf []T
head int
tail int
count int
minCap int
maxCap int
}

func NewGrowable[T any](minCap, maxCap uint32) *Growable[T] {
minCap = xmath.RoundUpPowerOf2(minCap)
maxCap = xmath.RoundUpPowerOf2(maxCap)

g := &Growable[T]{
buf: make([]T, minCap),
minCap: int(minCap),
maxCap: int(maxCap),
}

g.notEmpty = *sync.NewCond(&g.mutex)
g.notFull = *sync.NewCond(&g.mutex)

return g
}

func (g *Growable[T]) Push(item T) {
g.mutex.Lock()
for g.count == g.maxCap {
g.notFull.Wait()
}
g.push(item)
g.mutex.Unlock()
}

func (g *Growable[T]) push(item T) {
g.grow()
g.buf[g.tail] = item
g.tail = g.next(g.tail)
g.count++
g.notEmpty.Signal()
}

func (g *Growable[T]) Pop() T {
g.mutex.Lock()
for g.count == 0 {
g.notEmpty.Wait()
}
item := g.pop()
g.mutex.Unlock()
return item
}

func (g *Growable[T]) pop() T {
var zero T

item := g.buf[g.head]
g.buf[g.head] = zero

g.head = g.next(g.head)
g.count--

g.notFull.Signal()

return item
}

func (g *Growable[T]) Clear() {
g.mutex.Lock()
for g.count > 0 {
g.pop()
}
g.mutex.Unlock()
}

func (g *Growable[T]) grow() {
if g.count != len(g.buf) {
return
}
g.resize()
}

func (g *Growable[T]) resize() {
newBuf := make([]T, g.count<<1)
if g.tail > g.head {
copy(newBuf, g.buf[g.head:g.tail])
} else {
n := copy(newBuf, g.buf[g.head:])
copy(newBuf[n:], g.buf[:g.tail])
}

g.head = 0
g.tail = g.count
g.buf = newBuf
}

func (g *Growable[T]) next(i int) int {
return (i + 1) & (len(g.buf) - 1)
}
Loading

0 comments on commit 89a77b0

Please sign in to comment.