Skip to content

Commit

Permalink
Fix alignment problems on 32-bit arch
Browse files Browse the repository at this point in the history
  • Loading branch information
maypok86 committed Mar 5, 2024
1 parent 21d0a9d commit 9cb2a64
Show file tree
Hide file tree
Showing 8 changed files with 35 additions and 34 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ jobs:
lint:
strategy:
matrix:
go-version: [1.18.x]
go-version: [1.19.x]
platform: [ubuntu-latest]

runs-on: ${{ matrix.platform }}
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test-32-bit.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ jobs:
test:
strategy:
matrix:
go-version: [ 1.18.x, 1.19.x, 1.20.x, 1.21.x ]
go-version: [ 1.19.x, 1.20.x, 1.21.x ]

runs-on: ubuntu-latest

Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ jobs:
test:
strategy:
matrix:
go-version: [ 1.18.x ]
go-version: [ 1.19.x ]
platform: [ ubuntu-latest ]

runs-on: ${{ matrix.platform }}
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ Otter is based on the following papers

### 📋 Requirements <a id="requirements" />

- Go 1.18+
- Go 1.19+

### 🛠️ Installation <a id="installation" />

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/maypok86/otter

go 1.18
go 1.19

require (
github.com/dolthub/maphash v0.1.0
Expand Down
10 changes: 5 additions & 5 deletions internal/hashtable/map.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ type Map[K comparable, V any] struct {
// used to wake up resize waiters (concurrent modifications)
resizeCond sync.Cond
// resize in progress flag; updated atomically
resizing int64
resizing atomic.Int64
}

type table[K comparable] struct {
Expand Down Expand Up @@ -400,7 +400,7 @@ func (m *Map[K, V]) resize(known *table[K], hint resizeHint) {
}
}
// slow path.
if !atomic.CompareAndSwapInt64(&m.resizing, 0, 1) {
if !m.resizing.CompareAndSwap(0, 1) {
// someone else started resize. Wait for it to finish.
m.waitForResize()
return
Expand All @@ -420,7 +420,7 @@ func (m *Map[K, V]) resize(known *table[K], hint resizeHint) {
} else {
// no need to shrink, wake up all waiters and give up.
m.resizeMutex.Lock()
atomic.StoreInt64(&m.resizing, 0)
m.resizing.Store(0)
m.resizeCond.Broadcast()
m.resizeMutex.Unlock()
return
Expand All @@ -440,7 +440,7 @@ func (m *Map[K, V]) resize(known *table[K], hint resizeHint) {
// publish the new table and wake up all waiters.
atomic.StorePointer(&m.table, unsafe.Pointer(nt))
m.resizeMutex.Lock()
atomic.StoreInt64(&m.resizing, 0)
m.resizing.Store(0)
m.resizeCond.Broadcast()
m.resizeMutex.Unlock()
}
Expand Down Expand Up @@ -473,7 +473,7 @@ func (m *Map[K, V]) newerTableExists(table *table[K]) bool {
}

func (m *Map[K, V]) resizeInProgress() bool {
return atomic.LoadInt64(&m.resizing) == 1
return m.resizing.Load() == 1
}

func (m *Map[K, V]) waitForResize() {
Expand Down
20 changes: 10 additions & 10 deletions internal/lossy/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ type PolicyBuffers[K comparable, V any] struct {
//
// This implementation is striped to further increase concurrency.
type Buffer[K comparable, V any] struct {
head uint64
headPadding [xruntime.CacheLineSize - 8]byte
tail uint64
tailPadding [xruntime.CacheLineSize - 8]byte
head atomic.Uint64
headPadding [xruntime.CacheLineSize - unsafe.Sizeof(atomic.Uint64{})]byte
tail atomic.Uint64
tailPadding [xruntime.CacheLineSize - unsafe.Sizeof(atomic.Uint64{})]byte
nodeManager *node.Manager[K, V]
returned unsafe.Pointer
returnedPadding [xruntime.CacheLineSize - 2*8]byte
Expand All @@ -77,14 +77,14 @@ func New[K comparable, V any](nodeManager *node.Manager[K, V]) *Buffer[K, V] {
//
// item may be lost due to contention.
func (b *Buffer[K, V]) Add(n node.Node[K, V]) *PolicyBuffers[K, V] {
head := atomic.LoadUint64(&b.head)
tail := atomic.LoadUint64(&b.tail)
head := b.head.Load()
tail := b.tail.Load()
size := tail - head
if size >= capacity {
// full buffer
return nil
}
if atomic.CompareAndSwapUint64(&b.tail, tail, tail+1) {
if b.tail.CompareAndSwap(tail, tail+1) {
// success
index := int(tail & mask)
atomic.StorePointer(&b.buffer[index], n.AsPointer())
Expand All @@ -108,7 +108,7 @@ func (b *Buffer[K, V]) Add(n node.Node[K, V]) *PolicyBuffers[K, V] {
head++
}

atomic.StoreUint64(&b.head, head)
b.head.Store(head)
return pb
}
}
Expand Down Expand Up @@ -136,6 +136,6 @@ func (b *Buffer[K, V]) Clear() {
atomic.StorePointer(&b.buffer[i], nil)
}
b.Free()
atomic.StoreUint64(&b.tail, 0)
atomic.StoreUint64(&b.head, 0)
b.tail.Store(0)
b.head.Store(0)
}
29 changes: 15 additions & 14 deletions internal/queue/mpsc.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package queue
import (
"runtime"
"sync/atomic"
"unsafe"

"github.com/maypok86/otter/internal/xruntime"
)
Expand All @@ -35,18 +36,18 @@ func zeroValue[T any]() T {
type MPSC[T any] struct {
capacity uint64
sleep chan struct{}
head uint64
headPadding [xruntime.CacheLineSize - 8]byte
head atomic.Uint64
headPadding [xruntime.CacheLineSize - unsafe.Sizeof(atomic.Uint64{})]byte
tail uint64
tailPadding [xruntime.CacheLineSize - 8]byte
isSleep uint64
sleepPadding [xruntime.CacheLineSize - 8]byte
isSleep atomic.Uint64
sleepPadding [xruntime.CacheLineSize - unsafe.Sizeof(atomic.Uint64{})]byte
slots []slot[T]
}

type slot[T any] struct {
// uint64 is used here to get proper 8 byte alignment on 32-bit archs.
turn uint64
// atomic.Uint64 is used here to get proper 8 byte alignment on 32-bit archs.
turn atomic.Uint64
item T
}

Expand All @@ -62,13 +63,13 @@ func NewMPSC[T any](capacity int) *MPSC[T] {
// Insert inserts the given item into the queue.
// Blocks, if the queue is full.
func (q *MPSC[T]) Insert(item T) {
head := atomic.AddUint64(&q.head, 1) - 1
head := q.head.Add(1) - 1
q.wakeUpConsumer()

slot := &q.slots[q.idx(head)]
turn := q.turn(head) * 2
retries := 0
for atomic.LoadUint64(&slot.turn) != turn {
for slot.turn.Load() != turn {
if retries == maxRetries {
q.wakeUpConsumer()
retries = 0
Expand All @@ -79,7 +80,7 @@ func (q *MPSC[T]) Insert(item T) {
}

slot.item = item
atomic.StoreUint64(&slot.turn, turn+1)
slot.turn.Store(turn + 1)
}

// Remove retrieves and removes the item from the head of the queue.
Expand All @@ -89,7 +90,7 @@ func (q *MPSC[T]) Remove() T {
slot := &q.slots[q.idx(tail)]
turn := 2*q.turn(tail) + 1
retries := 0
for atomic.LoadUint64(&slot.turn) != turn {
for slot.turn.Load() != turn {
if retries == maxRetries {
q.sleepConsumer()
retries = 0
Expand All @@ -100,7 +101,7 @@ func (q *MPSC[T]) Remove() T {
}
item := slot.item
slot.item = zeroValue[T]()
atomic.StoreUint64(&slot.turn, turn+1)
slot.turn.Store(turn + 1)
q.tail++
return item
}
Expand All @@ -118,20 +119,20 @@ func (q *MPSC[T]) Capacity() int {
}

func (q *MPSC[T]) wakeUpConsumer() {
if atomic.LoadUint64(&q.isSleep) == 1 && atomic.CompareAndSwapUint64(&q.isSleep, 1, 0) {
if q.isSleep.Load() == 1 && q.isSleep.CompareAndSwap(1, 0) {
// if the consumer is asleep, we'll wake him up.
q.sleep <- struct{}{}
}
}

func (q *MPSC[T]) sleepConsumer() {
// if the queue's been empty for too long, we fall asleep.
atomic.StoreUint64(&q.isSleep, 1)
q.isSleep.Store(1)
<-q.sleep
}

func (q *MPSC[T]) isEmpty() bool {
return q.tail == atomic.LoadUint64(&q.head)
return q.tail == q.head.Load()
}

func (q *MPSC[T]) idx(i uint64) uint64 {
Expand Down

0 comments on commit 9cb2a64

Please sign in to comment.