Skip to content

Commit

Permalink
Revert "Rewrite TBQueue to use TArray Int (Maybe a)"
Browse files Browse the repository at this point in the history
This reverts commit 9821578.

As noted in #78, the current implementation does not appear to be correct.
  • Loading branch information
bgamari committed Nov 17, 2023
1 parent a1e91f4 commit 79de9e6
Show file tree
Hide file tree
Showing 7 changed files with 279 additions and 129 deletions.
266 changes: 143 additions & 123 deletions Control/Concurrent/STM/TBQueue.hs
Original file line number Diff line number Diff line change
Expand Up @@ -46,92 +46,102 @@ module Control.Concurrent.STM.TBQueue (
capacityTBQueue,
) where

#if !MIN_VERSION_base(4,8,0)
import Control.Applicative (pure)
#endif
import Data.Array.Base
import Data.Maybe (isJust, isNothing)
import Data.Typeable (Typeable)
import GHC.Conc
import Numeric.Natural (Natural)
import Prelude hiding (read)

import Control.Concurrent.STM.TArray
import Control.Monad (unless)
import Control.Applicative (pure)
import Data.Typeable (Typeable)
import GHC.Conc (STM, TVar, newTVar, newTVarIO, orElse,
readTVar, retry, writeTVar)
import Numeric.Natural (Natural)
import Prelude hiding (read, pure)

