From 5ff6a838c908d031cb219a676771e23ce0d45eb1 Mon Sep 17 00:00:00 2001 From: Lautaro Emanuel Date: Fri, 4 Mar 2022 17:09:42 -0300 Subject: [PATCH] Add 'delay' combinator Includes unit test --- conduit/ChangeLog.md | 4 ++++ conduit/src/Data/Conduit/Combinators.hs | 10 ++++++++++ .../src/Data/Conduit/Combinators/Unqualified.hs | 1 + conduit/test/Spec.hs | 15 +++++++++++++-- 4 files changed, 28 insertions(+), 2 deletions(-) diff --git a/conduit/ChangeLog.md b/conduit/ChangeLog.md index d8afc26b5..3f059cea6 100644 --- a/conduit/ChangeLog.md +++ b/conduit/ChangeLog.md @@ -1,5 +1,9 @@ # ChangeLog for conduit +## 1.3.5 + +* Add `delay` [#481](https://github.com/snoyberg/conduit/issues/481) + ## 1.3.4.2 * Fix GHC 9.2 build [#473](https://github.com/snoyberg/conduit/pull/473) diff --git a/conduit/src/Data/Conduit/Combinators.hs b/conduit/src/Data/Conduit/Combinators.hs index a3dab7c95..ee1ca8183 100644 --- a/conduit/src/Data/Conduit/Combinators.hs +++ b/conduit/src/Data/Conduit/Combinators.hs @@ -198,6 +198,7 @@ module Data.Conduit.Combinators , mapAccumS , peekForever , peekForeverE + , delay ) where -- BEGIN IMPORTS @@ -213,6 +214,7 @@ import Data.ByteString.Lazy.Internal (defaultChunkSize) import Control.Applicative (Alternative(..), (<$>)) import Control.Exception (catch, throwIO, finally, bracket, try, evaluate) import Control.Category (Category (..)) +import Control.Concurrent (threadDelay) import Control.Monad (unless, when, (>=>), liftM, forever) import Control.Monad.IO.Unlift (MonadIO (..), MonadUnliftIO, withRunInIO) import Control.Monad.Primitive (PrimMonad, PrimState, unsafePrimToPrim) @@ -2554,3 +2556,11 @@ peekForeverE inner = case mx of Nothing -> return () Just _ -> inner >> loop + +-- | Delays the emission of values for a given number of microseconds. +-- +-- @since 1.3.5 +delay :: MonadIO m => Int -> ConduitT a a m () +delay n = awaitForever $ \v -> do + liftIO $ threadDelay n + yield v \ No newline at end of file diff --git a/conduit/src/Data/Conduit/Combinators/Unqualified.hs b/conduit/src/Data/Conduit/Combinators/Unqualified.hs index 8dd2bfce0..90493d0c6 100644 --- a/conduit/src/Data/Conduit/Combinators/Unqualified.hs +++ b/conduit/src/Data/Conduit/Combinators/Unqualified.hs @@ -180,6 +180,7 @@ module Data.Conduit.Combinators.Unqualified , CC.mapAccumS , CC.peekForever , CC.peekForeverE + , CC.delay ) where -- BEGIN IMPORTS diff --git a/conduit/test/Spec.hs b/conduit/test/Spec.hs index 111f8d0e1..e08ffd353 100644 --- a/conduit/test/Spec.hs +++ b/conduit/test/Spec.hs @@ -7,10 +7,11 @@ module Spec (spec) where import Conduit import Prelude hiding (FilePath) -import Data.Maybe (listToMaybe) +import Data.Maybe (listToMaybe, isNothing) import Data.Conduit.Combinators (slidingWindow, chunksOfE, chunksOfExactlyE) import Data.List (intersperse, sort, find, mapAccumL) import Safe (tailSafe) +import System.CPUTime (getCPUTime) import System.FilePath (takeExtension, ()) import Test.Hspec import Test.Hspec.QuickCheck @@ -21,7 +22,7 @@ import Data.IORef import qualified Data.Vector as V import qualified Data.Vector.Unboxed as VU import qualified Data.Vector.Storable as VS -import Control.Monad (liftM) +import Control.Monad (liftM, when, void) import Control.Monad.ST (runST) import Control.Monad.Trans.Writer import qualified System.IO as IO @@ -637,6 +638,16 @@ spec = do res1 <- runConduit $ yieldMany strs .| linesUnboundedC .| sinkList res2 <- runConduit $ yieldMany strs .| peekForeverE (lineC $ foldC >>= yield) .| sinkList res2 `shouldBe` res1 + it "delay" $ do + let duration = 100000 :: Int + elapsed <- runConduit $ yield () .| delay duration .| do + start <- liftIO $ getCPUTime + mValue <- await + void $ when (isNothing mValue) $ + error "expected 'Just ()'" + end <- liftIO $ getCPUTime + return (end - start) + elapsed `shouldSatisfy` (>= (toInteger duration)) StreamSpec.spec evenInt :: Int -> Bool