diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml
index 0335049..d64a429 100644
--- a/.github/workflows/lint.yml
+++ b/.github/workflows/lint.yml
@@ -9,7 +9,7 @@ jobs:
   lint:
     strategy:
       matrix:
-        go-version: [1.21.x]
+        go-version: [1.18.x]
         platform: [ubuntu-latest]
     runs-on: ${{ matrix.platform }}

diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index 07321c5..59f3b8f 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -9,7 +9,7 @@ jobs:
   test:
     strategy:
       matrix:
-        go-version: [ 1.21.x ]
+        go-version: [ 1.18.x ]
         platform: [ ubuntu-latest ]
     runs-on: ${{ matrix.platform }}

diff --git a/internal/hashtable/map.go b/internal/hashtable/map.go
index 2ee48a2..eb073a6 100644
--- a/internal/hashtable/map.go
+++ b/internal/hashtable/map.go
@@ -69,7 +69,7 @@ type Map[K comparable, V any] struct {
     // used to wake up resize waiters (concurrent modifications)
     resizeCond sync.Cond
     // resize in progress flag; updated atomically
-    resizing atomic.Int64
+    resizing int64
 }

 type table[K comparable] struct {
@@ -400,7 +400,7 @@ func (m *Map[K, V]) resize(known *table[K], hint resizeHint) {
         }
     }
     // slow path.
-    if !m.resizing.CompareAndSwap(0, 1) {
+    if !atomic.CompareAndSwapInt64(&m.resizing, 0, 1) {
         // someone else started resize. Wait for it to finish.
         m.waitForResize()
         return
@@ -420,7 +420,7 @@ func (m *Map[K, V]) resize(known *table[K], hint resizeHint) {
     } else {
         // no need to shrink, wake up all waiters and give up.
         m.resizeMutex.Lock()
-        m.resizing.Store(0)
+        atomic.StoreInt64(&m.resizing, 0)
         m.resizeCond.Broadcast()
         m.resizeMutex.Unlock()
         return
@@ -440,7 +440,7 @@ func (m *Map[K, V]) resize(known *table[K], hint resizeHint) {
     // publish the new table and wake up all waiters.
     atomic.StorePointer(&m.table, unsafe.Pointer(nt))
     m.resizeMutex.Lock()
-    m.resizing.Store(0)
+    atomic.StoreInt64(&m.resizing, 0)
     m.resizeCond.Broadcast()
     m.resizeMutex.Unlock()
 }
@@ -473,7 +473,7 @@ func (m *Map[K, V]) newerTableExists(table *table[K]) bool {
 }

 func (m *Map[K, V]) resizeInProgress() bool {
-    return m.resizing.Load() == 1
+    return atomic.LoadInt64(&m.resizing) == 1
 }

 func (m *Map[K, V]) waitForResize() {
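
The change above is the template for the whole patch: the atomic wrapper types (atomic.Int64, atomic.Uint64) only exist since Go 1.19, so targeting Go 1.18 means declaring a plain integer field and going through the function-based sync/atomic API instead. The two forms are semantically equivalent; what is lost is the type-level guarantee that every access is atomic. A minimal sketch of the equivalence (illustrative only, not part of the patch; the flag119/flag118 names are invented, and the first half needs Go 1.19+ to compile):

    package example

    import "sync/atomic"

    // Go 1.19+ style: the wrapper type makes non-atomic access impossible
    // and guarantees 8-byte alignment even on 32-bit platforms.
    type flag119 struct {
        resizing atomic.Int64
    }

    func (f *flag119) tryStartResize() bool {
        return f.resizing.CompareAndSwap(0, 1)
    }

    // Go 1.18 style, as used in this patch: a plain field plus the
    // function-based API. Alignment and atomic-only access become the
    // author's responsibility.
    type flag118 struct {
        resizing int64
    }

    func (f *flag118) tryStartResize() bool {
        return atomic.CompareAndSwapInt64(&f.resizing, 0, 1)
    }
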
diff --git a/internal/lossy/buffer.go b/internal/lossy/buffer.go
index 6bb35a6..179a759 100644
--- a/internal/lossy/buffer.go
+++ b/internal/lossy/buffer.go
@@ -48,10 +48,10 @@ type PolicyBuffers[K comparable, V any] struct {
 //
 // This implementation is striped to further increase concurrency.
 type Buffer[K comparable, V any] struct {
-    head            atomic.Uint64
-    headPadding     [xruntime.CacheLineSize - unsafe.Sizeof(atomic.Uint64{})]byte
-    tail            atomic.Uint64
-    tailPadding     [xruntime.CacheLineSize - unsafe.Sizeof(atomic.Uint64{})]byte
+    head            uint64
+    headPadding     [xruntime.CacheLineSize - 8]byte
+    tail            uint64
+    tailPadding     [xruntime.CacheLineSize - 8]byte
     nodeManager     *node.Manager[K, V]
     returned        unsafe.Pointer
     returnedPadding [xruntime.CacheLineSize - 2*8]byte
@@ -77,14 +77,14 @@ func New[K comparable, V any](nodeManager *node.Manager[K, V]) *Buffer[K, V] {
 //
 // item may be lost due to contention.
 func (b *Buffer[K, V]) Add(n node.Node[K, V]) *PolicyBuffers[K, V] {
-    head := b.head.Load()
-    tail := b.tail.Load()
+    head := atomic.LoadUint64(&b.head)
+    tail := atomic.LoadUint64(&b.tail)
     size := tail - head
     if size >= capacity {
         // full buffer
         return nil
     }
-    if b.tail.CompareAndSwap(tail, tail+1) {
+    if atomic.CompareAndSwapUint64(&b.tail, tail, tail+1) {
         // success
         index := int(tail & mask)
         atomic.StorePointer(&b.buffer[index], n.AsPointer())
@@ -108,7 +108,7 @@ func (b *Buffer[K, V]) Add(n node.Node[K, V]) *PolicyBuffers[K, V] {
             head++
         }

-        b.head.Store(head)
+        atomic.StoreUint64(&b.head, head)
         return pb
     }
 }
@@ -136,6 +136,6 @@ func (b *Buffer[K, V]) Clear() {
         atomic.StorePointer(&b.buffer[i], nil)
     }
     b.Free()
-    b.tail.Store(0)
-    b.head.Store(0)
+    atomic.StoreUint64(&b.tail, 0)
+    atomic.StoreUint64(&b.head, 0)
 }
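
On the padding arithmetic in this file: unsafe.Sizeof(atomic.Uint64{}) evaluates to 8, so spelling the size out as a literal leaves the struct layout byte-for-byte unchanged. The padding itself keeps the producer-side tail and consumer-side head on separate cache lines to avoid false sharing. A sketch of the layout (illustrative; cacheLineSize = 64 is an assumption here, the real code takes the value from internal/xruntime):

    package example

    const cacheLineSize = 64 // assumed; otter reads this from internal/xruntime

    // Padded counter pair as used by the lossy buffer: each padding array
    // fills out the rest of the cache line so head and tail never share one.
    type counters struct {
        head        uint64
        headPadding [cacheLineSize - 8]byte
        tail        uint64
        tailPadding [cacheLineSize - 8]byte
    }
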
diff --git a/internal/queue/mpsc.go b/internal/queue/mpsc.go
index 3f36316..a537aea 100644
--- a/internal/queue/mpsc.go
+++ b/internal/queue/mpsc.go
@@ -12,7 +12,6 @@ package queue
 import (
     "runtime"
     "sync/atomic"
-    "unsafe"

     "github.com/maypok86/otter/internal/xruntime"
 )
@@ -36,18 +35,18 @@ func zeroValue[T any]() T {
 type MPSC[T any] struct {
     capacity     uint64
     sleep        chan struct{}
-    head         atomic.Uint64
-    headPadding  [xruntime.CacheLineSize - unsafe.Sizeof(atomic.Uint64{})]byte
+    head         uint64
+    headPadding  [xruntime.CacheLineSize - 8]byte
     tail         uint64
     tailPadding  [xruntime.CacheLineSize - 8]byte
-    isSleep      atomic.Uint64
-    sleepPadding [xruntime.CacheLineSize - unsafe.Sizeof(atomic.Uint64{})]byte
+    isSleep      uint64
+    sleepPadding [xruntime.CacheLineSize - 8]byte
     slots        []slot[T]
 }

 type slot[T any] struct {
-    // atomic.Uint64 is used here to get proper 8 byte alignment on 32-bit archs.
-    turn atomic.Uint64
+    // uint64 is used here to get proper 8 byte alignment on 32-bit archs.
+    turn uint64
     item T
 }

@@ -63,13 +62,13 @@
 // Insert inserts the given item into the queue.
 // Blocks, if the queue is full.
 func (q *MPSC[T]) Insert(item T) {
-    head := q.head.Add(1) - 1
+    head := atomic.AddUint64(&q.head, 1) - 1
     q.wakeUpConsumer()

     slot := &q.slots[q.idx(head)]
     turn := q.turn(head) * 2
     retries := 0
-    for slot.turn.Load() != turn {
+    for atomic.LoadUint64(&slot.turn) != turn {
         if retries == maxRetries {
             q.wakeUpConsumer()
             retries = 0
@@ -80,7 +79,7 @@
     }

     slot.item = item
-    slot.turn.Store(turn + 1)
+    atomic.StoreUint64(&slot.turn, turn+1)
 }

 // Remove retrieves and removes the item from the head of the queue.
@@ -90,7 +89,7 @@ func (q *MPSC[T]) Remove() T {
     slot := &q.slots[q.idx(tail)]
     turn := 2*q.turn(tail) + 1
     retries := 0
-    for slot.turn.Load() != turn {
+    for atomic.LoadUint64(&slot.turn) != turn {
         if retries == maxRetries {
             q.sleepConsumer()
             retries = 0
@@ -101,7 +100,7 @@
     }
     item := slot.item
     slot.item = zeroValue[T]()
-    slot.turn.Store(turn + 1)
+    atomic.StoreUint64(&slot.turn, turn+1)
     q.tail++
     return item
 }
@@ -119,7 +118,7 @@ func (q *MPSC[T]) Capacity() int {
 }

 func (q *MPSC[T]) wakeUpConsumer() {
-    if q.isSleep.Load() == 1 && q.isSleep.CompareAndSwap(1, 0) {
+    if atomic.LoadUint64(&q.isSleep) == 1 && atomic.CompareAndSwapUint64(&q.isSleep, 1, 0) {
         // if the consumer is asleep, we'll wake him up.
         q.sleep <- struct{}{}
     }
@@ -127,12 +126,12 @@

 func (q *MPSC[T]) sleepConsumer() {
     // if the queue's been empty for too long, we fall asleep.
-    q.isSleep.Store(1)
+    atomic.StoreUint64(&q.isSleep, 1)
     <-q.sleep
 }

 func (q *MPSC[T]) isEmpty() bool {
-    return q.tail == q.head.Load()
+    return q.tail == atomic.LoadUint64(&q.head)
 }

 func (q *MPSC[T]) idx(i uint64) uint64 {
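
One subtlety worth flagging in review: atomic.Uint64 embedded an alignment marker that guaranteed 8-byte alignment on every platform, while a plain uint64 field does not. The sync/atomic documentation only promises that the first word in an allocated struct, array, or slice is 64-bit aligned, which is why turn must stay the first field of slot; interior fields such as MPSC.head, which now sits behind a uint64 and a channel field, are not covered by that rule and may be worth double-checking on 32-bit builds. A sketch of the rule being relied on (slotSketch and claim are invented names):

    package example

    import "sync/atomic"

    // The first word of an allocated struct is 64-bit aligned even on
    // 32-bit platforms, so atomic operations on turn are safe only while
    // it remains the first field.
    type slotSketch struct {
        turn uint64 // must stay first
        item any    // payload; generic in the real code
    }

    func claim(s *slotSketch, turn uint64) {
        atomic.StoreUint64(&s.turn, turn)
    }
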