Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove buffering from the pipe file handles #86

Merged
merged 2 commits into from
Oct 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

## Unreleased

* Remove buffering from the pipe chunked APIs.

## 0.3.1 (Dec 2023)

* Allow streamly-0.10.0 and streamly-core-0.2.0
Expand Down
4 changes: 3 additions & 1 deletion src/DocTestProcess.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

>>> :set -XFlexibleContexts
>>> :set -XScopedTypeVariables
>>> :set -Wno-deprecations
>>> import Data.Char (toUpper)
>>> import Data.Function ((&))
>>> import qualified Streamly.Console.Stdio as Stdio
Expand All @@ -13,7 +14,8 @@

For APIs that have not been released yet.

>>> import qualified Streamly.Internal.Console.Stdio as Stdio (putChars, putChunks)
>>> import Streamly.Internal.System.IO (defaultChunkSize)
>>> import qualified Streamly.Internal.Console.Stdio as Stdio (putChars, putChunks, readChunks)
>>> import qualified Streamly.Internal.FileSystem.Dir as Dir (readFiles)
>>> import qualified Streamly.Internal.System.Process as Process
>>> import qualified Streamly.Internal.Unicode.Stream as Unicode (lines)
Expand Down
129 changes: 102 additions & 27 deletions src/Streamly/Internal/System/Process.hs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
--

