Skip to content

Commit

Permalink
Add lengthTBQueue, fix bug in flushTBQueue
Browse files Browse the repository at this point in the history
  • Loading branch information
mitchellwrosen committed Apr 22, 2018
1 parent 33a36c3 commit 4f63bbc
Showing 1 changed file with 32 additions and 20 deletions.
52 changes: 32 additions & 20 deletions Control/Concurrent/STM/TBQueue.hs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ module Control.Concurrent.STM.TBQueue (
tryPeekTBQueue,
writeTBQueue,
unGetTBQueue,
lengthTBQueue,
isEmptyTBQueue,
isFullTBQueue,
) where
Expand All @@ -52,14 +53,15 @@ import GHC.Conc
--
-- @since 2.4
data TBQueue a
= TBQueue _UPK_(TVar Int) -- CR: read capacity
_UPK_(TVar [a]) -- R: elements waiting to be read
_UPK_(TVar Int) -- CW: write capacity
_UPK_(TVar [a]) -- W: elements written (head is most recent)
= TBQueue _UPK_(TVar Int) -- CR: read capacity
_UPK_(TVar [a]) -- R: elements waiting to be read
_UPK_(TVar Int) -- CW: write capacity
_UPK_(TVar [a]) -- W: elements written (head is most recent)
_UPK_(Int) -- CAP: initial capacity
deriving Typeable

instance Eq (TBQueue a) where
TBQueue a _ _ _ == TBQueue b _ _ _ = a == b
TBQueue a _ _ _ _ == TBQueue b _ _ _ _ = a == b

-- Total channel capacity remaining is CR + CW. Reads only need to
-- access CR, writes usually need to access only CW but sometimes need
Expand All @@ -83,7 +85,7 @@ newTBQueue size = do
write <- newTVar []
rsize <- newTVar 0
wsize <- newTVar size
return (TBQueue rsize read wsize write)
return (TBQueue rsize read wsize write size)

-- |@IO@ version of 'newTBQueue'. This is useful for creating top-level
-- 'TBQueue's using 'System.IO.Unsafe.unsafePerformIO', because using
Expand All @@ -95,11 +97,11 @@ newTBQueueIO size = do
write <- newTVarIO []
rsize <- newTVarIO 0
wsize <- newTVarIO size
return (TBQueue rsize read wsize write)
return (TBQueue rsize read wsize write size)

-- |Write a value to a 'TBQueue'; blocks if the queue is full.
writeTBQueue :: TBQueue a -> a -> STM ()
writeTBQueue (TBQueue rsize _read wsize write) a = do
writeTBQueue (TBQueue rsize _read wsize write _size) a = do
w <- readTVar wsize
if (w /= 0)
then do writeTVar wsize $! w - 1
Expand All @@ -114,7 +116,7 @@ writeTBQueue (TBQueue rsize _read wsize write) a = do

-- |Read the next value from the 'TBQueue'.
readTBQueue :: TBQueue a -> STM a
readTBQueue (TBQueue rsize read _wsize write) = do
readTBQueue (TBQueue rsize read _wsize write _size) = do
xs <- readTVar read
r <- readTVar rsize
writeTVar rsize $! r + 1
Expand Down Expand Up @@ -143,16 +145,17 @@ tryReadTBQueue c = fmap Just (readTBQueue c) `orElse` return Nothing
--
-- @since 2.4.5
flushTBQueue :: TBQueue a -> STM [a]
flushTBQueue (TBQueue rsize read wsize write) = do
flushTBQueue (TBQueue rsize read wsize write size) = do
xs <- readTVar read
ys <- readTVar write
r <- readTVar rsize
w <- readTVar wsize
writeTVar read []
writeTVar write []
writeTVar rsize 0
writeTVar wsize (r + w)
return (xs ++ reverse ys)
if null xs && null ys
then return []
else do
writeTVar read []
writeTVar write []
writeTVar rsize 0
writeTVar wsize size
return (xs ++ reverse ys)

-- | Get the next value from the @TBQueue@ without removing it,
-- retrying if the channel is empty.
Expand All @@ -176,7 +179,7 @@ tryPeekTBQueue c = do
-- |Put a data item back onto a channel, where it will be the next item read.
-- Blocks if the queue is full.
unGetTBQueue :: TBQueue a -> a -> STM ()
unGetTBQueue (TBQueue rsize read wsize _write) a = do
unGetTBQueue (TBQueue rsize read wsize _write _size) a = do
r <- readTVar rsize
if (r > 0)
then do writeTVar rsize $! r - 1
Expand All @@ -188,9 +191,18 @@ unGetTBQueue (TBQueue rsize read wsize _write) a = do
xs <- readTVar read
writeTVar read (a:xs)

-- |Return the length of a 'TBQueue'.
--
-- @Since FIXME
lengthTBQueue :: TBQueue a -> STM Int
lengthTBQueue (TBQueue rsize _read wsize _write size) = do
r <- readTVar rsize
w <- readTVar wsize
return $! size - r - w

-- |Returns 'True' if the supplied 'TBQueue' is empty.
isEmptyTBQueue :: TBQueue a -> STM Bool
isEmptyTBQueue (TBQueue _rsize read _wsize write) = do
isEmptyTBQueue (TBQueue _rsize read _wsize write _size) = do
xs <- readTVar read
case xs of
(_:_) -> return False
Expand All @@ -203,7 +215,7 @@ isEmptyTBQueue (TBQueue _rsize read _wsize write) = do
--
-- @since 2.4.3
isFullTBQueue :: TBQueue a -> STM Bool
isFullTBQueue (TBQueue rsize _read wsize _write) = do
isFullTBQueue (TBQueue rsize _read wsize _write _size) = do
w <- readTVar wsize
if (w > 0)
then return False
Expand Down

0 comments on commit 4f63bbc

Please sign in to comment.