diff --git a/Benchmark/System/Process.hs b/Benchmark/System/Process.hs index 64a6bc08..945289ea 100644 --- a/Benchmark/System/Process.hs +++ b/Benchmark/System/Process.hs @@ -13,12 +13,12 @@ import System.IO , openFile , hClose ) -import System.Process (proc, createProcess, waitForProcess, callCommand) import qualified Streamly.Data.Fold as FL import qualified Streamly.FileSystem.Handle as FH import qualified Streamly.Prelude as S import qualified Streamly.System.Process as Proc +import qualified Streamly.Internal.System.Command as Cmd -- Internal imports import qualified Streamly.Internal.FileSystem.Handle @@ -71,16 +71,12 @@ largeByteFile = "./largeByteFile" generateByteFile :: IO () generateByteFile = do ddPath <- which "dd" - let procObj = proc ddPath [ - "if=" ++ devRandom, - "of=" ++ largeByteFile, - "count=" ++ show ddBlockCount, - "bs=" ++ show ddBlockSize - ] - - (_, _, _, procHandle) <- createProcess procObj - _ <- waitForProcess procHandle - return () + Cmd.toStdout + $ ddPath + ++ " if=" ++ devRandom + ++ " of=" ++ largeByteFile + ++ " count=" ++ show ddBlockCount + ++ " bs=" ++ show ddBlockSize ------------------------------------------------------------------------------- -- Create a file filled with ascii chars @@ -112,7 +108,7 @@ trToStderrContent = createExecutable :: IO () createExecutable = do writeFile trToStderr trToStderrContent - callCommand ("chmod +x " ++ trToStderr) + Cmd.toStdout ("chmod +x " ++ trToStderr) ------------------------------------------------------------------------------- -- Create and delete the temp data/exec files diff --git a/default.nix b/default.nix index 550cba8d..f6dcc745 100644 --- a/default.nix +++ b/default.nix @@ -47,7 +47,7 @@ let haskellPackages = # } {}) (let src = fetchGit { url = "git@github.com:composewell/streamly.git"; - rev = "1ee11e87ec920df66e6bb1299ab000948df90ae5"; + rev = "4bb8b7c950ffeee9d5c9c3ca23c65be93ca34f0b"; }; in super.callCabal2nix "streamly" src {}) (old: { librarySystemDepends = @@ -62,7 +62,7 @@ let haskellPackages = nixpkgs.haskell.lib.overrideCabal (let src = fetchGit { url = "git@github.com:composewell/streamly.git"; - rev = "cbccb7777792cb4bf8dd8716929f4e28ea6cf718"; + rev = "4bb8b7c950ffeee9d5c9c3ca23c65be93ca34f0b"; }; in super.callCabal2nix "streamly-core" "${src}/core" {}) (old: { librarySystemDepends = diff --git a/src/Streamly/Internal/System/Command.hs b/src/Streamly/Internal/System/Command.hs index cd8af967..b45900fe 100644 --- a/src/Streamly/Internal/System/Command.hs +++ b/src/Streamly/Internal/System/Command.hs @@ -61,7 +61,7 @@ where import Control.Monad.Catch (MonadCatch) import Data.Char (isSpace) import Data.Word (Word8) -import Streamly.Data.Array.Foreign (Array) +import Streamly.Data.Array.Unboxed (Array) import Streamly.Data.Fold (Fold) import Streamly.Internal.Data.Parser (Parser) import Streamly.Prelude (MonadAsync, SerialT) diff --git a/src/Streamly/Internal/System/Process.hs b/src/Streamly/Internal/System/Process.hs index 1d305655..51077d03 100644 --- a/src/Streamly/Internal/System/Process.hs +++ b/src/Streamly/Internal/System/Process.hs @@ -93,28 +93,27 @@ module Streamly.Internal.System.Process ) where --- #define USE_NATIVE - +import Control.Exception (Exception(..), catch, throwIO) +import Control.Monad (void, unless) import Control.Monad.Catch (MonadCatch, throwM) import Control.Monad.IO.Class (MonadIO, liftIO) +import Control.Concurrent (forkIO, forkOS, runInBoundThread) +import Control.Concurrent.MVar import Data.Function ((&)) import Data.Word (Word8) -import Streamly.Data.Array.Foreign (Array) +import Foreign.C.Error (Errno(..), ePIPE) +import GHC.IO.Exception (IOException(..), IOErrorType(..)) +import Streamly.Data.Array.Unboxed (Array) import Streamly.Data.Fold (Fold) import Streamly.Prelude (MonadAsync, parallel, IsStream, adapt, SerialT) import System.Exit (ExitCode(..)) import System.IO (hClose, Handle) #ifdef USE_NATIVE -import Control.Exception (Exception(..), catch, throwIO, SomeException) +import Control.Exception (SomeException) import System.Posix.Process (ProcessStatus(..)) import Streamly.Internal.System.Process.Posix #else -import Control.Concurrent (forkIO) -import Control.Exception (Exception(..), catch, throwIO) -import Control.Monad (void, unless) -import Foreign.C.Error (Errno(..), ePIPE) -import GHC.IO.Exception (IOException(..), IOErrorType(..)) import System.Process ( ProcessHandle , CreateProcess(..) @@ -126,7 +125,7 @@ import System.Process ) #endif -import qualified Streamly.Data.Array.Foreign as Array +import qualified Streamly.Data.Array.Unboxed as Array import qualified Streamly.Data.Fold as Fold import qualified Streamly.Prelude as Stream @@ -185,6 +184,13 @@ mkConfig _ _ = Config False pipeStdErr :: Config -> Config pipeStdErr (Config _) = Config True + +inheritStdin :: Config -> Config +inheritStdin (Config _) = Config True + +inheritStdout :: Config -> Config +inheritStdout (Config _) = Config True + #else newtype Config = Config CreateProcess @@ -288,7 +294,11 @@ cleanupException :: MonadIO m => (Maybe Handle, Maybe Handle, Maybe Handle, ProcessHandle) -> m () cleanupException (Just stdinH, Just stdoutH, stderrMaybe, ph) = liftIO $ do -- Send a SIGTERM to the process +#ifdef USE_NATIVE + terminate ph +#else terminateProcess ph +#endif -- Ideally we should be closing the handle without flushing the buffers so -- that we cannot get a SIGPIPE. But there seems to be no way to do that as @@ -298,7 +308,11 @@ cleanupException (Just stdinH, Just stdoutH, stderrMaybe, ph) = liftIO $ do whenJust hClose stderrMaybe -- Non-blocking wait for the process to go away +#ifdef USE_NATIVE + void $ forkIO (void $ wait ph) +#else void $ forkIO (void $ waitForProcess ph) +#endif where @@ -343,6 +357,14 @@ createProc' modCfg path args = do Config cfg = modCfg $ mkConfig path args +createProc'' :: + (Config -> Config) -- ^ Process attribute modifier + -> FilePath -- ^ Executable path + -> [String] -- ^ Arguments + -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ProcessHandle) +createProc'' modCfg path args = + runInBoundThread $ createProc' modCfg path args + {-# INLINE putChunksClose #-} putChunksClose :: (MonadIO m, IsStream t) => Handle -> t m (Array Word8) -> t m a diff --git a/src/Streamly/Internal/System/Process/Posix.hs b/src/Streamly/Internal/System/Process/Posix.hs index 4903a545..fe87360a 100644 --- a/src/Streamly/Internal/System/Process/Posix.hs +++ b/src/Streamly/Internal/System/Process/Posix.hs @@ -1,3 +1,4 @@ +-- {-# LANGUAGE Safe #-} -- | -- Module : Streamly.Internal.System.Process.Posix -- Copyright : (c) 2020 Composewell Technologies @@ -17,6 +18,7 @@ module Streamly.Internal.System.Process.Posix , newProcess , wait , getStatus + , terminate -- * IPC , mkPipe @@ -33,15 +35,19 @@ import Data.Tuple (swap) import GHC.IO.Device (IODeviceType(..)) import GHC.IO.Encoding (getLocaleEncoding) import GHC.IO.Handle.FD (mkHandleFromFD) -import System.IO (IOMode(..), Handle) +import System.IO (IOMode(..), Handle, hPutStrLn, stderr) import System.IO.Error (isDoesNotExistError) import System.Posix.IO (createPipe, dupTo, closeFd) -import System.Posix.Process (forkProcess, executeFile, ProcessStatus) +import System.Posix.Process (forkProcess, executeFile, ProcessStatus, getProcessID) import System.Posix.Types (ProcessID, Fd(..), CDev, CIno) +import System.Posix.Signals (signalProcess, sigTERM) import System.Posix.Internals (fdGetMode) +import qualified Streamly.Internal.FileSystem.Dir as Dir +import qualified Streamly.Prelude as Stream import qualified GHC.IO.FD as FD import qualified System.Posix.Process as Posix +import Data.List (intercalate) ------------------------------------------------------------------------------- -- Utilities to create stdio handles @@ -51,13 +57,13 @@ import qualified System.Posix.Process as Posix -- We have to put the FDs into binary mode on Windows to avoid the newline -- translation that the CRT IO library does. setBinaryMode :: FD.FD -> IO () -#if defined(mingw32_HOST_OS) -setBinaryMode fd = do - _ <- setmode (FD.fdFD fd) True - return () -#else + + + + + setBinaryMode _ = return () -#endif + -- See Posix.fdToHandle and GHC.IO.Handle.FD.fdToHandle -- See stdin, stdout, stderr in module GHC.IO.Handle.FD @@ -109,9 +115,18 @@ mkPipeDupChild :: Direction -> Fd -> IO (Fd, (IO (), IO (), IO ())) mkPipeDupChild direction childFd = do let setDirection = if direction == ParentToChild then id else swap (child, parent) <- fmap setDirection createPipe + pid <- getProcessID let parentAction = closeFd child childAction = - closeFd parent >> void (dupTo child childFd) >> closeFd child + hPutStrLn stderr ("closing parent fd" ++ show parent) >> + closeFd parent >> + hPutStrLn stderr ("closed parent fd" ++ show parent) >> + hPutStrLn stderr ("duplicating child to fd" ++ show (child, childFd)) >> + void (dupTo child childFd) >> + hPutStrLn stderr ("duplicated child to fd" ++ show (child, childFd)) >> + hPutStrLn stderr ("closing child" ++ show child) >> + closeFd child >> + hPutStrLn stderr ("closed child" ++ show child) failureAction = closeFd child >> closeFd parent return (parent, (parentAction, childAction, failureAction)) @@ -141,7 +156,15 @@ mkStdioPipes pipeStdErr = do -} let parentAction = inpParent >> outParent >> errParent -- >> excParent - childAction = inpChild >> outChild >> errChild -- >> excChild + childAction = + hPutStrLn stderr "child input action doing" + >> inpChild + >> hPutStrLn stderr "child input action done" + >> hPutStrLn stderr "child output action doing" + >> outChild + >> hPutStrLn stderr "child output action done" + >> errChild -- >> excChild + -- childAction = inpChild >> outChild >> errChild -- >> excChild failureAction = inpFail >> outFail >> errFail -- >> excFail inpH <- toHandle WriteMode inp @@ -235,6 +258,7 @@ newProcess :: -> IO Process newProcess action path args env = do pid <- forkProcess exec + hPutStrLn stderr ("parent process " ++ show pid) pidToProcess pid Nothing where @@ -243,7 +267,12 @@ newProcess action path args env = do -- to the parent and clean up the parent fds. We can send the exceptions -- via a pipe like we do for threads. -- - exec = action >> executeFile path True args env + exec = do + pid <- getProcessID + hPutStrLn stderr ("child process " ++ show pid) + fds <- Stream.toList . Stream.unfold Dir.readFiles $ ("/proc/" ++ show pid ++ "/fd") + hPutStrLn stderr (intercalate ", " fds) + action >> executeFile path True args env newtype ProcessDoesNotExist = ProcessDoesNotExist ProcessID deriving Show @@ -315,3 +344,6 @@ getStatus proc@(Process pid _ procStatus) = do if isDoesNotExistError e then return (Nothing, Nothing) else throwIO e + +terminate :: Process -> IO () +terminate (Process pid _ _) = signalProcess sigTERM pid diff --git a/src/Streamly/System/Process.hs b/src/Streamly/System/Process.hs index e022ad3d..92c09eeb 100644 --- a/src/Streamly/System/Process.hs +++ b/src/Streamly/System/Process.hs @@ -134,7 +134,7 @@ import Streamly.Internal.System.Process -- >>> import Data.Char (toUpper) -- >>> import Data.Function ((&)) -- >>> import qualified Streamly.Console.Stdio as Stdio --- >>> import qualified Streamly.Data.Array.Foreign as Array +-- >>> import qualified Streamly.Data.Array.Unboxed as Array -- >>> import qualified Streamly.Data.Fold as Fold -- >>> import qualified Streamly.Prelude as Stream -- >>> import qualified Streamly.System.Process as Process diff --git a/stack.yaml b/stack.yaml index ecfb9263..be790b09 100644 --- a/stack.yaml +++ b/stack.yaml @@ -5,9 +5,9 @@ packages: extra-deps: - unicode-data-0.3.0 - git: https://github.com/composewell/streamly - commit: "1ee11e87ec920df66e6bb1299ab000948df90ae5" + commit: "4bb8b7c950ffeee9d5c9c3ca23c65be93ca34f0b" - git: https://github.com/composewell/streamly - commit: "1ee11e87ec920df66e6bb1299ab000948df90ae5" + commit: "4bb8b7c950ffeee9d5c9c3ca23c65be93ca34f0b" subdirs: - core diff --git a/streamly-process.cabal b/streamly-process.cabal index ca5529df..7c9ae4cf 100644 --- a/streamly-process.cabal +++ b/streamly-process.cabal @@ -48,17 +48,26 @@ flag use-gauge manual: True default: False +flag use-native + description: Do not depend on the process package + manual: True + default: False + common compile-options default-language: Haskell2010 ghc-options: - -Wall - -Wcompat - -Wunrecognised-warning-flags - -Widentities - -Wincomplete-record-updates - -Wincomplete-uni-patterns - -Wredundant-constraints - -Wnoncanonical-monad-instances + -Weverything + -Wno-implicit-prelude + -Wno-missing-deriving-strategies + -Wno-missing-exported-signatures + -Wno-missing-import-lists + -Wno-missing-local-signatures + -Wno-missing-safe-haskell-mode + -Wno-missed-specialisations + -Wno-all-missed-specialisations + -Wno-monomorphism-restriction + -Wno-prepositive-qualified-module + -Wno-unsafe common optimization-options ghc-options: @@ -66,6 +75,8 @@ common optimization-options -fdicts-strict -fspec-constr-recursive=16 -fmax-worker-args=16 + if flag(use-native) + cpp-options: -DUSE_NATIVE library import: compile-options, optimization-options @@ -74,18 +85,20 @@ library Streamly.System.Process Streamly.Internal.System.Process Streamly.Internal.System.Command - if !os(windows) + if flag (use-native) && !os(windows) exposed-modules: Streamly.Internal.System.Process.Posix build-depends: base >= 4.8 && < 5 , exceptions >= 0.8 && < 0.11 - , process >= 1.0 && < 1.7 -- Uses internal APIs , streamly == 0.9.0.* - if !os(windows) - build-depends: - unix >= 2.5 && < 2.8 + if !flag(use-native) + build-depends: process >= 1.0 && < 1.7 + else + if !os(windows) + build-depends: + unix >= 2.5 && < 2.8 ------------------------------------------------------------------------------- -- Benchmarks @@ -106,7 +119,6 @@ benchmark Benchmark.System.Process streamly-process , base >= 4.8 && < 5 , directory >= 1.2.2 && < 1.4 - , process >= 1.0 && < 1.7 -- Uses internal APIs , streamly == 0.9.0.* @@ -136,7 +148,6 @@ test-suite Test.System.Process , directory >= 1.2.2 && < 1.4 , exceptions >= 0.8 && < 0.11 , hspec >= 2.0 && < 3 - , process >= 1.0 && < 1.7 , QuickCheck >= 2.10 && < 2.15 -- Uses internal APIs , streamly == 0.9.0.* diff --git a/test/Streamly/System/Process.hs b/test/Streamly/System/Process.hs index a7440978..01d7d6c5 100644 --- a/test/Streamly/System/Process.hs +++ b/test/Streamly/System/Process.hs @@ -1,6 +1,6 @@ {-# LANGUAGE CPP #-} -module Main where +module Main (main) where import Data.Function ((&)) import Data.List ((\\)) @@ -71,18 +71,12 @@ minBlockCount = 1 maxBlockCount :: Int maxBlockCount = 100 -minNumChar :: Int -minNumChar = 1 - -maxNumChar :: Int -maxNumChar = 100 * 1024 - arrayChunkSize :: Int arrayChunkSize = 100 interpreterFile :: FilePath interpreterArg :: String -#if mingw32_HOST_OS == 1 +#ifdef mingw32_HOST_OS interpreterFile = "cmd.exe" interpreterArg = "/c" #else @@ -91,21 +85,21 @@ interpreterArg = "sh" #endif executableFile :: FilePath -#if mingw32_HOST_OS == 1 +#ifdef mingw32_HOST_OS executableFile = "./test/data/writeTrToError.bat" #else executableFile = "./test/data/writeTrToError.sh" #endif executableFileFail :: FilePath -#if mingw32_HOST_OS == 1 +#ifdef mingw32_HOST_OS executableFileFail = "./test/data/failExec.bat" #else executableFileFail = "./test/data/failExec.sh" #endif executableFilePass :: FilePath -#if mingw32_HOST_OS == 1 +#ifdef mingw32_HOST_OS executableFilePass = "./test/data/passExec.bat" #else executableFilePass = "./test/data/passExec.sh"