From 12b604bea9a116e558fe274cfb728f9045f650ab Mon Sep 17 00:00:00 2001 From: John Date: Mon, 28 Oct 2024 10:12:54 +0800 Subject: [PATCH] Fix BufferedChannelQueue.PutWithTimeout() functionalities. # Conflicts: # queue.go --- queue.go | 42 ++++++++++++++++++++++++++++++------------ 1 file changed, 30 insertions(+), 12 deletions(-) diff --git a/queue.go b/queue.go index b0f47f1..1fcdc23 100644 --- a/queue.go +++ b/queue.go @@ -347,7 +347,7 @@ func (q *LinkedListQueue) KeepNodePoolCount(n int) { last.Next = nil } -// CLear Clear all data +// Clear Clear all data func (q *LinkedListQueue) Clear() { q.nodePoolFirst = q.first q.nodeCount = q.count @@ -694,17 +694,35 @@ func (q *BufferedChannelQueue) Put(val interface{}) error { return q.Offer(val) } -// // PutWithTimeout Put the val(blocking), with timeout -// func (q BufferedChannelQueue) PutWithTimeout(val interface{}, timeout time.Duration) error { -// // q.lock.Lock() -// // defer q.lock.Unlock() -// -// if q.isClosed.Get() { -// return ErrQueueIsClosed -// } -// -// return q.blockingQueue.PutWithTimeout(val, timeout) -// } +// PutWithTimeout Put the T val(blocking), with timeout +func (q *BufferedChannelQueue) PutWithTimeout(val interface{}, timeout time.Duration) error { + // q.lock.Lock() + // defer q.lock.Unlock() + + if q.isClosed.Get() { + return ErrQueueIsClosed + } + + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + //fmt.Println("iteration") + q.loadWorkerCh.Offer(1) + + err := q.Offer(val) + if err == nil { + return nil + } + if errors.Is(err, ErrQueueIsFull) { + return err + } + q.loadWorkerCh.Offer(1) + time.Sleep(q.loadFromPoolDuration) + } + + //return q.blockingQueue.PutWithTimeout(val, timeout) + + return ErrQueuePutTimeout +} // Take Take the val(blocking) func (q *BufferedChannelQueue) Take() (interface{}, error) {