From 110318ad8a0417624398b1af3d5ebafac50d527b Mon Sep 17 00:00:00 2001 From: Ben Gamari Date: Thu, 16 Nov 2023 12:21:10 -0500 Subject: [PATCH] Revert "Rewrite `TBQueue` to use `TArray Int (Maybe a)`" This reverts commit 98215788d9e313bee129e706daf2d8cea5eee9a3. As noted in #78, the current implementation does not appear to be correct. --- Control/Concurrent/STM/TBQueue.hs | 267 ++++++++++++++++-------------- bench/chanbench.hs | 59 +++++++ cabal.project | 5 +- hie.yaml | 2 - testsuite/src/Issue17.hs | 73 ++++++++ testsuite/src/Main.hs | 2 + testsuite/testsuite.cabal | 1 + 7 files changed, 279 insertions(+), 130 deletions(-) create mode 100644 bench/chanbench.hs delete mode 100644 hie.yaml create mode 100644 testsuite/src/Issue17.hs diff --git a/Control/Concurrent/STM/TBQueue.hs b/Control/Concurrent/STM/TBQueue.hs index d5ea578..27430ec 100644 --- a/Control/Concurrent/STM/TBQueue.hs +++ b/Control/Concurrent/STM/TBQueue.hs @@ -46,185 +46,204 @@ module Control.Concurrent.STM.TBQueue ( capacityTBQueue, ) where -#if !MIN_VERSION_base(4,8,0) -import Control.Applicative (pure) -#endif -import Data.Array.Base -import Data.Maybe (isJust, isNothing) -import Data.Typeable (Typeable) -import GHC.Conc -import Numeric.Natural (Natural) -import Prelude hiding (read) - -import Control.Concurrent.STM.TArray +import Control.Monad (unless) +import Data.Typeable (Typeable) +import GHC.Conc (STM, TVar, newTVar, newTVarIO, orElse, + readTVar, retry, writeTVar) +import Numeric.Natural (Natural) +import Prelude hiding (read) -- | 'TBQueue' is an abstract type representing a bounded FIFO channel. -- -- @since 2.4 data TBQueue a - = TBQueue {-# UNPACK #-} !(TVar Int) -- read index - {-# UNPACK #-} !(TVar Int) -- write index - {-# UNPACK #-} !(TArray Int (Maybe a)) -- elements - {-# UNPACK #-} !Int -- initial capacity + = TBQueue {-# UNPACK #-} !(TVar Natural) -- CR: read capacity + {-# UNPACK #-} !(TVar [a]) -- R: elements waiting to be read + {-# UNPACK #-} !(TVar Natural) -- CW: write capacity + {-# UNPACK #-} !(TVar [a]) -- W: elements written (head is most recent) + !(Natural) -- CAP: initial capacity deriving Typeable instance Eq (TBQueue a) where - -- each `TBQueue` has its own `TVar`s, so it's sufficient to compare the first one - TBQueue a _ _ _ == TBQueue b _ _ _ = a == b - --- incMod x cap == (x + 1) `mod` cap -incMod :: Int -> Int -> Int -incMod x cap = let y = x + 1 in if y == cap then 0 else y + TBQueue a _ _ _ _ == TBQueue b _ _ _ _ = a == b --- decMod x cap = (x - 1) `mod` cap -decMod :: Int -> Int -> Int -decMod x cap = if x == 0 then cap - 1 else x - 1 +-- Total channel capacity remaining is CR + CW. Reads only need to +-- access CR, writes usually need to access only CW but sometimes need +-- CR. So in the common case we avoid contention between CR and CW. +-- +-- - when removing an element from R: +-- CR := CR + 1 +-- +-- - when adding an element to W: +-- if CW is non-zero +-- then CW := CW - 1 +-- then if CR is non-zero +-- then CW := CR - 1; CR := 0 +-- else **FULL** -- | Builds and returns a new instance of 'TBQueue'. newTBQueue :: Natural -- ^ maximum number of elements the queue can hold -> STM (TBQueue a) -newTBQueue cap - | cap <= 0 = error "capacity has to be greater than 0" - | cap > fromIntegral (maxBound :: Int) = error "capacity is too big" - | otherwise = do - rindex <- newTVar 0 - windex <- newTVar 0 - elements <- newArray (0, cap' - 1) Nothing - pure (TBQueue rindex windex elements cap') - where - cap' = fromIntegral cap +newTBQueue size = do + read <- newTVar [] + write <- newTVar [] + rsize <- newTVar 0 + wsize <- newTVar size + return (TBQueue rsize read wsize write size) -- | @IO@ version of 'newTBQueue'. This is useful for creating top-level -- 'TBQueue's using 'System.IO.Unsafe.unsafePerformIO', because using -- 'atomically' inside 'System.IO.Unsafe.unsafePerformIO' isn't -- possible. newTBQueueIO :: Natural -> IO (TBQueue a) -newTBQueueIO cap - | cap <= 0 = error "capacity has to be greater than 0" - | cap > fromIntegral (maxBound :: Int) = error "capacity is too big" - | otherwise = do - rindex <- newTVarIO 0 - windex <- newTVarIO 0 - elements <- newArray (0, cap' - 1) Nothing - pure (TBQueue rindex windex elements cap') - where - cap' = fromIntegral cap - --- | Write a value to a 'TBQueue'; retries if the queue is full. +newTBQueueIO size = do + read <- newTVarIO [] + write <- newTVarIO [] + rsize <- newTVarIO 0 + wsize <- newTVarIO size + return (TBQueue rsize read wsize write size) + +-- |Write a value to a 'TBQueue'; blocks if the queue is full. writeTBQueue :: TBQueue a -> a -> STM () -writeTBQueue (TBQueue _ windex elements cap) a = do - w <- readTVar windex - ele <- unsafeRead elements w - case ele of - Nothing -> unsafeWrite elements w (Just a) - Just _ -> retry - writeTVar windex $! incMod w cap - --- | Read the next value from the 'TBQueue'; retries if the queue is empty. +writeTBQueue (TBQueue rsize _read wsize write _size) a = do + w <- readTVar wsize + if (w > 0) + then do writeTVar wsize $! w - 1 + else do + r <- readTVar rsize + if (r > 0) + then do writeTVar rsize 0 + writeTVar wsize $! r - 1 + else retry + listend <- readTVar write + writeTVar write (a:listend) + +-- |Read the next value from the 'TBQueue'. readTBQueue :: TBQueue a -> STM a -readTBQueue (TBQueue rindex _ elements cap) = do - r <- readTVar rindex - ele <- unsafeRead elements r - a <- case ele of - Nothing -> retry - Just a -> do - unsafeWrite elements r Nothing - pure a - writeTVar rindex $! incMod r cap - pure a +readTBQueue (TBQueue rsize read _wsize write _size) = do + xs <- readTVar read + r <- readTVar rsize + writeTVar rsize $! r + 1 + case xs of + (x:xs') -> do + writeTVar read xs' + return x + [] -> do + ys <- readTVar write + case ys of + [] -> retry + _ -> do + -- NB. lazy: we want the transaction to be + -- short, otherwise it will conflict + let ~(z,zs) = case reverse ys of + z':zs' -> (z',zs') + _ -> error "readTBQueue: impossible" + writeTVar write [] + writeTVar read zs + return z -- | A version of 'readTBQueue' which does not retry. Instead it -- returns @Nothing@ if no value is available. tryReadTBQueue :: TBQueue a -> STM (Maybe a) -tryReadTBQueue q = fmap Just (readTBQueue q) `orElse` pure Nothing +tryReadTBQueue q = fmap Just (readTBQueue q) `orElse` return Nothing -- | Efficiently read the entire contents of a 'TBQueue' into a list. This -- function never retries. -- -- @since 2.4.5 -flushTBQueue :: forall a. TBQueue a -> STM [a] -flushTBQueue (TBQueue _rindex windex elements cap) = do - w <- readTVar windex - go (decMod w cap) [] - where - go :: Int -> [a] -> STM [a] - go i acc = do - ele <- unsafeRead elements i - case ele of - Nothing -> pure acc - Just a -> do - unsafeWrite elements i Nothing - go (decMod i cap) (a : acc) +flushTBQueue :: TBQueue a -> STM [a] +flushTBQueue (TBQueue rsize read wsize write size) = do + xs <- readTVar read + ys <- readTVar write + if null xs && null ys + then return [] + else do + unless (null xs) $ writeTVar read [] + unless (null ys) $ writeTVar write [] + writeTVar rsize 0 + writeTVar wsize size + return (xs ++ reverse ys) -- | Get the next value from the @TBQueue@ without removing it, --- retrying if the queue is empty. +-- retrying if the channel is empty. peekTBQueue :: TBQueue a -> STM a -peekTBQueue (TBQueue rindex _ elements _) = do - r <- readTVar rindex - ele <- unsafeRead elements r - case ele of - Nothing -> retry - Just a -> pure a +peekTBQueue (TBQueue _ read _ write _) = do + xs <- readTVar read + case xs of + (x:_) -> return x + [] -> do + ys <- readTVar write + case ys of + [] -> retry + _ -> do + let (z:zs) = reverse ys -- NB. lazy: we want the transaction to be + -- short, otherwise it will conflict + writeTVar write [] + writeTVar read (z:zs) + return z -- | A version of 'peekTBQueue' which does not retry. Instead it -- returns @Nothing@ if no value is available. tryPeekTBQueue :: TBQueue a -> STM (Maybe a) -tryPeekTBQueue q = fmap Just (peekTBQueue q) `orElse` pure Nothing +tryPeekTBQueue c = do + m <- tryReadTBQueue c + case m of + Nothing -> return Nothing + Just x -> do + unGetTBQueue c x + return m -- | Put a data item back onto a channel, where it will be the next item read. --- Retries if the queue is full. +-- Blocks if the queue is full. unGetTBQueue :: TBQueue a -> a -> STM () -unGetTBQueue (TBQueue rindex _ elements cap) a = do - r <- readTVar rindex - ele <- unsafeRead elements r - case ele of - Nothing -> unsafeWrite elements r (Just a) - Just _ -> retry - writeTVar rindex $! decMod r cap +unGetTBQueue (TBQueue rsize read wsize _write _size) a = do + r <- readTVar rsize + if (r > 0) + then do writeTVar rsize $! r - 1 + else do + w <- readTVar wsize + if (w > 0) + then writeTVar wsize $! w - 1 + else retry + xs <- readTVar read + writeTVar read (a:xs) -- | Return the length of a 'TBQueue'. -- -- @since 2.5.0.0 lengthTBQueue :: TBQueue a -> STM Natural -lengthTBQueue (TBQueue rindex windex elements cap) = do - r <- readTVar rindex - w <- readTVar windex - if w == r then do - -- length is 0 or cap - ele <- unsafeRead elements r - case ele of - Nothing -> pure 0 - Just _ -> pure $! fromIntegral cap - else do - let len' = w - r - pure $! fromIntegral (if len' < 0 then len' + cap else len') +lengthTBQueue (TBQueue rsize _read wsize _write size) = do + r <- readTVar rsize + w <- readTVar wsize + return $! size - r - w -- | Returns 'True' if the supplied 'TBQueue' is empty. isEmptyTBQueue :: TBQueue a -> STM Bool -isEmptyTBQueue (TBQueue rindex windex elements _) = do - r <- readTVar rindex - w <- readTVar windex - if w == r then do - ele <- unsafeRead elements r - pure $! isNothing ele - else - pure False +isEmptyTBQueue (TBQueue _rsize read _wsize write _size) = do + xs <- readTVar read + case xs of + (_:_) -> return False + [] -> do ys <- readTVar write + case ys of + [] -> return True + _ -> return False -- | Returns 'True' if the supplied 'TBQueue' is full. -- -- @since 2.4.3 isFullTBQueue :: TBQueue a -> STM Bool -isFullTBQueue (TBQueue rindex windex elements _) = do - r <- readTVar rindex - w <- readTVar windex - if w == r then do - ele <- unsafeRead elements r - pure $! isJust ele - else - pure False +isFullTBQueue (TBQueue rsize _read wsize _write _size) = do + w <- readTVar wsize + if (w > 0) + then return False + else do + r <- readTVar rsize + if (r > 0) + then return False + else return True -- | The maximum number of elements the queue can hold. -- -- @since TODO capacityTBQueue :: TBQueue a -> Natural -capacityTBQueue (TBQueue _ _ _ cap) = fromIntegral cap +capacityTBQueue (TBQueue _ _ _ _ cap) = fromIntegral cap diff --git a/bench/chanbench.hs b/bench/chanbench.hs new file mode 100644 index 0000000..8c534f1 --- /dev/null +++ b/bench/chanbench.hs @@ -0,0 +1,59 @@ +{-# LANGUAGE CPP, RankNTypes #-} +import Control.Concurrent.Async +import Control.Monad +import System.Environment + +import Control.Concurrent.Chan +import Control.Concurrent.STM +import Control.Concurrent.STM.TQueue +import Control.Concurrent.STM.TBQueue + +-- Using CPP rather than a runtime choice between channel types, +-- because we want the compiler to be able to optimise the calls. + +-- #define CHAN +-- #define TCHAN +-- #define TQUEUE +-- #define TBQUEUE + +#ifdef CHAN +newc = newChan +readc c = readChan c +writec c x = writeChan c x +#elif defined(TCHAN) +newc = newTChanIO +readc c = atomically $ readTChan c +writec c x = atomically $ writeTChan c x +#elif defined(TQUEUE) +newc = newTQueueIO +readc c = atomically $ readTQueue c +writec c x = atomically $ writeTQueue c x +#elif defined(TBQUEUE) +newc = newTBQueueIO 4096 +readc c = atomically $ readTBQueue c +writec c x = atomically $ writeTBQueue c x +#endif + +main = do + [stest,sn] <- getArgs -- 2000000 is a good number + let n = read sn :: Int + test = read stest :: Int + runtest n test + +runtest :: Int -> Int -> IO () +runtest n test = do + c <- newc + case test of + 0 -> do + a <- async $ replicateM_ n $ writec c (1 :: Int) + b <- async $ replicateM_ n $ readc c + waitBoth a b + return () + 1 -> do + replicateM_ n $ writec c (1 :: Int) + replicateM_ n $ readc c + 2 -> do + let n1000 = n `quot` 1000 + replicateM_ 1000 $ do + replicateM_ n1000 $ writec c (1 :: Int) + replicateM_ n1000 $ readc c diff --git a/cabal.project b/cabal.project index faa73e7..c343211 100644 --- a/cabal.project +++ b/cabal.project @@ -1,7 +1,4 @@ -packages: - . - testsuite - bench +packages: . testsuite/ package testsuite tests: true diff --git a/hie.yaml b/hie.yaml deleted file mode 100644 index 04cd243..0000000 --- a/hie.yaml +++ /dev/null @@ -1,2 +0,0 @@ -cradle: - cabal: diff --git a/testsuite/src/Issue17.hs b/testsuite/src/Issue17.hs new file mode 100644 index 0000000..06b72f0 --- /dev/null +++ b/testsuite/src/Issue17.hs @@ -0,0 +1,73 @@ +{-# LANGUAGE CPP #-} + +-- see https://github.com/haskell/stm/pull/19 +-- +-- Test-case contributed by Alexey Kuleshevich +-- +-- This bug is observable in all versions with TBQueue from `stm-2.4` to +-- `stm-2.4.5.1` inclusive. + +module Issue17 (main) where + +import Control.Concurrent.STM +import Test.HUnit.Base (assertBool, assertEqual) + +main :: IO () +main = do + -- New queue capacity is set to 0 + queueIO <- newTBQueueIO 0 + assertNoCapacityTBQueue queueIO + + -- Same as above, except created within STM + queueSTM <- atomically $ newTBQueue 0 + assertNoCapacityTBQueue queueSTM + +#if !MIN_VERSION_stm(2,5,0) + -- NB: below are expected failures + + -- New queue capacity is set to a negative numer + queueIO' <- newTBQueueIO (-1 :: Int) + assertNoCapacityTBQueue queueIO' + + -- Same as above, except created within STM and different negative number + queueSTM' <- atomically $ newTBQueue (minBound :: Int) + assertNoCapacityTBQueue queueSTM' +#endif + +assertNoCapacityTBQueue :: TBQueue Int -> IO () +assertNoCapacityTBQueue queue = do + assertEmptyTBQueue queue + assertFullTBQueue queue + + -- Attempt to write into the queue. + eValWrite <- atomically $ orElse (fmap Left (writeTBQueue queue 217)) + (fmap Right (tryReadTBQueue queue)) + assertEqual "Expected queue with no capacity: writeTBQueue" eValWrite (Right Nothing) + eValUnGet <- atomically $ orElse (fmap Left (unGetTBQueue queue 218)) + (fmap Right (tryReadTBQueue queue)) + assertEqual "Expected queue with no capacity: unGetTBQueue" eValUnGet (Right Nothing) + + -- Make sure that attempt to write didn't affect the queue + assertEmptyTBQueue queue + assertFullTBQueue queue + + +assertEmptyTBQueue :: TBQueue Int -> IO () +assertEmptyTBQueue queue = do + atomically (isEmptyTBQueue queue) >>= + assertBool "Expected empty: isEmptyTBQueue should return True" + + atomically (tryReadTBQueue queue) >>= + assertEqual "Expected empty: tryReadTBQueue should return Nothing" Nothing + + atomically (tryPeekTBQueue queue) >>= + assertEqual "Expected empty: tryPeekTBQueue should return Nothing" Nothing + + atomically (flushTBQueue queue) >>= + assertEqual "Expected empty: flushTBQueue should return []" [] + + +assertFullTBQueue :: TBQueue Int -> IO () +assertFullTBQueue queue = do + atomically (isFullTBQueue queue) >>= + assertBool "Expected full: isFullTBQueue shoule return True" diff --git a/testsuite/src/Main.hs b/testsuite/src/Main.hs index 8cbb8db..09802d2 100644 --- a/testsuite/src/Main.hs +++ b/testsuite/src/Main.hs @@ -6,6 +6,7 @@ import Test.Framework (defaultMain, testGroup) import Test.Framework.Providers.HUnit import qualified Issue9 +import qualified Issue17 import qualified Stm052 import qualified Stm064 import qualified Stm065 @@ -18,6 +19,7 @@ main = do tests = [ testGroup "regression" [ testCase "issue #9" Issue9.main + , testCase "issue #17" Issue17.main , testCase "stm052" Stm052.main , testCase "stm064" Stm064.main , testCase "stm065" Stm065.main diff --git a/testsuite/testsuite.cabal b/testsuite/testsuite.cabal index 863057f..f77d8e5 100644 --- a/testsuite/testsuite.cabal +++ b/testsuite/testsuite.cabal @@ -20,6 +20,7 @@ test-suite stm main-is: Main.hs other-modules: Issue9 + Issue17 Stm052 Stm064 Stm065