From dfee2323978801a5b2b4252ef6650560d5e6ee59 Mon Sep 17 00:00:00 2001 From: John Date: Fri, 13 May 2022 09:57:38 +0800 Subject: [PATCH] Improve BufferedChannelQueue efficiency BufferedChannelQueue: try offering values to channel at the first time if nothing in the pool --- queue.go | 39 ++++++++++++++++++++++++++++----------- queue_test.go | 4 +--- 2 files changed, 29 insertions(+), 14 deletions(-) diff --git a/queue.go b/queue.go index e68c37e..8baa902 100644 --- a/queue.go +++ b/queue.go @@ -584,7 +584,16 @@ func (q *BufferedChannelQueue[T]) Put(val T) error { // return ErrQueueIsClosed // } // - // return q.blockingQueue.Put(val) + // q.lock.Lock() + // poolCount := q.pool.Count() + // + // // If appearing nothing in the pool + // if poolCount == 0 { + // defer q.lock.Unlock() + // // Try channel + // return q.blockingQueue.Put(val) + // } + // q.lock.Unlock() return q.Offer(val) } @@ -638,23 +647,31 @@ func (q *BufferedChannelQueue[T]) Offer(val T) error { return ErrQueueIsClosed } + poolCount := q.pool.Count() + + // If appearing nothing in the pool + if poolCount == 0 { + // Try channel + err := q.blockingQueue.Offer(val) + if err == nil { + // Success + return nil + } else if err == ErrQueueIsFull { + // Do nothing and let pool.Offer(val) + } else { + // Other + return err + } + } + // Before +1: >=, After +1: > - if q.pool.Count() >= q.bufferSizeMaximum { + if poolCount >= q.bufferSizeMaximum { return ErrQueueIsFull } q.pool.Offer(val) q.loadWorkerCh.Offer(1) return nil - - // err := q.blockingQueue.Offer(val) - // if err == ErrQueueIsFull { - // q.pool.Offer(val) - // q.loadWorkerCh.Offer(1) - // return nil - // } - // - // return err } // Poll Poll the T val(non-blocking) diff --git a/queue_test.go b/queue_test.go index 0f6ef23..d4bed20 100644 --- a/queue_test.go +++ b/queue_test.go @@ -275,13 +275,10 @@ func TestNewBufferedChannelQueue(t *testing.T) { err = queue.Offer(1) assert.Equal(t, nil, err) - time.Sleep(1 * timeout) err = queue.Offer(2) assert.Equal(t, nil, err) - time.Sleep(1 * timeout) err = queue.Offer(3) assert.Equal(t, nil, err) - time.Sleep(1 * timeout) // Channel: only 3 positions & Buffer: 1 position, now `4` is inserted into the buffer(buffer sizeļ¼š 1) err = queue.Offer(4) assert.Equal(t, nil, err) @@ -319,6 +316,7 @@ func TestNewBufferedChannelQueue(t *testing.T) { go func() { for i := 1; i <= 10000; i++ { // err := bufferedChannelQueue.PutWithTimeout(i, timeout) + // err := bufferedChannelQueue.Put(i) err := bufferedChannelQueue.Offer(i) assert.Equal(t, nil, err) }