diff --git a/.golangci.yml b/.golangci.yml index a90195f..ed7938b 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -4,9 +4,11 @@ run: build-tags: - integration modules-download-mode: readonly - go: '1.17' + go: '1.19' output: - format: tab:lint.txt + formats: + - format: tab + path: lint.txt print-issued-lines: false uniq-by-line: false sort-results: true @@ -17,7 +19,7 @@ linters: - bidichk - bodyclose - contextcheck - - copyloopvar + #- copyloopvar - durationcheck - errcheck - errname diff --git a/cache.go b/cache.go index 3dbfbc4..6822a9f 100644 --- a/cache.go +++ b/cache.go @@ -49,7 +49,7 @@ func init() { } type evictionPolicy[K comparable, V any] interface { - Read(nodes []node.Node[K, V]) + Read(n node.Node[K, V]) Add(n node.Node[K, V], nowNanos int64) Delete(n node.Node[K, V]) MaxAvailableWeight() uint64 @@ -73,7 +73,7 @@ type Cache[K comparable, V any] struct { stats statsRecorder logger Logger clock *clock.Clock - stripedBuffer []*lossy.Buffer[K, V] + stripedBuffer *lossy.Striped[K, V] writeBuffer *queue.Growable[task[K, V]] evictionMutex sync.Mutex closeOnce sync.Once @@ -81,7 +81,6 @@ type Cache[K comparable, V any] struct { doneClose chan struct{} weigher func(key K, value V) uint32 onDeletion func(e DeletionEvent[K, V]) - mask uint32 ttl time.Duration withExpiration bool withEviction bool @@ -99,12 +98,9 @@ func newCache[K comparable, V any](b *Builder[K, V]) *Cache[K, V] { maximum := b.getMaximum() withEviction := maximum != nil - var stripedBuffer []*lossy.Buffer[K, V] + var stripedBuffer *lossy.Striped[K, V] if withEviction { - stripedBuffer = make([]*lossy.Buffer[K, V], 0, maxStripedBufferSize) - for i := 0; i < maxStripedBufferSize; i++ { - stripedBuffer = append(stripedBuffer, lossy.New[K, V](nodeManager)) - } + stripedBuffer = lossy.NewStriped(maxStripedBufferSize, nodeManager) } var hashmap *hashtable.Map[K, V] @@ -122,10 +118,8 @@ func newCache[K comparable, V any](b *Builder[K, V]) *Cache[K, V] { stripedBuffer: stripedBuffer, doneClear: make(chan struct{}), doneClose: make(chan struct{}, 1), - //nolint:gosec // there will never be an overflow - mask: uint32(maxStripedBufferSize - 1), - weigher: b.getWeigher(), - onDeletion: b.onDeletion, + weigher: b.getWeigher(), + onDeletion: b.onDeletion, } cache.withEviction = withEviction @@ -166,10 +160,6 @@ func newCache[K comparable, V any](b *Builder[K, V]) *Cache[K, V] { return cache } -func (c *Cache[K, V]) getReadBufferIdx() int { - return int(xruntime.Fastrand() & c.mask) -} - func (c *Cache[K, V]) getExpiration(duration time.Duration) int64 { return c.clock.Offset() + duration.Nanoseconds() } @@ -234,14 +224,10 @@ func (c *Cache[K, V]) afterGet(got node.Node[K, V]) { return } - idx := c.getReadBufferIdx() - pb := c.stripedBuffer[idx].Add(got) - if pb != nil { - c.evictionMutex.Lock() - c.policy.Read(pb.Returned) + result := c.stripedBuffer.Add(got) + if result == lossy.Full && c.evictionMutex.TryLock() { + c.stripedBuffer.DrainTo(c.policy.Read) c.evictionMutex.Unlock() - - c.stripedBuffer[idx].Free() } } @@ -521,9 +507,7 @@ func (c *Cache[K, V]) clear(t task[K, V]) { } if c.withEviction { - for i := 0; i < len(c.stripedBuffer); i++ { - c.stripedBuffer[i].Clear() - } + c.stripedBuffer.Clear() } c.writeBuffer.Push(t) diff --git a/cache_test.go b/cache_test.go index a045c43..e572b50 100644 --- a/cache_test.go +++ b/cache_test.go @@ -105,7 +105,7 @@ func TestCache_PinnedWeight(t *testing.T) { } return 1 }). - WithTTL(3 * time.Second). + WithTTL(2 * time.Second). OnDeletion(func(e DeletionEvent[int, int]) { mutex.Lock() m[e.Cause]++ @@ -127,9 +127,6 @@ func TestCache_PinnedWeight(t *testing.T) { } for i := size; i < 2*size; i++ { c.Set(i, i) - } - time.Sleep(time.Second) - for i := size; i < 2*size; i++ { if !c.Has(i) { t.Fatalf("the key must exist: %d", i) } @@ -139,7 +136,7 @@ func TestCache_PinnedWeight(t *testing.T) { t.Fatalf("the key must exist: %d", pinned) } - time.Sleep(3 * time.Second) + time.Sleep(4 * time.Second) if c.Has(pinned) { t.Fatalf("the key must not exist: %d", pinned) diff --git a/internal/eviction/disabled.go b/internal/eviction/disabled.go index 61fe20b..5dc5e93 100644 --- a/internal/eviction/disabled.go +++ b/internal/eviction/disabled.go @@ -12,7 +12,7 @@ func NewDisabled[K comparable, V any]() Disabled[K, V] { return Disabled[K, V]{} } -func (d Disabled[K, V]) Read(nodes []node.Node[K, V]) { +func (d Disabled[K, V]) Read(nodes node.Node[K, V]) { panic("not implemented") } diff --git a/internal/eviction/s3fifo/policy.go b/internal/eviction/s3fifo/policy.go index 1a2eea5..a4a6e47 100644 --- a/internal/eviction/s3fifo/policy.go +++ b/internal/eviction/s3fifo/policy.go @@ -48,10 +48,8 @@ func NewPolicy[K comparable, V any](maxWeight uint64, evictNode func(node.Node[K } // Read updates the eviction policy based on node accesses. -func (p *Policy[K, V]) Read(nodes []node.Node[K, V]) { - for _, n := range nodes { - n.IncrementFrequency() - } +func (p *Policy[K, V]) Read(n node.Node[K, V]) { + n.IncrementFrequency() } // Add adds node to the eviction policy. diff --git a/internal/eviction/s3fifo/policy_test.go b/internal/eviction/s3fifo/policy_test.go index c5ebf6f..62beadd 100644 --- a/internal/eviction/s3fifo/policy_test.go +++ b/internal/eviction/s3fifo/policy_test.go @@ -26,6 +26,12 @@ func newNode(k int) node.Node[int, int] { return n } +func read[K comparable, V any](p *Policy[K, V], nodes []node.Node[K, V]) { + for _, n := range nodes { + p.Read(n) + } +} + func TestPolicy_ReadAndWrite(t *testing.T) { n := newNode(2) p := NewPolicy[int, int](10, func(n node.Node[int, int]) { @@ -58,9 +64,9 @@ func TestPolicy_OneHitWonders(t *testing.T) { p.Add(n, 1) } - p.Read(oneHitWonders) + read(p, oneHitWonders) for i := 0; i < 3; i++ { - p.Read(popular) + read(p, popular) } newNodes := make([]node.Node[int, int], 0, 11) @@ -117,7 +123,8 @@ func TestPolicy_Update(t *testing.T) { p.Delete(n) p.Add(n1, 1) - p.Read([]node.Node[int, int]{n1, n1}) + p.Read(n1) + p.Read(n1) n2 := m.Create(2, 1, 0, 92) collect = true diff --git a/internal/lossy/buffer.go b/internal/lossy/buffer.go deleted file mode 100644 index 5a81df6..0000000 --- a/internal/lossy/buffer.go +++ /dev/null @@ -1,143 +0,0 @@ -// Copyright (c) 2023 Alexey Mayshev. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package lossy - -import ( - "runtime" - "sync/atomic" - "unsafe" - - "github.com/maypok86/otter/v2/internal/generated/node" - "github.com/maypok86/otter/v2/internal/xruntime" -) - -const ( - // The maximum number of elements per buffer. - capacity = 16 - mask = uint64(capacity - 1) -) - -// PolicyBuffers is the set of buffers returned by the lossy buffer. -type PolicyBuffers[K comparable, V any] struct { - Returned []node.Node[K, V] -} - -// Buffer is a circular ring buffer stores the elements being transferred by the producers to the consumer. -// The monotonically increasing count of reads and writes allow indexing sequentially to the next -// element location based upon a power-of-two sizing. -// -// The producers race to read the counts, check if there is available capacity, and if so then try -// once to CAS to the next write count. If the increment is successful then the producer lazily -// publishes the element. The producer does not retry or block when unsuccessful due to a failed -// CAS or the buffer being full. -// -// The consumer reads the counts and takes the available elements. The clearing of the elements -// and the next read count are lazily set. -// -// This implementation is striped to further increase concurrency. -type Buffer[K comparable, V any] struct { - head atomic.Uint64 - headPadding [xruntime.CacheLineSize - unsafe.Sizeof(atomic.Uint64{})]byte - tail atomic.Uint64 - tailPadding [xruntime.CacheLineSize - unsafe.Sizeof(atomic.Uint64{})]byte - nodeManager *node.Manager[K, V] - returned unsafe.Pointer - returnedPadding [xruntime.CacheLineSize - 2*8]byte - policyBuffers unsafe.Pointer - returnedSlicePadding [xruntime.CacheLineSize - 8]byte - buffer [capacity]unsafe.Pointer -} - -// New creates a new lossy Buffer. -func New[K comparable, V any](nodeManager *node.Manager[K, V]) *Buffer[K, V] { - pb := &PolicyBuffers[K, V]{ - Returned: make([]node.Node[K, V], 0, capacity), - } - b := &Buffer[K, V]{ - nodeManager: nodeManager, - policyBuffers: unsafe.Pointer(pb), - } - b.returned = b.policyBuffers - return b -} - -// Add lazily publishes the item to the consumer. -// -// item may be lost due to contention. -func (b *Buffer[K, V]) Add(n node.Node[K, V]) *PolicyBuffers[K, V] { - head := b.head.Load() - tail := b.tail.Load() - size := tail - head - if size >= capacity { - // full buffer - return nil - } - if b.tail.CompareAndSwap(tail, tail+1) { - // success - //nolint:gosec // there will never be an overflow - index := int(tail & mask) - atomic.StorePointer(&b.buffer[index], n.AsPointer()) - if size == capacity-1 { - // try return new buffer - if !atomic.CompareAndSwapPointer(&b.returned, b.policyBuffers, nil) { - // somebody already get buffer - return nil - } - - pb := (*PolicyBuffers[K, V])(b.policyBuffers) - for i := 0; i < capacity; i++ { - //nolint:gosec // there will never be an overflow - index := int(head & mask) - v := atomic.LoadPointer(&b.buffer[index]) - if v != nil { - // published - pb.Returned = append(pb.Returned, b.nodeManager.FromPointer(v)) - // release - atomic.StorePointer(&b.buffer[index], nil) - } - head++ - } - - b.head.Store(head) - return pb - } - } - - // failed - return nil -} - -// Free returns the processed buffer back and also clears it. -func (b *Buffer[K, V]) Free() { - pb := (*PolicyBuffers[K, V])(b.policyBuffers) - for i := 0; i < len(pb.Returned); i++ { - pb.Returned[i] = nil - } - pb.Returned = pb.Returned[:0] - atomic.StorePointer(&b.returned, b.policyBuffers) -} - -// Clear clears the lossy Buffer and returns it to the default state. -func (b *Buffer[K, V]) Clear() { - for !atomic.CompareAndSwapPointer(&b.returned, b.policyBuffers, nil) { - runtime.Gosched() - } - for i := 0; i < capacity; i++ { - atomic.StorePointer(&b.buffer[i], nil) - } - b.Free() - b.tail.Store(0) - b.head.Store(0) -} diff --git a/internal/lossy/ring.go b/internal/lossy/ring.go new file mode 100644 index 0000000..3a8870c --- /dev/null +++ b/internal/lossy/ring.go @@ -0,0 +1,124 @@ +// Copyright (c) 2024 Alexey Mayshev. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// This is a port of lossy buffers from Caffeine. +// https://github.com/ben-manes/caffeine/blob/master/caffeine/src/main/java/com/github/benmanes/caffeine/cache/BoundedBuffer.java + +package lossy + +import ( + "sync/atomic" + "unsafe" + + "github.com/maypok86/otter/v2/internal/generated/node" + "github.com/maypok86/otter/v2/internal/xruntime" +) + +// Status is the result of adding a node to the buffer. +type Status int8 + +const ( + // Success means that the node was added. + Success Status = 0 + // Failed means that the CAS failed. + Failed Status = -1 + // Full means that the buffer is full. + Full Status = 1 +) + +const ( + // The maximum number of elements per buffer. + bufferSize = 16 + mask = uint64(bufferSize - 1) +) + +// ring is a circular ring buffer stores the elements being transferred by the producers to the consumer. +// the monotonically increasing count of reads and writes allow indexing sequentially to the next +// element location based upon a power-of-two sizing. +// +// The producers race to read the counts, check if there is available capacity, and if so then try +// once to CAS to the next write count. If the increment is successful then the producer lazily +// publishes the element. The producer does not retry or block when unsuccessful due to a failed +// CAS or the buffer being full. +// +// The consumer reads the counts and takes the available elements. The clearing of the elements +// and the next read count are lazily set. +// +// This implementation is striped to further increase concurrency by rehashing and dynamically +// adding new buffers when contention is detected, up to an internal maximum. When rehashing in +// order to discover an available buffer, the producer may retry adding its element to determine +// whether it found a satisfactory buffer or if resizing is necessary. +type ring[K comparable, V any] struct { + head atomic.Uint64 + _ [xruntime.CacheLineSize - unsafe.Sizeof(atomic.Uint64{})]byte + tail atomic.Uint64 + _ [xruntime.CacheLineSize - unsafe.Sizeof(atomic.Uint64{})]byte + nodeManager *node.Manager[K, V] + buffer [bufferSize]unsafe.Pointer // node.Node[K, V] +} + +func newRing[K comparable, V any](nodeManager *node.Manager[K, V], n node.Node[K, V]) *ring[K, V] { + r := &ring[K, V]{ + nodeManager: nodeManager, + } + r.buffer[0] = n.AsPointer() + r.tail.Store(1) + return r +} + +func (r *ring[K, V]) add(n node.Node[K, V]) Status { + head := r.head.Load() + tail := r.tail.Load() + size := tail - head + if size >= bufferSize { + return Full + } + + if r.tail.CompareAndSwap(tail, tail+1) { + atomic.StorePointer(&r.buffer[tail&mask], n.AsPointer()) + return Success + } + return Failed +} + +func (r *ring[K, V]) drainTo(consumer func(n node.Node[K, V])) { + head := r.head.Load() + tail := r.tail.Load() + size := tail - head + if size == 0 { + return + } + + nm := r.nodeManager + for head != tail { + index := head & mask + ptr := atomic.LoadPointer(&r.buffer[index]) + if ptr == nil { + // not published. + break + } + atomic.StorePointer(&r.buffer[index], nil) + consumer(nm.FromPointer(ptr)) + head++ + } + r.head.Store(head) +} + +func (r *ring[K, V]) clear() { + for i := 0; i < bufferSize; i++ { + atomic.StorePointer(&r.buffer[i], nil) + } + r.head.Store(0) + r.tail.Store(0) +} diff --git a/internal/lossy/ring_test.go b/internal/lossy/ring_test.go new file mode 100644 index 0000000..ee90926 --- /dev/null +++ b/internal/lossy/ring_test.go @@ -0,0 +1,186 @@ +// Copyright (c) 2024 Alexey Mayshev. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package lossy + +import ( + "math" + "runtime" + "sync" + "sync/atomic" + "testing" + + "github.com/maypok86/otter/v2/internal/generated/node" + "github.com/maypok86/otter/v2/internal/xmath" + "github.com/maypok86/otter/v2/internal/xruntime" +) + +func TestRing_Add(t *testing.T) { + parallelism := int(xmath.RoundUpPowerOf2(xruntime.Parallelism())) + goroutines := 100 * parallelism + + nm := node.NewManager[int, int](node.Config{ + WithSize: true, + WithExpiration: true, + }) + n := nm.Create(1, 2, 100, 1) + r := &ring[int, int]{ + nodeManager: nm, + } + + var wg sync.WaitGroup + wg.Add(goroutines) + for i := 0; i < goroutines; i++ { + go func() { + defer wg.Done() + + for j := 0; j < 1000; j++ { + res := r.add(n) + if !validStatuses[res] { + t.Errorf("the status must be valid, but got: %v", res) + return + } + runtime.Gosched() + } + }() + } + + tail := r.tail.Load() + if tail <= 0 { + t.Fatalf("the tail must be greater than 0, but got %d", tail) + } + if size := tail - r.head.Load(); tail != size { + t.Fatalf("the tail must be equal to %d, but got %d", size, tail) + } +} + +func TestRing_DrainTo(t *testing.T) { + nm := node.NewManager[int, int](node.Config{ + WithSize: true, + WithExpiration: true, + }) + n := nm.Create(1, 2, 100, 1) + r := &ring[int, int]{ + nodeManager: nm, + } + + for i := 0; i < bufferSize; i++ { + res := r.add(n) + if res != Success && res != Full { + t.Fatalf("the status must be Success or Full, but got: %v", res) + } + } + + reads := uint64(0) + r.drainTo(func(n node.Node[int, int]) { + reads++ + }) + if tail := r.tail.Load(); reads != tail { + t.Fatalf("the tail must be equal to %d, but got %d", reads, tail) + } + if head := r.head.Load(); reads != head { + t.Fatalf("the head must be equal to %d, but got %d", reads, head) + } +} + +func TestRing_AddAndDrain(t *testing.T) { + parallelism := int(xmath.RoundUpPowerOf2(xruntime.Parallelism())) + goroutines := 100 * parallelism + + nm := node.NewManager[int, int](node.Config{ + WithSize: true, + WithExpiration: true, + }) + n := nm.Create(1, 2, 100, 1) + r := &ring[int, int]{ + nodeManager: nm, + } + + var ( + wg sync.WaitGroup + mu sync.Mutex + reads atomic.Uint64 + ) + wg.Add(goroutines) + for i := 0; i < goroutines; i++ { + go func() { + defer wg.Done() + + for j := 0; j < 1000; j++ { + res := r.add(n) + if !validStatuses[res] { + t.Errorf("the status must be valid, but got: %v", res) + return + } + if res == Full && mu.TryLock() { + r.drainTo(func(n node.Node[int, int]) { + reads.Add(1) + }) + mu.Unlock() + } + runtime.Gosched() + } + }() + } + + wg.Wait() + + r.drainTo(func(n node.Node[int, int]) { + reads.Add(1) + }) + + rReads := reads.Load() + if tail := r.tail.Load(); rReads != tail { + t.Fatalf("the tail must be equal to %d, but got %d", rReads, tail) + } + if head := r.head.Load(); rReads != head { + t.Fatalf("the head must be equal to %d, but got %d", rReads, head) + } +} + +func TestRing_Overflow(t *testing.T) { + nm := node.NewManager[int, int](node.Config{ + WithSize: true, + WithExpiration: true, + }) + n := nm.Create(1, 2, 100, 1) + r := &ring[int, int]{ + nodeManager: nm, + } + r.head.Store(math.MaxUint64) + r.tail.Store(math.MaxUint64) + + res := r.add(n) + if res != Success { + t.Fatalf("the status must be Success, but got: %v", res) + } + + var d []node.Node[int, int] + r.drainTo(func(n node.Node[int, int]) { + d = append(d, n) + }) + + for i := 0; i < bufferSize; i++ { + ptr := r.buffer[i] + if ptr != nil { + t.Fatalf("the buffer should contain only nils, but buffer[%d] = %v", i, ptr) + } + } + if tail := r.tail.Load(); tail != 0 { + t.Fatalf("the tail must be equal to 0, but got %d", tail) + } + if head := r.head.Load(); head != 0 { + t.Fatalf("the head must be equal to 0, but got %d", head) + } +} diff --git a/internal/lossy/striped.go b/internal/lossy/striped.go new file mode 100644 index 0000000..ff9e8a5 --- /dev/null +++ b/internal/lossy/striped.go @@ -0,0 +1,197 @@ +// Copyright (c) 2024 Alexey Mayshev. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// This is a port of lossy buffers from Caffeine. +// https://github.com/ben-manes/caffeine/blob/master/caffeine/src/main/java/com/github/benmanes/caffeine/cache/StripedBuffer.java + +package lossy + +import ( + "runtime" + "sync/atomic" + + "github.com/maypok86/otter/v2/internal/generated/node" + "github.com/maypok86/otter/v2/internal/xruntime" +) + +const ( + attempts = 3 +) + +type striped[K comparable, V any] struct { + buffers []atomic.Pointer[ring[K, V]] + len int +} + +// Striped is a multiple-producer / single-consumer buffer that rejects new elements if it is full or +// fails spuriously due to contention. Unlike a queue and stack, a buffer does not guarantee an +// ordering of elements in either FIFO or LIFO order. +type Striped[K comparable, V any] struct { + nodeManager *node.Manager[K, V] + maxLen int + striped atomic.Pointer[striped[K, V]] + busy atomic.Uint32 +} + +func NewStriped[K comparable, V any](maxLen int, nodeManager *node.Manager[K, V]) *Striped[K, V] { + return &Striped[K, V]{ + nodeManager: nodeManager, + maxLen: maxLen, + } +} + +// Add inserts the specified element into this buffer if it is possible to do so immediately without +// violating capacity restrictions. The addition is allowed to fail spuriously if multiple +// goroutines insert concurrently. +func (s *Striped[K, V]) Add(n node.Node[K, V]) Status { + h := xruntime.Fastrand() + + bs := s.striped.Load() + if bs == nil { + return s.expandOrRetry(n, h, true) + } + + //nolint:gosec // len will never overflow uint32 + buffer := bs.buffers[h&uint32(bs.len-1)].Load() + if buffer == nil { + return s.expandOrRetry(n, h, true) + } + + result := buffer.add(n) + if result == Failed { + return s.expandOrRetry(n, h, false) + } + + return result +} + +func (s *Striped[K, V]) expandOrRetry(n node.Node[K, V], h uint32, wasUncontended bool) Status { + result := Failed + // True if last slot nonempty. + collide := true + + for attempt := 0; attempt < attempts; attempt++ { + bs := s.striped.Load() + if bs != nil && bs.len > 0 { + //nolint:gosec // len will never overflow uint32 + buffer := bs.buffers[h&uint32(bs.len-1)].Load() + //nolint:gocritic // the switch statement looks even worse here + if buffer == nil { + if s.busy.Load() == 0 && s.busy.CompareAndSwap(0, 1) { + // Try to attach new buffer. + created := false + rs := s.striped.Load() + if rs != nil && rs.len > 0 { + // Recheck under lock. + //nolint:gosec // len will never overflow uint32 + j := h & uint32(rs.len-1) + if rs.buffers[j].Load() == nil { + rs.buffers[j].Store(newRing(s.nodeManager, n)) + created = true + } + } + s.busy.Store(0) + if created { + result = Success + break + } + // Slot is now non-empty. + continue + } + collide = false + } else if !wasUncontended { + // CAS already known to fail. + // Continue after rehash. + wasUncontended = true + } else { + result = buffer.add(n) + //nolint:gocritic // the switch statement looks even worse here + if result != Failed { + break + } else if bs.len >= s.maxLen || s.striped.Load() != bs { + // At max size or stale. + collide = false + } else if !collide { + collide = true + } else if s.busy.Load() == 0 && s.busy.CompareAndSwap(0, 1) { + if s.striped.Load() == bs { + length := bs.len << 1 + striped := &striped[K, V]{ + buffers: make([]atomic.Pointer[ring[K, V]], length), + len: length, + } + for j := 0; j < striped.len; j++ { + striped.buffers[j].Store(striped.buffers[j].Load()) + } + s.striped.Store(striped) + } + s.busy.Store(0) + collide = false + continue + } + } + h = xruntime.Fastrand() + } else if s.busy.Load() == 0 && s.striped.Load() == bs && s.busy.CompareAndSwap(0, 1) { + init := false + if s.striped.Load() == bs { + striped := &striped[K, V]{ + buffers: make([]atomic.Pointer[ring[K, V]], 1), + len: 1, + } + striped.buffers[0].Store(newRing(s.nodeManager, n)) + s.striped.Store(striped) + init = true + } + s.busy.Store(0) + if init { + result = Success + break + } + } + } + + return result +} + +// DrainTo drains the buffer, sending each element to the consumer for processing. The caller must ensure +// that a consumer has exclusive read access to the buffer. +func (s *Striped[K, V]) DrainTo(consumer func(n node.Node[K, V])) { + bs := s.striped.Load() + if bs == nil { + return + } + for i := 0; i < bs.len; i++ { + b := bs.buffers[i].Load() + if b != nil { + b.drainTo(consumer) + } + } +} + +func (s *Striped[K, V]) Clear() { + bs := s.striped.Load() + if bs == nil { + return + } + for s.busy.Load() != 0 || !s.busy.CompareAndSwap(0, 1) { + runtime.Gosched() + } + for i := 0; i < bs.len; i++ { + b := bs.buffers[i].Load() + if b != nil { + b.clear() + } + } + s.busy.Store(0) +} diff --git a/internal/lossy/striped_test.go b/internal/lossy/striped_test.go new file mode 100644 index 0000000..e1f0957 --- /dev/null +++ b/internal/lossy/striped_test.go @@ -0,0 +1,168 @@ +// Copyright (c) 2024 Alexey Mayshev. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package lossy + +import ( + "runtime" + "sync" + "sync/atomic" + "testing" + + "github.com/maypok86/otter/v2/internal/generated/node" + "github.com/maypok86/otter/v2/internal/xmath" + "github.com/maypok86/otter/v2/internal/xruntime" +) + +var validStatuses = map[Status]bool{ + Success: true, + Failed: true, + Full: true, +} + +func TestNewStriped(t *testing.T) { + nm := node.NewManager[int, int](node.Config{ + WithSize: true, + WithExpiration: true, + }) + s := NewStriped(64, nm) + if got := s.striped.Load(); got != nil { + t.Fatalf("the striped buffer must be nil, but got: %v", got) + } + res := s.Add(nm.Create(1, 2, 100, 1)) + if l := s.striped.Load().len; l != 1 { + t.Fatalf("the striped buffer length must be 1, but got: %d", l) + } + if res != Success { + t.Fatalf("the status must be success, but got: %v", res) + } +} + +func TestStriped_Add(t *testing.T) { + parallelism := int(xmath.RoundUpPowerOf2(xruntime.Parallelism())) + maxBufferLen := 4 * parallelism + goroutines := 100 * parallelism + + nm := node.NewManager[int, int](node.Config{ + WithSize: true, + WithExpiration: true, + }) + s := NewStriped(maxBufferLen, nm) + n := nm.Create(1, 2, 100, 1) + + var wg sync.WaitGroup + wg.Add(goroutines) + for i := 0; i < goroutines; i++ { + go func() { + defer wg.Done() + + for j := 0; j < 1000; j++ { + res := s.Add(n) + if !validStatuses[res] { + t.Errorf("the status must be valid, but got: %v", res) + return + } + runtime.Gosched() + } + }() + } + + wg.Wait() + + if l := s.striped.Load().len; l > maxBufferLen { + t.Fatalf("the striped buffer length must less than %d, but got: %d", maxBufferLen, l) + } +} + +func TestStriped_DrainTo(t *testing.T) { + nm := node.NewManager[int, int](node.Config{ + WithSize: true, + WithExpiration: true, + }) + s := NewStriped(64, nm) + n := nm.Create(1, 2, 100, 1) + + var drains int + s.DrainTo(func(n node.Node[int, int]) { + drains++ + }) + if drains != 0 { + t.Fatalf("Expected number of drained nodes is 0, but got %d", drains) + } + + res := s.Add(n) + if l := s.striped.Load().len; l != 1 { + t.Fatalf("the striped buffer length must be 1, but got: %d", l) + } + if res != Success { + t.Fatalf("the status must be success, but got: %v", res) + } + + s.DrainTo(func(n node.Node[int, int]) { + drains++ + }) + if drains != 1 { + t.Fatalf("Expected number of drained nodes is 1, but got %d", drains) + } +} + +func TestStriped_AddAndDrain(t *testing.T) { + parallelism := int(xmath.RoundUpPowerOf2(xruntime.Parallelism())) + maxBufferLen := 4 * parallelism + goroutines := 100 * parallelism + + nm := node.NewManager[int, int](node.Config{ + WithSize: true, + WithExpiration: true, + }) + s := NewStriped(maxBufferLen, nm) + n := nm.Create(1, 2, 100, 1) + + var ( + wg sync.WaitGroup + mu sync.Mutex + drains atomic.Uint64 + ) + wg.Add(goroutines) + for i := 0; i < goroutines; i++ { + go func() { + defer wg.Done() + + for j := 0; j < 1000; j++ { + res := s.Add(n) + if !validStatuses[res] { + t.Errorf("the status must be valid, but got: %v", res) + return + } + if res == Full && mu.TryLock() { + s.DrainTo(func(n node.Node[int, int]) { + drains.Add(1) + }) + mu.Unlock() + } + runtime.Gosched() + } + }() + } + + wg.Wait() + + s.DrainTo(func(n node.Node[int, int]) { + drains.Add(1) + }) + + if l := s.striped.Load().len; l != maxBufferLen { + t.Fatalf("the striped buffer length must be %d, but got: %d", maxBufferLen, l) + } +}