Skip to content

Commit

Permalink
Make TQueue persist work across transactions
Browse files Browse the repository at this point in the history
Previously, `TQueue` could build up a large write list, leading
to the reader having to do too much work reversing it and aborting.
Rotate the queue more frequently so the reversal work will effectively
be saved even when a transaction aborts.
  • Loading branch information
treeowl committed May 24, 2018
1 parent 92af455 commit 5a14bf3
Showing 1 changed file with 66 additions and 53 deletions.
119 changes: 66 additions & 53 deletions Control/Concurrent/STM/TQueue.hs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{-# OPTIONS_GHC -fno-warn-name-shadowing #-}
{-# LANGUAGE CPP, DeriveDataTypeable #-}
{-# LANGUAGE BangPatterns #-}

#if __GLASGOW_HASKELL__ >= 701
{-# LANGUAGE Trustworthy #-}
Expand Down Expand Up @@ -50,57 +51,72 @@ import GHC.Conc
import Control.Monad (unless)
import Data.Typeable (Typeable)

data End a = End !Int [a]

-- | 'TQueue' is an abstract type representing an unbounded FIFO channel.
--
-- @since 2.4
data TQueue a = TQueue {-# UNPACK #-} !(TVar [a])
{-# UNPACK #-} !(TVar [a])
data TQueue a = TQueue {-# UNPACK #-} !(TVar Int)
{-# UNPACK #-} !(TVar (End a))
{-# UNPACK #-} !(TVar (End a))
deriving Typeable

instance Eq (TQueue a) where
TQueue a _ == TQueue b _ = a == b
TQueue a _ _ == TQueue b _ _ = a == b

-- |Build and returns a new instance of 'TQueue'
newTQueue :: STM (TQueue a)
newTQueue = do
read <- newTVar []
write <- newTVar []
return (TQueue read write)
old_len <- newTVar 0
read <- newTVar (End 0 [])
write <- newTVar (End 0 [])
return (TQueue old_len read write)

-- |@IO@ version of 'newTQueue'. This is useful for creating top-level
-- 'TQueue's using 'System.IO.Unsafe.unsafePerformIO', because using
-- 'atomically' inside 'System.IO.Unsafe.unsafePerformIO' isn't
-- possible.
newTQueueIO :: IO (TQueue a)
newTQueueIO = do
read <- newTVarIO []
write <- newTVarIO []
return (TQueue read write)
old_len <- newTVarIO 0
read <- newTVarIO (End 0 [])
write <- newTVarIO (End 0 [])
return (TQueue old_len read write)

-- |Write a value to a 'TQueue'.
writeTQueue :: TQueue a -> a -> STM ()
writeTQueue (TQueue _read write) a = do
listend <- readTVar write
writeTVar write (a:listend)
writeTQueue (TQueue old_len read write) a = do
ol <- readTVar old_len
End write_count listend <- readTVar write
let write_count' = write_count + 2
if write_count' >= ol
then do
End read_count front <- readTVar read
let !len = ol + ((write_count' - read_count) `quot` 2)
writeTVar old_len len
writeTVar read (End 0 (front ++ reverse listend ++ [a]))
writeTVar write (End 0 [])
else writeTVar write (End write_count' (a:listend))

-- |Read the next value from the 'TQueue'.
readTQueue :: TQueue a -> STM a
readTQueue (TQueue read write) = do
xs <- readTVar read
case xs of
(x:xs') -> do
writeTVar read xs'
return x
[] -> do
ys <- readTVar write
case ys of
[] -> retry
_ -> do
let (z:zs) = reverse ys -- NB. lazy: we want the transaction to be
-- short, otherwise it will conflict
writeTVar write []
writeTVar read zs
return z
readTQueue (TQueue old_len read write) = do
ol <- readTVar old_len
End read_count front <- readTVar read
case front of
[] -> retry
(a:as) -> do
let read_count' = read_count + 2
if read_count' >= ol
then do
End write_count listend <- readTVar write
let !len = ol + ((write_count - read_count') `quot` 2)
writeTVar old_len len
writeTVar read (End 0 (as ++ reverse listend))
writeTVar write (End 0 [])
else do
writeTVar read (End read_count' as)
return a

-- | A version of 'readTQueue' which does not retry. Instead it
-- returns @Nothing@ if no value is available.
Expand All @@ -112,45 +128,42 @@ tryReadTQueue c = fmap Just (readTQueue c) `orElse` return Nothing
--
-- @since 2.4.5
flushTQueue :: TQueue a -> STM [a]
flushTQueue (TQueue read write) = do
xs <- readTVar read
ys <- readTVar write
unless (null xs) $ writeTVar read []
unless (null ys) $ writeTVar write []
flushTQueue (TQueue old_len read write) = do
End read_count xs <- readTVar read
End write_count ys <- readTVar write
unless (read_count == 0 && null xs) $ writeTVar read (End 0 [])
unless (write_count == 0 && null ys) $ writeTVar write (End 0 [])
writeTVar old_len 0
return (xs ++ reverse ys)

-- | Get the next value from the @TQueue@ without removing it,
-- retrying if the channel is empty.
peekTQueue :: TQueue a -> STM a
peekTQueue c = do
x <- readTQueue c
unGetTQueue c x
return x
peekTQueue (TQueue _old_len read _write) = do
End _ xs <- readTVar read
case xs of
x:_ -> return x
[] -> retry

-- | A version of 'peekTQueue' which does not retry. Instead it
-- returns @Nothing@ if no value is available.
tryPeekTQueue :: TQueue a -> STM (Maybe a)
tryPeekTQueue c = do
m <- tryReadTQueue c
case m of
Nothing -> return Nothing
Just x -> do
unGetTQueue c x
return m
tryPeekTQueue (TQueue _old_len read _write) = do
End _ xs <- readTVar read
case xs of
x:_ -> return (Just x)
[] -> return Nothing

-- |Put a data item back onto a channel, where it will be the next item read.
unGetTQueue :: TQueue a -> a -> STM ()
unGetTQueue (TQueue read _write) a = do
xs <- readTVar read
writeTVar read (a:xs)
unGetTQueue (TQueue _old_len read _write) a = do
End read_count xs <- readTVar read
writeTVar read (End (read_count - 2) (a:xs))

-- |Returns 'True' if the supplied 'TQueue' is empty.
isEmptyTQueue :: TQueue a -> STM Bool
isEmptyTQueue (TQueue read write) = do
xs <- readTVar read
isEmptyTQueue (TQueue _old_len read _write) = do
End _ xs <- readTVar read
case xs of
(_:_) -> return False
[] -> do ys <- readTVar write
case ys of
[] -> return True
_ -> return False
[] -> return True

0 comments on commit 5a14bf3

Please sign in to comment.