From 9644e410e6677273b09ed504192a1f28e0c06ca5 Mon Sep 17 00:00:00 2001 From: maypok86 Date: Sat, 9 Mar 2024 00:04:23 +0300 Subject: [PATCH] [#66] Add growable write buffer --- internal/core/cache.go | 24 ++-- internal/queue/growable.go | 123 ++++++++++++++++++ internal/queue/growable_test.go | 201 +++++++++++++++++++++++++++++ internal/queue/mpsc.go | 144 --------------------- internal/queue/mpsc_test.go | 141 -------------------- internal/queue/queue_bench_test.go | 88 +++++++++++++ 6 files changed, 426 insertions(+), 295 deletions(-) create mode 100644 internal/queue/growable.go create mode 100644 internal/queue/growable_test.go delete mode 100644 internal/queue/mpsc.go delete mode 100644 internal/queue/mpsc_test.go create mode 100644 internal/queue/queue_bench_test.go diff --git a/internal/core/cache.go b/internal/core/cache.go index e460fe8..440bbcf 100644 --- a/internal/core/cache.go +++ b/internal/core/cache.go @@ -30,6 +30,10 @@ import ( "github.com/maypok86/otter/internal/xruntime" ) +const ( + minWriteBufferCapacity uint32 = 4 +) + func zeroValue[V any]() V { var zero V return zero @@ -67,7 +71,7 @@ type Cache[K comparable, V any] struct { expirePolicy expirePolicy[K, V] stats *stats.Stats readBuffers []*lossy.Buffer[K, V] - writeBuffer *queue.MPSC[task[K, V]] + writeBuffer *queue.Growable[task[K, V]] evictionMutex sync.Mutex closeOnce sync.Once doneClear chan struct{} @@ -83,7 +87,7 @@ type Cache[K comparable, V any] struct { func NewCache[K comparable, V any](c Config[K, V]) *Cache[K, V] { parallelism := xruntime.Parallelism() roundedParallelism := int(xmath.RoundUpPowerOf2(parallelism)) - writeBufferCapacity := 128 * roundedParallelism + maxWriteBufferCapacity := uint32(128 * roundedParallelism) readBuffersCount := 4 * roundedParallelism nodeManager := node.NewManager[K, V](node.Config{ @@ -119,7 +123,7 @@ func NewCache[K comparable, V any](c Config[K, V]) *Cache[K, V] { policy: s3fifo.NewPolicy[K, V](uint32(c.Capacity)), expirePolicy: expPolicy, 
readBuffers: readBuffers, - writeBuffer: queue.NewMPSC[task[K, V]](writeBufferCapacity), + writeBuffer: queue.NewGrowable[task[K, V]](minWriteBufferCapacity, maxWriteBufferCapacity), doneClear: make(chan struct{}), mask: uint32(readBuffersCount - 1), costFunc: c.CostFunc, @@ -164,7 +168,7 @@ func (c *Cache[K, V]) Get(key K) (V, bool) { } if got.IsExpired() { - c.writeBuffer.Insert(newDeleteTask(got)) + c.writeBuffer.Push(newDeleteTask(got)) c.stats.IncMisses() return zeroValue[V](), false } @@ -240,7 +244,7 @@ func (c *Cache[K, V]) set(key K, value V, expiration uint32, onlyIfAbsent bool) res := c.hashmap.SetIfAbsent(n) if res == nil { // insert - c.writeBuffer.Insert(newAddTask(n)) + c.writeBuffer.Push(newAddTask(n)) return true } c.stats.IncRejectedSets() @@ -251,10 +255,10 @@ func (c *Cache[K, V]) set(key K, value V, expiration uint32, onlyIfAbsent bool) if evicted != nil { // update evicted.Die() - c.writeBuffer.Insert(newUpdateTask(n, evicted)) + c.writeBuffer.Push(newUpdateTask(n, evicted)) } else { // insert - c.writeBuffer.Insert(newAddTask(n)) + c.writeBuffer.Push(newAddTask(n)) } return true @@ -272,7 +276,7 @@ func (c *Cache[K, V]) deleteNode(n node.Node[K, V]) { func (c *Cache[K, V]) afterDelete(deleted node.Node[K, V]) { if deleted != nil { deleted.Die() - c.writeBuffer.Insert(newDeleteTask(deleted)) + c.writeBuffer.Push(newDeleteTask(deleted)) } } @@ -327,7 +331,7 @@ func (c *Cache[K, V]) process() { deleted := make([]node.Node[K, V], 0, bufferCapacity) i := 0 for { - t := c.writeBuffer.Remove() + t := c.writeBuffer.Pop() if t.isClear() || t.isClose() { buffer = clearBuffer(buffer) @@ -425,7 +429,7 @@ func (c *Cache[K, V]) clear(t task[K, V]) { c.readBuffers[i].Clear() } - c.writeBuffer.Insert(t) + c.writeBuffer.Push(t) <-c.doneClear c.stats.Clear() diff --git a/internal/queue/growable.go b/internal/queue/growable.go new file mode 100644 index 0000000..a3a5b03 --- /dev/null +++ b/internal/queue/growable.go @@ -0,0 +1,123 @@ +// Copyright (c) 2024 
Alexey Mayshev. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package queue
+
+import (
+	"sync"
+
+	"github.com/maypok86/otter/internal/xmath"
+)
+
+// Growable is a bounded, blocking FIFO queue backed by a ring buffer that
+// starts at minCap slots and doubles on demand until it holds maxCap items.
+//
+// Every field is guarded by mutex. Push blocks while the queue holds maxCap
+// items and Pop blocks while the queue is empty.
+//
+// A Growable must be created with NewGrowable and must not be copied: it
+// embeds a sync.Mutex and two sync.Cond values that point at that mutex.
+type Growable[T any] struct {
+	mutex    sync.Mutex
+	notEmpty sync.Cond // signalled by push after an item has been stored
+	notFull  sync.Cond // signalled by pop after a slot has been released
+	buf      []T       // ring buffer; next() requires len(buf) to be a power of two
+	head     int       // index of the oldest queued item
+	tail     int       // index of the next free slot
+	count    int       // number of queued items
+	minCap   int       // initial length of buf
+	maxCap   int       // maximum number of queued items
+}
+
+// NewGrowable creates an empty queue whose buffer starts with minCap slots
+// and which accepts at most maxCap items before Push blocks.
+//
+// Both capacities are passed through xmath.RoundUpPowerOf2. Afterwards a zero
+// capacity is raised to 1 and minCap is clamped so that it never exceeds
+// maxCap.
+func NewGrowable[T any](minCap, maxCap uint32) *Growable[T] {
+	minCap = xmath.RoundUpPowerOf2(minCap)
+	maxCap = xmath.RoundUpPowerOf2(maxCap)
+
+	// RoundUpPowerOf2 is defined outside this file, so do not rely on how it
+	// treats 0. A zero maxCap would make every Push block forever, and a
+	// zero-length buffer would make push index out of range because resize
+	// doubles 0 to 0.
+	if maxCap == 0 {
+		maxCap = 1
+	}
+	if minCap == 0 {
+		minCap = 1
+	}
+	// Never allocate more slots than the queue is allowed to fill.
+	if minCap > maxCap {
+		minCap = maxCap
+	}
+
+	g := &Growable[T]{
+		buf:    make([]T, minCap),
+		minCap: int(minCap),
+		maxCap: int(maxCap),
+	}
+
+	// Bind both conditions to the queue mutex in place. Assigning
+	// *sync.NewCond(...) would copy a sync.Cond by value, which the go vet
+	// copylocks check reports.
+	g.notEmpty.L = &g.mutex
+	g.notFull.L = &g.mutex
+
+	return g
+}
+
+// Push appends item to the tail of the queue.
+// It blocks while the queue already holds maxCap items.
+func (g *Growable[T]) Push(item T) {
+	g.mutex.Lock()
+	// Wait in a loop: the condition must be rechecked after every wake-up.
+	for g.count == g.maxCap {
+		g.notFull.Wait()
+	}
+	g.push(item)
+	g.mutex.Unlock()
+}
+
+// push stores item at tail, growing the buffer first if it is full, and
+// wakes one goroutine waiting in Pop. The caller must hold g.mutex and must
+// have checked that count < maxCap.
+func (g *Growable[T]) push(item T) {
+	g.grow()
+	g.buf[g.tail] = item
+	g.tail = g.next(g.tail)
+	g.count++
+	g.notEmpty.Signal()
+}
+
+// Pop removes and returns the item at the head of the queue.
+// It blocks while the queue is empty.
+func (g *Growable[T]) Pop() T {
+	g.mutex.Lock()
+	for g.count == 0 {
+		g.notEmpty.Wait()
+	}
+	item := g.pop()
+	g.mutex.Unlock()
+	return item
+}
+
+// pop removes the head item and wakes one goroutine waiting in Push.
+// The caller must hold g.mutex and must have checked that count > 0.
+func (g *Growable[T]) pop() T {
+	var zero T
+
+	item := g.buf[g.head]
+	// Overwrite the vacated slot so the buffer no longer references the item.
+	g.buf[g.head] = zero
+
+	g.head = g.next(g.head)
+	g.count--
+
+	g.notFull.Signal()
+
+	return item
+}
+
+// Clear discards every queued item. The buffer keeps its current length;
+// each discarded item releases one slot and signals notFull once.
+func (g *Growable[T]) Clear() {
+	g.mutex.Lock()
+	for g.count > 0 {
+		g.pop()
+	}
+	g.mutex.Unlock()
+}
+
+// grow doubles the buffer when every slot is occupied and does nothing
+// otherwise. The caller must hold g.mutex.
+func (g *Growable[T]) grow() {
+	if g.count != len(g.buf) {
+		return
+	}
+	g.resize()
+}
+
+// resize moves the queued items, oldest first, into a new buffer of twice
+// the current item count and resets head to 0 and tail to count.
+// The caller must hold g.mutex.
+func (g *Growable[T]) resize() {
+	newBuf := make([]T, g.count<<1)
+	if g.tail > g.head {
+		// Items are contiguous in [head, tail).
+		copy(newBuf, g.buf[g.head:g.tail])
+	} else {
+		// Items wrap around the end of the buffer; grow only calls resize on
+		// a full buffer, where head == tail, so this is the branch it takes.
+		n := copy(newBuf, g.buf[g.head:])
+		copy(newBuf[n:], g.buf[:g.tail])
+	}
+
+	g.head = 0
+	g.tail = g.count
+	g.buf = newBuf
+}
+
+// next returns the ring index that follows i. The bit mask is only a valid
+// modulo because len(g.buf) is a power of two.
+func (g *Growable[T]) next(i int) int {
+	return (i + 1) & (len(g.buf) - 1)
+}
diff --git a/internal/queue/growable_test.go b/internal/queue/growable_test.go
new file mode 100644
index 0000000..04e1304
--- /dev/null
+++ b/internal/queue/growable_test.go
@@ -0,0 +1,201 @@
+// Copyright (c) 2024 Alexey Mayshev. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package queue
+
+import (
+	"fmt"
+	"runtime"
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+)
+
+// minCapacity is the initial buffer size used by the tests that let the
+// queue grow.
+const minCapacity = 4
+
+// TestGrowable_PushPop checks FIFO order for more items than the initial
+// buffer holds, so the buffer has to grow while it is being filled.
+func TestGrowable_PushPop(t *testing.T) {
+	const capacity = 10
+	g := NewGrowable[int](minCapacity, capacity)
+	for i := 0; i < capacity; i++ {
+		g.Push(i)
+	}
+	for i := 0; i < capacity; i++ {
+		if got := g.Pop(); got != i {
+			t.Fatalf("got %v, want %d", got, i)
+		}
+	}
+}
+
+// TestGrowable_ClearAndPopBlocksOnEmpty checks that Clear empties the queue:
+// a Pop issued afterwards must not return until the next Push.
+func TestGrowable_ClearAndPopBlocksOnEmpty(t *testing.T) {
+	const capacity = 10
+	g := NewGrowable[int](minCapacity, capacity)
+	for i := 0; i < capacity; i++ {
+		g.Push(i)
+	}
+
+	g.Clear()
+
+	cdone := make(chan bool)
+	flag := int32(0)
+	go func() {
+		g.Pop()
+		// flag is still 0 if Pop returned before the main goroutine pushed.
+		if atomic.LoadInt32(&flag) == 0 {
+			t.Error("pop on empty queue didn't wait for push")
+		}
+		cdone <- true
+	}()
+	time.Sleep(50 * time.Millisecond)
+	atomic.StoreInt32(&flag, 1)
+	g.Push(-1)
+	<-cdone
+}
+
+// TestGrowable_PushBlocksOnFull checks that Push on a full queue does not
+// return until an item has been popped.
+func TestGrowable_PushBlocksOnFull(t *testing.T) {
+	g := NewGrowable[string](1, 1)
+	g.Push("foo")
+
+	done := make(chan struct{})
+	flag := int32(0)
+	go func() {
+		g.Push("bar")
+		// flag is still 0 if Push returned before the main goroutine popped.
+		if atomic.LoadInt32(&flag) == 0 {
+			t.Error("push on full queue didn't wait for pop")
+		}
+		done <- struct{}{}
+	}()
+
+	time.Sleep(50 * time.Millisecond)
+	atomic.StoreInt32(&flag, 1)
+	if got := g.Pop(); got != "foo" {
+		t.Fatalf("got %v, want foo", got)
+	}
+	<-done
+}
+
+// TestGrowable_PopBlocksOnEmpty checks that Pop on an empty queue does not
+// return until an item has been pushed.
+func TestGrowable_PopBlocksOnEmpty(t *testing.T) {
+	g := NewGrowable[string](2, 2)
+
+	done := make(chan struct{})
+	flag := int32(0)
+	go func() {
+		g.Pop()
+		// flag is still 0 if Pop returned before the main goroutine pushed.
+		if atomic.LoadInt32(&flag) == 0 {
+			t.Error("pop on empty queue didn't wait for push")
+		}
+		done <- struct{}{}
+	}()
+
+	time.Sleep(50 * time.Millisecond)
+	atomic.StoreInt32(&flag, 1)
+	g.Push("foobar")
+	<-done
+}
+
+// testGrowableConcurrent starts `goroutines` producers and the same number
+// of consumers on one queue. Producer n pushes the values n, n+goroutines,
+// ... below ops; consumer n pops the same number of items and reports their
+// sum. The total must equal 0 + 1 + ... + (ops-1).
+//
+// It changes GOMAXPROCS and leaves restoring it to the caller.
+func testGrowableConcurrent(t *testing.T, parallelism, ops, goroutines int) {
+	t.Helper()
+	runtime.GOMAXPROCS(parallelism)
+
+	g := NewGrowable[int](minCapacity, uint32(goroutines))
+	// wg is a start gate: every worker waits on it until wg.Done below.
+	var wg sync.WaitGroup
+	wg.Add(1)
+	csum := make(chan int, goroutines)
+
+	// run producers.
+	for i := 0; i < goroutines; i++ {
+		go func(n int) {
+			wg.Wait()
+			for j := n; j < ops; j += goroutines {
+				g.Push(j)
+			}
+		}(i)
+	}
+
+	// run consumers.
+	for i := 0; i < goroutines; i++ {
+		go func(n int) {
+			wg.Wait()
+			sum := 0
+			for j := n; j < ops; j += goroutines {
+				item := g.Pop()
+				sum += item
+			}
+			csum <- sum
+		}(i)
+	}
+	wg.Done()
+	// Wait for all the sums from consumers.
+	sum := 0
+	for i := 0; i < goroutines; i++ {
+		s := <-csum
+		sum += s
+	}
+
+	expectedSum := ops * (ops - 1) / 2
+	if sum != expectedSum {
+		t.Errorf("calculated sum is wrong. got %d, want %d", sum, expectedSum)
+	}
+}
+
+// TestGrowable_Concurrent runs testGrowableConcurrent for several
+// combinations of GOMAXPROCS, operation count and goroutine count, and
+// restores the original GOMAXPROCS when it returns. The workload is ten
+// times smaller under -short.
+func TestGrowable_Concurrent(t *testing.T) {
+	defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(-1))
+
+	n := 100
+	if testing.Short() {
+		n = 10
+	}
+
+	tests := []struct {
+		parallelism int
+		ops         int
+		goroutines  int
+	}{
+		{
+			parallelism: 1,
+			ops:         100 * n,
+			goroutines:  n,
+		},
+		{
+			parallelism: 1,
+			ops:         1000 * n,
+			goroutines:  10 * n,
+		},
+		{
+			parallelism: 4,
+			ops:         100 * n,
+			goroutines:  n,
+		},
+		{
+			parallelism: 4,
+			ops:         1000 * n,
+			goroutines:  10 * n,
+		},
+		{
+			parallelism: 8,
+			ops:         100 * n,
+			goroutines:  n,
+		},
+		{
+			parallelism: 8,
+			ops:         1000 * n,
+			goroutines:  10 * n,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(fmt.Sprintf("testConcurrent-%d-%d", tt.parallelism, tt.ops), func(t *testing.T) {
+			testGrowableConcurrent(t, tt.parallelism, tt.ops, tt.goroutines)
+		})
+	}
+}
diff --git a/internal/queue/mpsc.go b/internal/queue/mpsc.go
deleted file mode 100644
index 3f36316..0000000
--- a/internal/queue/mpsc.go
+++ /dev/null
@@ -1,144 +0,0 @@
-// Copyright (c) 2023 Alexey Mayshev. All rights reserved.
-// Copyright (c) 2023 Andrey Pechkurov
-//
-// Copyright notice.
This code is a fork of xsync.MPMCQueueOf from this file with many changes: -// https://github.com/puzpuzpuz/xsync/blob/main/mpmcqueueof.go -// -// Use of this source code is governed by a MIT license that can be found -// at https://github.com/puzpuzpuz/xsync/blob/main/LICENSE - -package queue - -import ( - "runtime" - "sync/atomic" - "unsafe" - - "github.com/maypok86/otter/internal/xruntime" -) - -const ( - maxRetries = 16 -) - -func zeroValue[T any]() T { - var zero T - return zero -} - -// A MPSC is a bounded multi-producer single-consumer concurrent queue. -// -// MPSC instances must be created with NewMPSC function. -// A MPSC must not be copied after first use. -// -// Based on the data structure from the following C++ library: -// https://github.com/rigtorp/MPMCQueue -type MPSC[T any] struct { - capacity uint64 - sleep chan struct{} - head atomic.Uint64 - headPadding [xruntime.CacheLineSize - unsafe.Sizeof(atomic.Uint64{})]byte - tail uint64 - tailPadding [xruntime.CacheLineSize - 8]byte - isSleep atomic.Uint64 - sleepPadding [xruntime.CacheLineSize - unsafe.Sizeof(atomic.Uint64{})]byte - slots []slot[T] -} - -type slot[T any] struct { - // atomic.Uint64 is used here to get proper 8 byte alignment on 32-bit archs. - turn atomic.Uint64 - item T -} - -// NewMPSC creates a new MPSC instance with the given capacity. -func NewMPSC[T any](capacity int) *MPSC[T] { - return &MPSC[T]{ - sleep: make(chan struct{}), - capacity: uint64(capacity), - slots: make([]slot[T], capacity), - } -} - -// Insert inserts the given item into the queue. -// Blocks, if the queue is full. 
-func (q *MPSC[T]) Insert(item T) { - head := q.head.Add(1) - 1 - q.wakeUpConsumer() - - slot := &q.slots[q.idx(head)] - turn := q.turn(head) * 2 - retries := 0 - for slot.turn.Load() != turn { - if retries == maxRetries { - q.wakeUpConsumer() - retries = 0 - continue - } - retries++ - runtime.Gosched() - } - - slot.item = item - slot.turn.Store(turn + 1) -} - -// Remove retrieves and removes the item from the head of the queue. -// Blocks, if the queue is empty. -func (q *MPSC[T]) Remove() T { - tail := q.tail - slot := &q.slots[q.idx(tail)] - turn := 2*q.turn(tail) + 1 - retries := 0 - for slot.turn.Load() != turn { - if retries == maxRetries { - q.sleepConsumer() - retries = 0 - continue - } - retries++ - runtime.Gosched() - } - item := slot.item - slot.item = zeroValue[T]() - slot.turn.Store(turn + 1) - q.tail++ - return item -} - -// Clear clears the queue. -func (q *MPSC[T]) Clear() { - for !q.isEmpty() { - _ = q.Remove() - } -} - -// Capacity returns capacity of the queue. -func (q *MPSC[T]) Capacity() int { - return int(q.capacity) -} - -func (q *MPSC[T]) wakeUpConsumer() { - if q.isSleep.Load() == 1 && q.isSleep.CompareAndSwap(1, 0) { - // if the consumer is asleep, we'll wake him up. - q.sleep <- struct{}{} - } -} - -func (q *MPSC[T]) sleepConsumer() { - // if the queue's been empty for too long, we fall asleep. - q.isSleep.Store(1) - <-q.sleep -} - -func (q *MPSC[T]) isEmpty() bool { - return q.tail == q.head.Load() -} - -func (q *MPSC[T]) idx(i uint64) uint64 { - return i % q.capacity -} - -func (q *MPSC[T]) turn(i uint64) uint64 { - return i / q.capacity -} diff --git a/internal/queue/mpsc_test.go b/internal/queue/mpsc_test.go deleted file mode 100644 index 86599f7..0000000 --- a/internal/queue/mpsc_test.go +++ /dev/null @@ -1,141 +0,0 @@ -// Copyright (c) 2023 Alexey Mayshev. All rights reserved. -// Copyright (c) 2023 Andrey Pechkurov -// -// Copyright notice. 
This code is a fork of tests for xsync.MPMCQueueOf from this file with some changes: -// https://github.com/puzpuzpuz/xsync/blob/main/mpmcqueueof_test.go -// -// Use of this source code is governed by a MIT license that can be found -// at https://github.com/puzpuzpuz/xsync/blob/main/LICENSE - -package queue - -import ( - "fmt" - "runtime" - "sync/atomic" - "testing" - "time" -) - -func TestMPSC_Capacity(t *testing.T) { - const capacity = 10 - q := NewMPSC[int](capacity) - gotCapacity := q.Capacity() - if capacity != gotCapacity { - t.Fatalf("got %d, want %d", gotCapacity, capacity) - } -} - -func TestMPSC_InsertRemove(t *testing.T) { - const capacity = 10 - q := NewMPSC[int](capacity) - for i := 0; i < capacity; i++ { - q.Insert(i) - } - for i := 0; i < capacity; i++ { - if got := q.Remove(); got != i { - t.Fatalf("got %v, want %d", got, i) - } - } -} - -func TestMPSC_InsertBlocksOnFull(t *testing.T) { - q := NewMPSC[string](1) - q.Insert("foo") - - done := make(chan struct{}) - flag := int32(0) - go func() { - q.Insert("bar") - if atomic.LoadInt32(&flag) == 0 { - t.Error("insert on full queue didn't wait for remove") - } - done <- struct{}{} - }() - - time.Sleep(50 * time.Millisecond) - atomic.StoreInt32(&flag, 1) - if got := q.Remove(); got != "foo" { - t.Fatalf("got %v, want foo", got) - } - <-done -} - -func TestMPSC_RemoveBlocksOnEmpty(t *testing.T) { - q := NewMPSC[string](2) - - done := make(chan struct{}) - flag := int32(0) - go func() { - q.Remove() - if atomic.LoadInt32(&flag) == 0 { - t.Error("remove on empty queue didn't wait for insert") - } - done <- struct{}{} - }() - - time.Sleep(50 * time.Millisecond) - atomic.StoreInt32(&flag, 1) - q.Insert("foobar") - <-done -} - -func testMPSCConcurrent(t *testing.T, parallelism, ops, goroutines int) { - t.Helper() - runtime.GOMAXPROCS(parallelism) - - q := NewMPSC[int](goroutines) - - // run producers. 
- for i := 0; i < goroutines; i++ { - go func(n int) { - for j := n; j < ops; j += goroutines { - q.Insert(j) - } - }(i) - } - - // run consumer. - sum := 0 - for j := 0; j < ops; j++ { - item := q.Remove() - sum += item - } - - expectedSum := ops * (ops - 1) / 2 - if sum != expectedSum { - t.Errorf("calculated sum is wrong. got %d, want %d", sum, expectedSum) - } -} - -func TestMPSC_Concurrent(t *testing.T) { - defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(-1)) - - tests := []struct { - parallelism int - ops int - goroutines int - }{ - { - parallelism: 1, - ops: 10, - goroutines: 10, - }, - { - parallelism: 2, - ops: 100, - goroutines: 20, - }, - { - parallelism: 4, - ops: 1000, - goroutines: 40, - }, - } - - for _, tt := range tests { - t.Run(fmt.Sprintf("testConcurrent-%d", tt.parallelism), func(t *testing.T) { - testMPSCConcurrent(t, tt.parallelism, tt.ops, tt.goroutines) - }) - } -} diff --git a/internal/queue/queue_bench_test.go b/internal/queue/queue_bench_test.go new file mode 100644 index 0000000..f794f16 --- /dev/null +++ b/internal/queue/queue_bench_test.go @@ -0,0 +1,88 @@ +// Copyright (c) 2024 Alexey Mayshev. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package queue
+
+import (
+	"runtime"
+	"sync/atomic"
+	"testing"
+)
+
+// benchmarkProdCons drives a queue through the supplied push and pop
+// functions with GOMAXPROCS/2 (at least one) producer/consumer pairs.
+//
+// The b.N operations are split into batches of queueSize pushes; producers
+// claim batches by atomically decrementing a shared counter. Before every
+// push a producer runs localWork iterations of arithmetic, and a consumer
+// does the same after every pop. Each producer pushes the value 1 for
+// regular items and a single 0 when no batches are left; a consumer stops
+// at the first 0 it pops.
+func benchmarkProdCons(b *testing.B, push func(int), pop func() int, queueSize, localWork int) {
+	b.Helper()
+
+	callsPerSched := queueSize
+	procs := runtime.GOMAXPROCS(-1) / 2
+	if procs == 0 {
+		procs = 1
+	}
+	// Number of batches still to be claimed by the producers.
+	N := int32(b.N / callsPerSched)
+	c := make(chan bool, 2*procs)
+	for p := 0; p < procs; p++ {
+		go func() {
+			foo := 0
+			for atomic.AddInt32(&N, -1) >= 0 {
+				for g := 0; g < callsPerSched; g++ {
+					for i := 0; i < localWork; i++ {
+						foo *= 2
+						foo /= 2
+					}
+					push(1)
+				}
+			}
+			// 0 is the stop sentinel for one consumer.
+			push(0)
+			// NOTE(review): sending foo == 42 presumably keeps the local
+			// work loop from being optimized away - confirm.
+			c <- foo == 42
+		}()
+		go func() {
+			foo := 0
+			for {
+				v := pop()
+				if v == 0 {
+					break
+				}
+				for i := 0; i < localWork; i++ {
+					foo *= 2
+					foo /= 2
+				}
+			}
+			c <- foo == 42
+		}()
+	}
+	// Every producer and every consumer reports exactly once.
+	for p := 0; p < procs; p++ {
+		<-c
+		<-c
+	}
+}
+
+// BenchmarkGrowableProdConsWork100 measures Growable with 100 iterations of
+// local work per operation. The queue is created at its maximum size
+// (128 * 16), so the buffer never has to grow during the run.
+func BenchmarkGrowableProdConsWork100(b *testing.B) {
+	length := 128 * 16
+	g := NewGrowable[int](uint32(length), uint32(length))
+	b.ResetTimer()
+	benchmarkProdCons(b, func(i int) {
+		g.Push(i)
+	}, func() int {
+		return g.Pop()
+	}, length, 100)
+}
+
+// BenchmarkChanProdConsWork100 is the baseline for the benchmark above: the
+// same workload on a buffered channel of the same capacity.
+func BenchmarkChanProdConsWork100(b *testing.B) {
+	length := 128 * 16
+	c := make(chan int, length)
+	// Exclude the setup from the measurement, as the Growable benchmark does.
+	b.ResetTimer()
+	benchmarkProdCons(b, func(i int) {
+		c <- i
+	}, func() int {
+		return <-c
+	}, length, 100)
+}