-- | 'TBQueue' is an abstract type representing a bounded FIFO channel.
--
-- @since 2.4
data TBQueue a
= TBQueue {-# UNPACK #-} !(TVar Int) -- read index
{-# UNPACK #-} !(TVar Int) -- write index
{-# UNPACK #-} !(TArray Int (Maybe a)) -- elements
{-# UNPACK #-} !Int -- initial capacity
= TBQueue {-# UNPACK #-} !(TVar Natural) -- CR: read capacity
{-# UNPACK #-} !(TVar [a]) -- R: elements waiting to be read
{-# UNPACK #-} !(TVar Natural) -- CW: write capacity
{-# UNPACK #-} !(TVar [a]) -- W: elements written (head is most recent)
!(Natural) -- CAP: initial capacity
deriving Typeable

instance Eq (TBQueue a) where
-- each `TBQueue` has its own `TVar`s, so it's sufficient to compare the first one
TBQueue a _ _ _ == TBQueue b _ _ _ = a == b

-- incMod x cap == (x + 1) `mod` cap
incMod :: Int -> Int -> Int
incMod x cap = let y = x + 1 in if y == cap then 0 else y
TBQueue a _ _ _ _ == TBQueue b _ _ _ _ = a == b

-- decMod x cap = (x - 1) `mod` cap
decMod :: Int -> Int -> Int
decMod x cap = if x == 0 then cap - 1 else x - 1
-- Total channel capacity remaining is CR + CW. Reads only need to
-- access CR, writes usually need to access only CW but sometimes need
-- CR. So in the common case we avoid contention between CR and CW.
--
-- - when removing an element from R:
-- CR := CR + 1
--
-- - when adding an element to W:
-- if CW is non-zero
-- then CW := CW - 1
-- then if CR is non-zero
-- then CW := CR - 1; CR := 0
-- else **FULL**

-- | Builds and returns a new instance of 'TBQueue'.
newTBQueue :: Natural -- ^ maximum number of elements the queue can hold
-> STM (TBQueue a)
newTBQueue cap
| cap <= 0 = error "capacity has to be greater than 0"
| cap > fromIntegral (maxBound :: Int) = error "capacity is too big"
| otherwise = do
rindex <- newTVar 0
windex <- newTVar 0
elements <- newArray (0, cap' - 1) Nothing
pure (TBQueue rindex windex elements cap')
where
cap' = fromIntegral cap
newTBQueue size = do
read <- newTVar []
write <- newTVar []
rsize <- newTVar 0
wsize <- newTVar size
return (TBQueue rsize read wsize write size)

-- | @IO@ version of 'newTBQueue'. This is useful for creating top-level
-- 'TBQueue's using 'System.IO.Unsafe.unsafePerformIO', because using
-- 'atomically' inside 'System.IO.Unsafe.unsafePerformIO' isn't
-- possible.
newTBQueueIO :: Natural -> IO (TBQueue a)
newTBQueueIO cap
| cap <= 0 = error "capacity has to be greater than 0"
| cap > fromIntegral (maxBound :: Int) = error "capacity is too big"
| otherwise = do
rindex <- newTVarIO 0
windex <- newTVarIO 0
elements <- newArray (0, cap' - 1) Nothing
pure (TBQueue rindex windex elements cap')
where
cap' = fromIntegral cap

-- | Write a value to a 'TBQueue'; retries if the queue is full.
newTBQueueIO size = do
read <- newTVarIO []
write <- newTVarIO []
rsize <- newTVarIO 0
wsize <- newTVarIO size
return (TBQueue rsize read wsize write size)

-- |Write a value to a 'TBQueue'; blocks if the queue is full.
writeTBQueue :: TBQueue a -> a -> STM ()
writeTBQueue (TBQueue _ windex elements cap) a = do
w <- readTVar windex
ele <- unsafeRead elements w
case ele of
Nothing -> unsafeWrite elements w (Just a)
Just _ -> retry
writeTVar windex $! incMod w cap

-- | Read the next value from the 'TBQueue'; retries if the queue is empty.
writeTBQueue (TBQueue rsize _read wsize write _size) a = do
w <- readTVar wsize
if (w > 0)
then do writeTVar wsize $! w - 1
else do
r <- readTVar rsize
if (r > 0)
then do writeTVar rsize 0
writeTVar wsize $! r - 1
else retry
listend <- readTVar write
writeTVar write (a:listend)

-- |Read the next value from the 'TBQueue'.
readTBQueue :: TBQueue a -> STM a
readTBQueue (TBQueue rindex _ elements cap) = do
r <- readTVar rindex
ele <- unsafeRead elements r
a <- case ele of
Nothing -> retry
Just a -> do
unsafeWrite elements r Nothing
pure a
writeTVar rindex $! incMod r cap
pure a
readTBQueue (TBQueue rsize read _wsize write _size) = do
xs <- readTVar read
r <- readTVar rsize
writeTVar rsize $! r + 1
case xs of
(x:xs') -> do
writeTVar read xs'
return x
[] -> do
ys <- readTVar write
case ys of
[] -> retry
_ -> do
-- NB. lazy: we want the transaction to be
-- short, otherwise it will conflict
let ~(z,zs) = case reverse ys of
z':zs' -> (z',zs')
_ -> error "readTBQueue: impossible"
writeTVar write []
writeTVar read zs
return z

-- | A version of 'readTBQueue' which does not retry. Instead it
-- returns @Nothing@ if no value is available.
Expand All @@ -142,89 +152,99 @@ tryReadTBQueue q = fmap Just (readTBQueue q) `orElse` pure Nothing
-- function never retries.
--
-- @since 2.4.5
flushTBQueue :: forall a. TBQueue a -> STM [a]
flushTBQueue (TBQueue _rindex windex elements cap) = do
w <- readTVar windex
go (decMod w cap) []
where
go :: Int -> [a] -> STM [a]
go i acc = do
ele <- unsafeRead elements i
case ele of
Nothing -> pure acc
Just a -> do
unsafeWrite elements i Nothing
go (decMod i cap) (a : acc)
flushTBQueue :: TBQueue a -> STM [a]
flushTBQueue (TBQueue rsize read wsize write size) = do
xs <- readTVar read
ys <- readTVar write
if null xs && null ys
then return []
else do
unless (null xs) $ writeTVar read []
unless (null ys) $ writeTVar write []
writeTVar rsize 0
writeTVar wsize size
return (xs ++ reverse ys)

-- | Get the next value from the @TBQueue@ without removing it,
-- retrying if the queue is empty.
-- retrying if the channel is empty.
peekTBQueue :: TBQueue a -> STM a
peekTBQueue (TBQueue rindex _ elements _) = do
r <- readTVar rindex
ele <- unsafeRead elements r
case ele of
Nothing -> retry
Just a -> pure a
peekTBQueue (TBQueue _ read _ write _) = do
xs <- readTVar read
case xs of
(x:_) -> return x
[] -> do
ys <- readTVar write
case ys of
[] -> retry
_ -> do
let (z:zs) = reverse ys -- NB. lazy: we want the transaction to be
-- short, otherwise it will conflict
writeTVar write []
writeTVar read (z:zs)
return z

-- | A version of 'peekTBQueue' which does not retry. Instead it
-- returns @Nothing@ if no value is available.
tryPeekTBQueue :: TBQueue a -> STM (Maybe a)
tryPeekTBQueue q = fmap Just (peekTBQueue q) `orElse` pure Nothing
tryPeekTBQueue c = do
m <- tryReadTBQueue c
case m of
Nothing -> return Nothing
Just x -> do
unGetTBQueue c x
return m

-- | Put a data item back onto a channel, where it will be the next item read.
-- Retries if the queue is full.
-- Blocks if the queue is full.
unGetTBQueue :: TBQueue a -> a -> STM ()
unGetTBQueue (TBQueue rindex _ elements cap) a = do
r <- readTVar rindex
ele <- unsafeRead elements r
case ele of
Nothing -> unsafeWrite elements r (Just a)
Just _ -> retry
writeTVar rindex $! decMod r cap
unGetTBQueue (TBQueue rsize read wsize _write _size) a = do
r <- readTVar rsize
if (r > 0)
then do writeTVar rsize $! r - 1
else do
w <- readTVar wsize
if (w > 0)
then writeTVar wsize $! w - 1
else retry
xs <- readTVar read
writeTVar read (a:xs)

-- | Return the length of a 'TBQueue'.
--
-- @since 2.5.0.0
lengthTBQueue :: TBQueue a -> STM Natural
lengthTBQueue (TBQueue rindex windex elements cap) = do
r <- readTVar rindex
w <- readTVar windex
if w == r then do
-- length is 0 or cap
ele <- unsafeRead elements r
case ele of
Nothing -> pure 0
Just _ -> pure $! fromIntegral cap
else do
let len' = w - r
pure $! fromIntegral (if len' < 0 then len' + cap else len')
lengthTBQueue (TBQueue rsize _read wsize _write size) = do
r <- readTVar rsize
w <- readTVar wsize
return $! size - r - w

-- | Returns 'True' if the supplied 'TBQueue' is empty.
isEmptyTBQueue :: TBQueue a -> STM Bool
isEmptyTBQueue (TBQueue rindex windex elements _) = do
r <- readTVar rindex
w <- readTVar windex
if w == r then do
ele <- unsafeRead elements r
pure $! isNothing ele
else
pure False
isEmptyTBQueue (TBQueue _rsize read _wsize write _size) = do
xs <- readTVar read
case xs of
(_:_) -> return False
[] -> do ys <- readTVar write
case ys of
[] -> return True
_ -> return False

-- | Returns 'True' if the supplied 'TBQueue' is full.
--
-- @since 2.4.3
isFullTBQueue :: TBQueue a -> STM Bool
isFullTBQueue (TBQueue rindex windex elements _) = do
r <- readTVar rindex
w <- readTVar windex
if w == r then do
ele <- unsafeRead elements r
pure $! isJust ele
else
pure False
isFullTBQueue (TBQueue rsize _read wsize _write _size) = do
w <- readTVar wsize
if (w > 0)
then return False
else do
r <- readTVar rsize
if (r > 0)
then return False
else return True

-- | The maximum number of elements the queue can hold.
--
-- @since TODO
capacityTBQueue :: TBQueue a -> Natural
capacityTBQueue (TBQueue _ _ _ cap) = fromIntegral cap
capacityTBQueue (TBQueue _ _ _ _ cap) = fromIntegral cap
59 changes: 59 additions & 0 deletions bench/chanbench.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
{-# LANGUAGE CPP, RankNTypes #-}
import Control.Concurrent.Async
import Control.Monad
import System.Environment

import Control.Concurrent.Chan
import Control.Concurrent.STM
import Control.Concurrent.STM.TQueue
import Control.Concurrent.STM.TBQueue

-- Using CPP rather than a runtime choice between channel types,
-- because we want the compiler to be able to optimise the calls.

-- #define CHAN
-- #define TCHAN
-- #define TQUEUE
-- #define TBQUEUE

#ifdef CHAN
newc = newChan
readc c = readChan c
writec c x = writeChan c x
#elif defined(TCHAN)
newc = newTChanIO
readc c = atomically $ readTChan c
writec c x = atomically $ writeTChan c x
#elif defined(TQUEUE)
newc = newTQueueIO
readc c = atomically $ readTQueue c
writec c x = atomically $ writeTQueue c x
#elif defined(TBQUEUE)
newc = newTBQueueIO 4096
readc c = atomically $ readTBQueue c
writec c x = atomically $ writeTBQueue c x
#endif

main = do
[stest,sn] <- getArgs -- 2000000 is a good number
let n = read sn :: Int
test = read stest :: Int
runtest n test

runtest :: Int -> Int -> IO ()
runtest n test = do
c <- newc
case test of
0 -> do
a <- async $ replicateM_ n $ writec c (1 :: Int)
b <- async $ replicateM_ n $ readc c
waitBoth a b
return ()
1 -> do
replicateM_ n $ writec c (1 :: Int)
replicateM_ n $ readc c
2 -> do
let n1000 = n `quot` 1000
replicateM_ 1000 $ do
replicateM_ n1000 $ writec c (1 :: Int)
replicateM_ n1000 $ readc c
5 changes: 1 addition & 4 deletions cabal.project
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
packages:
.
testsuite
bench
packages: . testsuite/

package testsuite
tests: true
2 changes: 0 additions & 2 deletions hie.yaml

This file was deleted.

Loading

0 comments on commit 79de9e6

Please sign in to comment.