Skip to content

Commit

Permalink
Improve BufferedChannelQueue efficiency
Browse files Browse the repository at this point in the history
BufferedChannelQueue: try offering values to channel at the first time if nothing in the pool
  • Loading branch information
johnteee committed May 13, 2022
1 parent 636f2e4 commit dfee232
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 14 deletions.
39 changes: 28 additions & 11 deletions queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -584,7 +584,16 @@ func (q *BufferedChannelQueue[T]) Put(val T) error {
// return ErrQueueIsClosed
// }
//
// return q.blockingQueue.Put(val)
// q.lock.Lock()
// poolCount := q.pool.Count()
//
// // If appearing nothing in the pool
// if poolCount == 0 {
// defer q.lock.Unlock()
// // Try channel
// return q.blockingQueue.Put(val)
// }
// q.lock.Unlock()

return q.Offer(val)
}
Expand Down Expand Up @@ -638,23 +647,31 @@ func (q *BufferedChannelQueue[T]) Offer(val T) error {
return ErrQueueIsClosed
}

poolCount := q.pool.Count()

// If appearing nothing in the pool
if poolCount == 0 {
// Try channel
err := q.blockingQueue.Offer(val)
if err == nil {
// Success
return nil
} else if err == ErrQueueIsFull {
// Do nothing and let pool.Offer(val)
} else {
// Other
return err
}
}

// Before +1: >=, After +1: >
if q.pool.Count() >= q.bufferSizeMaximum {
if poolCount >= q.bufferSizeMaximum {
return ErrQueueIsFull
}

q.pool.Offer(val)
q.loadWorkerCh.Offer(1)
return nil

// err := q.blockingQueue.Offer(val)
// if err == ErrQueueIsFull {
// q.pool.Offer(val)
// q.loadWorkerCh.Offer(1)
// return nil
// }
//
// return err
}

// Poll Poll the T val(non-blocking)
Expand Down
4 changes: 1 addition & 3 deletions queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,13 +275,10 @@ func TestNewBufferedChannelQueue(t *testing.T) {

err = queue.Offer(1)
assert.Equal(t, nil, err)
time.Sleep(1 * timeout)
err = queue.Offer(2)
assert.Equal(t, nil, err)
time.Sleep(1 * timeout)
err = queue.Offer(3)
assert.Equal(t, nil, err)
time.Sleep(1 * timeout)
// Channel: only 3 positions & Buffer: 1 position, now `4` is inserted into the buffer(buffer size: 1)
err = queue.Offer(4)
assert.Equal(t, nil, err)
Expand Down Expand Up @@ -319,6 +316,7 @@ func TestNewBufferedChannelQueue(t *testing.T) {
go func() {
for i := 1; i <= 10000; i++ {
// err := bufferedChannelQueue.PutWithTimeout(i, timeout)
// err := bufferedChannelQueue.Put(i)
err := bufferedChannelQueue.Offer(i)
assert.Equal(t, nil, err)
}
Expand Down

0 comments on commit dfee232

Please sign in to comment.