Skip to content

Commit

Permalink
[#20] Improve MPSC queue sleeping algorithm
Browse files Browse the repository at this point in the history
  • Loading branch information
maypok86 committed Dec 13, 2023
1 parent c58bbe0 commit cdc83fb
Showing 1 changed file with 28 additions and 13 deletions.
41 changes: 28 additions & 13 deletions internal/queue/mpsc.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

const (
maxRetries = 100
maxRetries = 16
)

func zeroValue[T any]() T {
Expand Down Expand Up @@ -50,17 +50,21 @@ func NewMPSC[T any](capacity int) *MPSC[T] {

func (q *MPSC[T]) Insert(item T) {
head := q.head.Add(1) - 1
q.wakeUpConsumer()

slot := &q.slots[q.idx(head)]
turn := q.turn(head) * 2
retries := 0
for slot.turn.Load() != turn {
if retries == maxRetries {
q.wakeUpConsumer()
retries = 0
continue
}
retries++
runtime.Gosched()
}

if q.isSleep.CompareAndSwap(1, 0) {
// if the consumer is asleep, we'll wake him up.
q.sleep <- struct{}{}
}

slot.item = item
slot.turn.Store(turn + 1)
}
Expand All @@ -72,9 +76,7 @@ func (q *MPSC[T]) Remove() T {
retries := 0
for slot.turn.Load() != turn {
if retries == maxRetries {
// if the queue's been empty for too long, we fall asleep.
q.isSleep.Store(1)
<-q.sleep
q.sleepConsumer()
retries = 0
continue
}
Expand All @@ -94,14 +96,27 @@ func (q *MPSC[T]) Clear() {
}
}

func (q *MPSC[T]) isEmpty() bool {
return q.tail == q.head.Load()
}

func (q *MPSC[T]) Capacity() int {
return int(q.capacity)
}

func (q *MPSC[T]) wakeUpConsumer() {
if q.isSleep.CompareAndSwap(1, 0) {
// if the consumer is asleep, we'll wake him up.
q.sleep <- struct{}{}
}
}

func (q *MPSC[T]) sleepConsumer() {
// if the queue's been empty for too long, we fall asleep.
q.isSleep.Store(1)
<-q.sleep
}

func (q *MPSC[T]) isEmpty() bool {
return q.tail == q.head.Load()
}

func (q *MPSC[T]) idx(i uint64) uint64 {
return i % q.capacity
}
Expand Down

0 comments on commit cdc83fb

Please sign in to comment.