{-# LANGUAGE CPP #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE ScopedTypeVariables #-}

-- TODO:
Expand Down Expand Up @@ -42,7 +43,6 @@
--
-- - Replace FilePath with a typed path.
--
{-# LANGUAGE FlexibleContexts #-}

module Streamly.Internal.System.Process
(
Expand Down Expand Up @@ -138,7 +138,7 @@ import Streamly.Data.Array (Array)
import Streamly.Data.Fold (Fold)
import Streamly.Data.Stream.Prelude (MonadAsync, Stream)
import System.Exit (ExitCode(..))
import System.IO (hClose, Handle)
import System.IO (hClose, Handle, hSetBuffering, BufferMode(..))
#if !defined(mingw32_HOST_OS)
import System.Posix.Types (CUid (..), CGid (..))
#endif
Expand Down Expand Up @@ -485,7 +485,7 @@ parallel s1 s2 = Stream.parList (Stream.eager True) [s1, s2]
-------------------------------------------------------------------------------
-- Transformation
-------------------------------------------------------------------------------
--

-- | On normal cleanup we do not need to close the pipe handles as they are
-- already guaranteed to be closed (we can assert that) by the time we reach
-- here. We should not kill the process, rather wait for it to terminate
Expand Down Expand Up @@ -574,9 +574,16 @@ createProc' modCfg path args = do
-- XXX Read the exception channel and reap the process if it failed before
-- exec.
parent
hSetBuffering inp NoBuffering
hSetBuffering out NoBuffering
hSetBuffering err NoBuffering
return (Just inp, Just out, err, proc)
#else
createProcess cfg
r@(inp, out, err, _) <- createProcess cfg
mapM_ (`hSetBuffering` NoBuffering) inp
mapM_ (`hSetBuffering` NoBuffering) out
mapM_ (`hSetBuffering` NoBuffering) err
return r
#endif

where
Expand Down Expand Up @@ -634,6 +641,11 @@ pipeChunksEitherWith modifier path args input =

-- | Like 'pipeChunks' but also includes stderr as 'Left' stream in the
-- 'Either' output.
--
-- Definition:
--
-- >>> pipeChunksEither = pipeChunksEitherWith id
--
{-# INLINE pipeChunksEither #-}
pipeChunksEither ::
(MonadCatch m, MonadAsync m)
Expand All @@ -643,10 +655,13 @@ pipeChunksEither ::
-> Stream m (Either (Array Word8) (Array Word8)) -- ^ Output stream
pipeChunksEither = pipeChunksEitherWith id

-- | @pipeBytesEither path args input@ runs the executable at @path@ using @args@
-- as arguments and @input@ stream as its standard input. The error stream of
-- the executable is presented as 'Left' values in the resulting stream and
-- output stream as 'Right' values.
-- | @pipeBytesEither path args input@ runs the executable at @path@ using
-- @args@ as arguments and @input@ stream as its standard input. The error
-- stream of the executable is presented as 'Left' values in the resulting
-- stream and output stream as 'Right' values. The input to the pipe is
-- buffered with a buffer size of 'defaultChunkSize'.
--
-- For control over the buffer use your own chunking and chunk based APIs.
--
-- Raises 'ProcessFailure' exception in case of failure.
--
Expand Down Expand Up @@ -697,7 +712,8 @@ pipeChunksWith modifier path args input =

-- | @pipeChunks file args input@ runs the executable @file@ specified by
-- its name or path using @args@ as arguments and @input@ stream as its
-- standard input. Returns the standard output of the executable as a stream.
-- standard input. Returns the standard output of the process as a stream
-- of chunks of bytes (Array Word8).
--
-- If only the name of an executable file is specified instead of its path then
-- the file name is searched in the directories specified by the PATH
Expand All @@ -719,6 +735,10 @@ pipeChunksWith modifier path args input =
-- :}
--HELLO WORLD
--
-- Definition:
--
-- >>> pipeChunks = pipeChunksWith id
--
-- /pre-release/
{-# INLINE pipeChunks #-}
pipeChunks ::
Expand All @@ -739,8 +759,12 @@ processChunks ::
-> Stream m (Array Word8) -- ^ Output stream
processChunks = pipeChunks

-- | Like 'pipeChunks' except that it works on a stream of bytes instead of
-- a stream of chunks.
-- | Like 'pipeChunks' except that its input and output is stream of bytes
-- instead of a stream of chunks. The input to the pipe is buffered with a
-- buffer size of 'defaultChunkSize'.
--
-- For control over the input buffer use your own chunking and chunk based
-- APIs.
--
-- We can write the example in 'pipeChunks' as follows.
--
Expand Down Expand Up @@ -774,8 +798,14 @@ processBytes ::
-> Stream m Word8 -- ^ Output Stream
processBytes = pipeBytes

-- | Like 'pipeChunks' except that it works on a stream of chars instead of
-- a stream of chunks.
-- | Like 'pipeChunks' except that its input and output is stream of chars
-- instead of a stream of chunks. The input to the pipe is buffered with a
-- buffer size of 'defaultChunkSize'.
--
-- For control over the input buffer use your own chunking and chunk based
-- APIs.
--
-- NOTE: This API uses UTF-8 encoding.
--
-- >>> :{
-- Process.toChars "echo" ["hello world"]
Expand Down Expand Up @@ -847,8 +877,9 @@ toChunksWith modifier path args =
run _ = error "toChunksWith: Not reachable"

-- | @toBytesEither path args@ runs the executable at @path@ using @args@ as
-- arguments and returns a stream of 'Either' bytes. The 'Left' values are from
-- @stderr@ and the 'Right' values are from @stdout@ of the executable.
-- arguments and returns the output of the process as a stream of 'Either'
-- bytes. The 'Left' values are from @stderr@ and the 'Right' values are from
-- @stdout@ of the executable.
--
-- Raises 'ProcessFailure' exception in case of failure.
--
Expand Down Expand Up @@ -877,8 +908,12 @@ toBytesEither path args =
rightRdr = fmap Right Array.reader
in Stream.unfoldMany (Unfold.either leftRdr rightRdr) output

-- | The following code is equivalent to the shell command @echo "hello
-- world"@:
-- | @toBytes path args@ runs the executable specified by @path@ using @args@
-- as arguments and returns the output of the process as a stream of bytes.
--
-- Raises 'ProcessFailure' exception in case of failure.
--
-- The following code is equivalent to the shell command @echo "hello world"@:
--
-- >>> :{
-- Process.toBytes "echo" ["hello world"]
Expand Down Expand Up @@ -921,8 +956,13 @@ toChunksEither ::
-> Stream m (Either (Array Word8) (Array Word8)) -- ^ Output Stream
toChunksEither = toChunksEitherWith id

-- | The following code is equivalent to the shell command @echo "hello
-- world"@:
-- | @toChunks path args@ runs the executable specified by @path@ using @args@
-- as arguments and returns the output of the process as a stream of chunks of
-- bytes (Array Word8).
--
-- Raises 'ProcessFailure' exception in case of failure.
--
-- The following code is equivalent to the shell command @echo "hello world"@:
--
-- >>> :{
-- Process.toChunks "echo" ["hello world"]
Expand All @@ -941,7 +981,15 @@ toChunks ::
-> Stream m (Array Word8) -- ^ Output Stream
toChunks = toChunksWith id

-- |
-- | @toChars path args@ runs the executable specified by @path@ using @args@
-- as arguments and returns the output of the process as a stream of chars.
--
-- NOTE: This API uses UTF-8 encoding.
--
-- Raises 'ProcessFailure' exception in case of failure.
--
-- Definition:
--
-- >>> toChars path args = toBytes path args & Unicode.decodeUtf8
--
{-# INLINE toChars #-}
Expand All @@ -952,8 +1000,21 @@ toChars ::
-> Stream m Char -- ^ Output Stream
toChars path args = toBytes path args & Unicode.decodeUtf8

-- |
-- >>> toLines path args f = toChars path args & Unicode.lines f
-- | @toLines f path args@ runs the executable specified by @path@ using @args@
-- as arguments and folds the output of the process at line breaks, using the
-- fold @f@, to return a stream of folded lines.
--
-- NOTE: This API uses UTF-8 encoding.
--
-- Raises 'ProcessFailure' exception in case of failure.
--
-- To return a stream of lines as strings:
--
-- >>> toStrings = toLines Fold.toList
--
-- Definition:
--
-- >>> toLines f path args = toChars path args & Unicode.lines f
--
{-# INLINE toLines #-}
toLines ::
Expand All @@ -964,18 +1025,28 @@ toLines ::
-> Stream m a -- ^ Output Stream
toLines f path args = toChars path args & Unicode.lines f

-- |
-- >>> toString path args = toChars path args & Stream.fold Fold.toList
-- | @toString path args@ runs the executable specified by @path@ using @args@
-- as arguments and folds the entire output of the process as a single string.
--
-- NOTE: This API uses UTF-8 encoding.
--
-- Definition:
--
-- >>> toString path args = toChars path args & Stream.toList
--
{-# INLINE toString #-}
toString ::
(MonadAsync m, MonadCatch m)
=> FilePath -- ^ Executable name or path
-> [String] -- ^ Arguments
-> m String
toString path args = toChars path args & Stream.fold Fold.toList
toString path args = toChars path args & Stream.toList

-- |
-- | @toStdout path args@ runs the executable specified by @path@ using @args@
-- as arguments and returns the output of the process on stdout.
--
-- Definition:
--
-- >>> toStdout path args = toChunks path args & Stdio.putChunks
--
{-# INLINE toStdout #-}
Expand All @@ -992,7 +1063,11 @@ toStdout path args = do
return ()
-}

-- |
-- | @toNull path args@ runs the executable specified by @path@ using @args@
-- as arguments and discards the output of the process.
--
-- Definition:
--
-- >>> toNull path args = toChunks path args & Stream.fold Fold.drain
--
{-# INLINE toNull #-}
Expand Down
13 changes: 12 additions & 1 deletion src/Streamly/System/Process.hs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@
--
-- >>> :{
-- Process.toBytes "echo" ["hello world"]
-- & Unicode.decodeLatin1 & fmap toUpper & Unicode.encodeLatin1
-- & Unicode.decodeLatin1
-- & fmap toUpper
-- & Unicode.encodeLatin1
-- & Stream.fold Stdio.write
-- :}
-- HELLO WORLD
Expand Down Expand Up @@ -74,6 +76,15 @@
-- & Stream.fold Stdio.writeChunks
-- :}
--
-- = Running Interactive Programs (e.g. ghci)
--
-- >>> :{
-- ghci =
-- Stdio.readChunks
-- & Process.pipeChunks "ghci" []
-- & Stdio.putChunks
-- :}
--
-- = Experimental APIs
--
-- See "Streamly.Internal.System.Process" for unreleased functions.
Expand Down
Loading