diff --git a/.sbtopts b/.sbtopts new file mode 100644 index 00000000..594985c1 --- /dev/null +++ b/.sbtopts @@ -0,0 +1,4 @@ +-J-Xmx4G +-J-XX:MaxMetaspaceSize=1G +-J-Dfile.encoding=UTF-8 +-J-Dsbt.io.implicit.relative.glob.conversion=allow diff --git a/project/BuildHelper.scala b/project/BuildHelper.scala index 5eb111a4..d84af17d 100644 --- a/project/BuildHelper.scala +++ b/project/BuildHelper.scala @@ -80,6 +80,8 @@ object BuildHelper { def stdSettings(prjName: String) = Seq( name := s"$prjName", fork := true, + run / baseDirectory := file("."), + Test / baseDirectory := file("."), crossScalaVersions := Seq(Scala212, Scala213), ThisBuild / scalaVersion := Scala213, scalacOptions := stdOptions ++ extraOptions(scalaVersion.value), diff --git a/zio-process/js/src/main/scala/zio/process/CommandPlatformSpecific.scala b/zio-process/js/src/main/scala/zio/process/CommandPlatformSpecific.scala index 8b8344be..82993926 100644 --- a/zio-process/js/src/main/scala/zio/process/CommandPlatformSpecific.scala +++ b/zio-process/js/src/main/scala/zio/process/CommandPlatformSpecific.scala @@ -17,7 +17,7 @@ package zio.process import FilePlatformSpecific._ import ProcessPlatformSpecific._ -import zio.{ NonEmptyChunk, ZIO } +import zio._ import scala.annotation.nowarn import scala.scalajs.js import js.JSConverters._ @@ -38,7 +38,7 @@ private[process] trait CommandPlatformSpecific { stdinStream <- c.stdin match { case FromStream(stream, _) => stream.toInputStream.map(Some(_)) - case JavaStream(in, _) => ZIO.succeed(Some(in)) + case JavaStream(in, _) => ZIO.some(in) case _ => ZIO.none } promise <- ZIO.attempt { @@ -97,7 +97,11 @@ private[process] trait CommandPlatformSpecific { process.stdin.write(new js.typedarray.Uint8Array(arr.take(read).map(_.toShort).toJSArray)) ) else if (read == -1) - if (process.stdin != null) if (!process.stdin.writableEnded) { process.stdin.end(); () } + if (process.stdin != null && !process.stdin.writableEnded) { + process.stdin.end() + () + } + } write(true) @@ -113,7 +117,10 @@ private[process] trait CommandPlatformSpecific { "end", { () => connectStdin(s) - if (process.stdin != null) if (!process.stdin.writableEnded) process.stdin.end() + + if (process.stdin != null && !process.stdin.writableEnded) + process.stdin.end() + resolve(zioProcess) } ) @@ -122,7 +129,9 @@ private[process] trait CommandPlatformSpecific { process.on("spawn", () => connectStdin(s)) in.on( "end", - () => if (process.stdin != null) if (!process.stdin.writableEnded) process.stdin.end() + () => + if (process.stdin != null && !process.stdin.writableEnded) + process.stdin.end() ) val node = js.Dynamic.global.require("process") process.on("spawn", () => node.nextTick(() => resolve(zioProcess))) diff --git a/zio-process/js/src/main/scala/zio/process/Constructors.scala b/zio-process/js/src/main/scala/zio/process/Constructors.scala index 49f6ad9e..c7ef1a05 100644 --- a/zio-process/js/src/main/scala/zio/process/Constructors.scala +++ b/zio-process/js/src/main/scala/zio/process/Constructors.scala @@ -20,10 +20,13 @@ import zio.stream.ZStream import java.io.OutputStream import zio.Trace + import java.io.IOException import zio.ZIO import zio.Chunk import zio.Scope +import zio.process.ProcessPlatformSpecific.JProcess + import java.io.InputStream private[process] object Constructors { @@ -52,19 +55,14 @@ private[process] object Constructors { val bytes = byteChunk.toArray out.write(bytes) bytesWritten + bytes.length - }.refineOrDie { case e: IOException => - e - } + }.refineToOrDie[IOException] } } } def zsink(outputStream: OutputStream): ZSink[Any, IOException, Byte, Byte, Long] = fromOutputStream(outputStream) - /** - * Creates a stream from a `java.io.InputStream` - */ - def fromInputStream( + def fromProcessInputStream(process: JProcess)( is: => InputStream, chunkSize: => Int = ZStream.DefaultChunkSize )(implicit trace: Trace): ZStream[Any, IOException, Byte] = @@ -73,8 +71,10 @@ private[process] object Constructors { for { bufArray <- ZIO.succeed(Array.ofDim[Byte](chunkSize)) bytesRead <- ZIO - .attemptBlockingCancelable(is.read(bufArray))(ZIO.succeed(is.close())) - .refineToOrDie[java.io.IOException] + .attemptBlockingCancelable(is.read(bufArray))( + ZIO.succeed(is.close()).ensuring(ZIO.attemptBlocking(process.kill()).ignore) + ) + .refineToOrDie[IOException] .asSomeError bytes <- if (bytesRead < 0) ZIO.fail(None) diff --git a/zio-process/js/src/main/scala/zio/process/FilePlatformSpecific.scala b/zio-process/js/src/main/scala/zio/process/FilePlatformSpecific.scala index a1bca67f..651b7d7e 100644 --- a/zio-process/js/src/main/scala/zio/process/FilePlatformSpecific.scala +++ b/zio-process/js/src/main/scala/zio/process/FilePlatformSpecific.scala @@ -6,6 +6,8 @@ private[process] object FilePlatformSpecific { type File = String type Path = String + def fileOf(file: String): File = file + def getAbsolute(file: File): js.Any = { val path = js.Dynamic.global.require("path") val nodejs = js.Dynamic.global.require("process") diff --git a/zio-process/js/src/main/scala/zio/process/ProcessInput.scala b/zio-process/js/src/main/scala/zio/process/ProcessInput.scala index 7df1800b..19f8ecd1 100644 --- a/zio-process/js/src/main/scala/zio/process/ProcessInput.scala +++ b/zio-process/js/src/main/scala/zio/process/ProcessInput.scala @@ -16,7 +16,7 @@ package zio.process import zio.stream.ZStream -import zio.{ Chunk, Queue } +import zio._ import FilePlatformSpecific._ import java.io.ByteArrayInputStream import java.nio.charset.{ Charset, StandardCharsets } @@ -41,7 +41,7 @@ object ProcessInput { ProcessInput.JavaStream( ProcessPlatformSpecific.JSInputStream( fs.createReadStream(FilePlatformSpecific.getAbsolute(file)).asInstanceOf[JS.Readable], - true + pause = true ), flushChunksEagerly = false ) diff --git a/zio-process/js/src/main/scala/zio/process/ProcessPlatformSpecific.scala b/zio-process/js/src/main/scala/zio/process/ProcessPlatformSpecific.scala index 2efba4df..8edd9a07 100644 --- a/zio-process/js/src/main/scala/zio/process/ProcessPlatformSpecific.scala +++ b/zio-process/js/src/main/scala/zio/process/ProcessPlatformSpecific.scala @@ -15,12 +15,11 @@ */ package zio.process -import zio.ZIO +import zio._ import java.io.{ InputStream, OutputStream } import scala.scalajs.js.typedarray.Uint8Array import scala.scalajs.js -import zio.Chunk import js.JSConverters._ private[process] trait ProcessPlatformSpecific extends ProcessInterface { self: Process => @@ -29,7 +28,13 @@ private[process] trait ProcessPlatformSpecific extends ProcessInterface { self: private var killed = false - protected def waitForUnsafe: Int = self.process.exitCode + def waitFor: IO[CommandError, ExitCode] = + ProcessPlatformSpecific.wait(stdoutInternal).map(_ => ExitCode(self.process.exitCode)).refineOrDie { + case CommandThrowable.IOError(e) => e + } + + protected def waitForUnsafe: Int = + self.process.exitCode protected def isAliveUnsafe: Boolean = !killed protected def destroyUnsafe(): Unit = { @@ -139,6 +144,7 @@ private[process] object ProcessPlatformSpecific { } } + override def readAllBytes(): Array[Byte] = { val arr = buffer.toArray buffer = Chunk.empty diff --git a/zio-process/js/src/test/bash/both-streams-test.sh b/zio-process/js/src/test/bash/both-streams-test.sh deleted file mode 100644 index 8d264927..00000000 --- a/zio-process/js/src/test/bash/both-streams-test.sh +++ /dev/null @@ -1,9 +0,0 @@ -#!/bin/bash - -echoerr() { echo "$@" 1>&2; } - -echo "stdout1" -echoerr "stderr1" - -echo "stdout2" -echoerr "stderr2" \ No newline at end of file diff --git a/zio-process/js/src/test/bash/echo-repeat.sh b/zio-process/js/src/test/bash/echo-repeat.sh deleted file mode 100644 index 19d9a657..00000000 --- a/zio-process/js/src/test/bash/echo-repeat.sh +++ /dev/null @@ -1,3 +0,0 @@ -#!/bin/bash - -for i in {1..60}; do echo "iteration: $i"; sleep 1; done \ No newline at end of file diff --git a/zio-process/js/src/test/bash/kill-test/sample-child.sh b/zio-process/js/src/test/bash/kill-test/sample-child.sh deleted file mode 100644 index 4670b77b..00000000 --- a/zio-process/js/src/test/bash/kill-test/sample-child.sh +++ /dev/null @@ -1,5 +0,0 @@ -#!/bin/bash -echo $$ -sleep 30 -echo -n "end: " -echo $$ diff --git a/zio-process/js/src/test/bash/kill-test/sample-parent.sh b/zio-process/js/src/test/bash/kill-test/sample-parent.sh deleted file mode 100644 index 6b21bc13..00000000 --- a/zio-process/js/src/test/bash/kill-test/sample-parent.sh +++ /dev/null @@ -1,7 +0,0 @@ -#!/bin/bash -echo $$ -./sample-child.sh & -./sample-child.sh & -sleep 30 -echo -n "end: " -echo $$ \ No newline at end of file diff --git a/zio-process/js/src/test/bash/no-permissions.sh b/zio-process/js/src/test/bash/no-permissions.sh deleted file mode 100644 index b2d6f2c3..00000000 --- a/zio-process/js/src/test/bash/no-permissions.sh +++ /dev/null @@ -1,3 +0,0 @@ -#!/bin/bash - -echo "this should not run because it doesn't have execute permissions" diff --git a/zio-process/js/src/test/bash/stdin-echo.sh b/zio-process/js/src/test/bash/stdin-echo.sh deleted file mode 100644 index 0de47dae..00000000 --- a/zio-process/js/src/test/bash/stdin-echo.sh +++ /dev/null @@ -1,6 +0,0 @@ -#!/bin/bash - -while read line -do - echo "$line" -done \ No newline at end of file diff --git a/zio-process/js/src/test/scala/zio/process/SpecProperties.scala b/zio-process/js/src/test/scala/zio/process/SpecProperties.scala deleted file mode 100644 index a3359df5..00000000 --- a/zio-process/js/src/test/scala/zio/process/SpecProperties.scala +++ /dev/null @@ -1,9 +0,0 @@ -package zio.process - -trait SpecProperties { - val dir = "zio-process/shared/" - - import FilePlatformSpecific._ - def mkFile(file: String): File = file - -} diff --git a/zio-process/jvm/src/main/scala/zio/process/Constructors.scala b/zio-process/jvm/src/main/scala/zio/process/Constructors.scala index a8fc9fa5..1840161a 100644 --- a/zio-process/jvm/src/main/scala/zio/process/Constructors.scala +++ b/zio-process/jvm/src/main/scala/zio/process/Constructors.scala @@ -15,24 +15,42 @@ */ package zio.process +import zio.process.ProcessPlatformSpecific.JProcess import zio.stream.ZSink import zio.stream.ZStream import java.io.OutputStream import java.io.InputStream -import zio.Trace +import zio._ + import java.io.IOException private[process] object Constructors { def zsink(outputStream: OutputStream) = ZSink.fromOutputStream(outputStream) - /** - * Creates a stream from a `java.io.InputStream` - */ - def fromInputStream( + def fromProcessInputStream(process: JProcess)( is: => InputStream, chunkSize: => Int = ZStream.DefaultChunkSize - )(implicit trace: Trace): ZStream[Any, IOException, Byte] = ZStream.fromInputStream(is, chunkSize) + )(implicit trace: Trace): ZStream[Any, IOException, Byte] = + ZStream.succeed((is, chunkSize)).flatMap { case (is, chunkSize) => + ZStream.repeatZIOChunkOption { + for { + bufArray <- ZIO.succeed(Array.ofDim[Byte](chunkSize)) + bytesRead <- ZIO + .attemptBlockingCancelable(is.read(bufArray))(ZIO.attemptBlocking(process.destroy()).ignore) + .refineToOrDie[IOException] + .asSomeError + bytes <- if (bytesRead < 0) + ZIO.fail(None) + else if (bytesRead == 0) + ZIO.succeed(Chunk.empty) + else if (bytesRead < chunkSize) + ZIO.succeed(Chunk.fromArray(bufArray).take(bytesRead)) + else + ZIO.succeed(Chunk.fromArray(bufArray)) + } yield bytes + } + } } diff --git a/zio-process/jvm/src/main/scala/zio/process/FilePlatformSpecific.scala b/zio-process/jvm/src/main/scala/zio/process/FilePlatformSpecific.scala index 40269003..e6c1968f 100644 --- a/zio-process/jvm/src/main/scala/zio/process/FilePlatformSpecific.scala +++ b/zio-process/jvm/src/main/scala/zio/process/FilePlatformSpecific.scala @@ -1,7 +1,9 @@ package zio.process -object FilePlatformSpecific { +private[process] object FilePlatformSpecific { type File = java.io.File - def exists(file: File) = file.exists() + def fileOf(file: String): File = new File(file) + + def exists(file: File): Boolean = file.exists() } diff --git a/zio-process/jvm/src/main/scala/zio/process/ProcessInput.scala b/zio-process/jvm/src/main/scala/zio/process/ProcessInput.scala index 9ac4cdd0..5f6ecc6a 100644 --- a/zio-process/jvm/src/main/scala/zio/process/ProcessInput.scala +++ b/zio-process/jvm/src/main/scala/zio/process/ProcessInput.scala @@ -16,7 +16,7 @@ package zio.process import zio.stream.ZStream -import zio.{ Chunk, Queue } +import zio._ import java.io.ByteArrayInputStream import java.nio.charset.{ Charset, StandardCharsets } diff --git a/zio-process/jvm/src/main/scala/zio/process/ProcessPlatformSpecific.scala b/zio-process/jvm/src/main/scala/zio/process/ProcessPlatformSpecific.scala index fab230dd..290c29b8 100644 --- a/zio-process/jvm/src/main/scala/zio/process/ProcessPlatformSpecific.scala +++ b/zio-process/jvm/src/main/scala/zio/process/ProcessPlatformSpecific.scala @@ -18,17 +18,27 @@ package zio.process import java.io.InputStream import java.io.OutputStream import scala.jdk.CollectionConverters._ -import zio.ZIO +import zio._ + import scala.annotation.nowarn private[process] trait ProcessPlatformSpecific { self: Process => import ProcessPlatformSpecific._ - protected def waitForUnsafe: Int = self.process.waitFor() + protected def waitFor: IO[CommandError, ExitCode] = + ZIO + .fromCompletableFuture(self.process.onExit()) + .map(x => ExitCode(x.exitValue())) + .onInterrupt(ZIO.attemptBlocking(self.destroyUnsafe()).ignore) + .refineOrDie { case CommandThrowable.IOError(e) => e } + + protected def waitForUnsafe: Int = self.process.waitFor() + + protected def isAliveUnsafe: Boolean = self.process.isAlive() + + protected def destroyUnsafe(): Unit = self.process.destroy() - protected def isAliveUnsafe: Boolean = self.process.isAlive() - protected def destroyUnsafe(): Unit = self.process.destroy() protected def destroyForciblyUnsafe: JProcess = self.process.destroyForcibly() protected def pidUnsafe: Long = self.process.pid @@ -49,23 +59,26 @@ private[process] trait ProcessPlatformSpecific { self: Process => * Note: This method requires JDK 9+ */ def killTree: ZIO[Any, CommandError, Unit] = - self.execute { process => - val d = process.descendants().toList().asScala - d.foreach { p => - destroyHandle(p) - () - } - - destroyUnsafe() - waitForUnsafe - - d.foreach { p => - if (isAliveHandle(p)) { - onExitHandle(p).get // `ProcessHandle` doesn't have waitFor - () - } - } - } + for { + descendants <- ZIO.succeed(process.descendants().toList.asScala) + _ <- self.execute { _ => + descendants.foreach { p => + destroyHandle(p) + () + } + + destroyUnsafe() + } + _ <- waitFor + _ <- self.execute { _ => + descendants.foreach { p => + if (isAliveHandle(p)) { + onExitHandle(p).get // `ProcessHandle` doesn't have waitFor + () + } + } + } + } yield () /** * Kills the entire process tree and will wait until completed. Equivalent to SIGKILL on Unix platforms. @@ -73,23 +86,26 @@ private[process] trait ProcessPlatformSpecific { self: Process => * Note: This method requires JDK 9+ */ def killTreeForcibly: ZIO[Any, CommandError, Unit] = - self.execute { process => - val d = process.descendants().toList().asScala - d.foreach { p => - destroyForciblyHandle(p) - () - } - - destroyForciblyUnsafe - waitForUnsafe - - d.foreach { p => - if (isAliveHandle(p)) { - onExitHandle(p).get // `ProcessHandle` doesn't have waitFor - () - } - } - } + for { + descendants <- ZIO.succeed(process.descendants().toList.asScala) + _ <- self.execute { _ => + descendants.foreach { p => + destroyForciblyHandle(p) + () + } + + destroyForciblyUnsafe + } + _ <- waitFor + _ <- self.execute { _ => + descendants.foreach { p => + if (isAliveHandle(p)) { + onExitHandle(p).get // `ProcessHandle` doesn't have waitFor + () + } + } + } + } yield () } private[process] object ProcessPlatformSpecific { diff --git a/zio-process/jvm/src/test/bash/both-streams-test.sh b/zio-process/jvm/src/test/bash/both-streams-test.sh deleted file mode 100755 index 8d264927..00000000 --- a/zio-process/jvm/src/test/bash/both-streams-test.sh +++ /dev/null @@ -1,9 +0,0 @@ -#!/bin/bash - -echoerr() { echo "$@" 1>&2; } - -echo "stdout1" -echoerr "stderr1" - -echo "stdout2" -echoerr "stderr2" \ No newline at end of file diff --git a/zio-process/jvm/src/test/bash/echo-repeat.sh b/zio-process/jvm/src/test/bash/echo-repeat.sh deleted file mode 100755 index 19d9a657..00000000 --- a/zio-process/jvm/src/test/bash/echo-repeat.sh +++ /dev/null @@ -1,3 +0,0 @@ -#!/bin/bash - -for i in {1..60}; do echo "iteration: $i"; sleep 1; done \ No newline at end of file diff --git a/zio-process/jvm/src/test/bash/kill-test/sample-child.sh b/zio-process/jvm/src/test/bash/kill-test/sample-child.sh deleted file mode 100755 index 4670b77b..00000000 --- a/zio-process/jvm/src/test/bash/kill-test/sample-child.sh +++ /dev/null @@ -1,5 +0,0 @@ -#!/bin/bash -echo $$ -sleep 30 -echo -n "end: " -echo $$ diff --git a/zio-process/jvm/src/test/bash/kill-test/sample-parent.sh b/zio-process/jvm/src/test/bash/kill-test/sample-parent.sh deleted file mode 100755 index 6b21bc13..00000000 --- a/zio-process/jvm/src/test/bash/kill-test/sample-parent.sh +++ /dev/null @@ -1,7 +0,0 @@ -#!/bin/bash -echo $$ -./sample-child.sh & -./sample-child.sh & -sleep 30 -echo -n "end: " -echo $$ \ No newline at end of file diff --git a/zio-process/jvm/src/test/bash/no-permissions.sh b/zio-process/jvm/src/test/bash/no-permissions.sh deleted file mode 100644 index b2d6f2c3..00000000 --- a/zio-process/jvm/src/test/bash/no-permissions.sh +++ /dev/null @@ -1,3 +0,0 @@ -#!/bin/bash - -echo "this should not run because it doesn't have execute permissions" diff --git a/zio-process/jvm/src/test/bash/stdin-echo.sh b/zio-process/jvm/src/test/bash/stdin-echo.sh deleted file mode 100755 index 0de47dae..00000000 --- a/zio-process/jvm/src/test/bash/stdin-echo.sh +++ /dev/null @@ -1,6 +0,0 @@ -#!/bin/bash - -while read line -do - echo "$line" -done \ No newline at end of file diff --git a/zio-process/jvm/src/test/scala/zio/process/CommandPlatformSpecificSpec.scala b/zio-process/jvm/src/test/scala/zio/process/CommandPlatformSpecificSpec.scala index e71ca6c8..fcb25f21 100644 --- a/zio-process/jvm/src/test/scala/zio/process/CommandPlatformSpecificSpec.scala +++ b/zio-process/jvm/src/test/scala/zio/process/CommandPlatformSpecificSpec.scala @@ -13,7 +13,8 @@ object CommandPlatformSpecificSpec extends ZIOProcessBaseSpec { def spec = suite("CommandSpec")( test("killTree also kills child processes") { for { - process <- Command("./sample-parent.sh").workingDirectory(new File("src/test/bash/kill-test")).run + process <- + Command("./sample-parent.sh").workingDirectory(fileOf("zio-process/shared/src/test/bash/kill-test")).run pids <- process.stdout.stream .via(ZPipeline.utf8Decode) .via(ZPipeline.splitLines) @@ -28,7 +29,8 @@ object CommandPlatformSpecificSpec extends ZIOProcessBaseSpec { } @@ TestAspect.nonFlaky(25), test("killTreeForcibly also kills child processes") { for { - process <- Command("./sample-parent.sh").workingDirectory(new File("src/test/bash/kill-test")).run + process <- + Command("./sample-parent.sh").workingDirectory(fileOf("zio-process/shared/src/test/bash/kill-test")).run pids <- process.stdout.stream .via(ZPipeline.utf8Decode) .via(ZPipeline.splitLines) @@ -43,7 +45,8 @@ object CommandPlatformSpecificSpec extends ZIOProcessBaseSpec { } @@ TestAspect.nonFlaky(25), test("kill only kills parent process") { for { - process <- Command("./sample-parent.sh").workingDirectory(new File("src/test/bash/kill-test")).run + process <- + Command("./sample-parent.sh").workingDirectory(fileOf("zio-process/shared/src/test/bash/kill-test")).run pids <- process.stdout.stream .via(ZPipeline.utf8Decode) .via(ZPipeline.splitLines) diff --git a/zio-process/jvm/src/test/scala/zio/process/SpecProperties.scala b/zio-process/jvm/src/test/scala/zio/process/SpecProperties.scala deleted file mode 100644 index 60b9748a..00000000 --- a/zio-process/jvm/src/test/scala/zio/process/SpecProperties.scala +++ /dev/null @@ -1,8 +0,0 @@ -package zio.process - -trait SpecProperties { - val dir = "" - - import FilePlatformSpecific._ - def mkFile(file: String): File = new File(file) -} diff --git a/zio-process/native/src/main/scala/zio/process/CommandPlatformSpecific.scala b/zio-process/native/src/main/scala/zio/process/CommandPlatformSpecific.scala index 8df1134e..129a8963 100644 --- a/zio-process/native/src/main/scala/zio/process/CommandPlatformSpecific.scala +++ b/zio-process/native/src/main/scala/zio/process/CommandPlatformSpecific.scala @@ -16,8 +16,7 @@ package zio.process import scala.annotation.nowarn -import zio.NonEmptyChunk -import zio.{ Chunk, ZIO } +import zio._ import FilePlatformSpecific._ import java.lang.ProcessBuilder.Redirect import scala.jdk.CollectionConverters._ diff --git a/zio-process/native/src/main/scala/zio/process/Constructors.scala b/zio-process/native/src/main/scala/zio/process/Constructors.scala index dc3626e6..01e09fe2 100644 --- a/zio-process/native/src/main/scala/zio/process/Constructors.scala +++ b/zio-process/native/src/main/scala/zio/process/Constructors.scala @@ -20,10 +20,13 @@ import zio.stream.ZStream import java.io.OutputStream import zio.Trace + import java.io.IOException import zio.ZIO import zio.Chunk import zio.Scope +import zio.process.ProcessPlatformSpecific.JProcess + import java.io.InputStream private[process] object Constructors { @@ -52,21 +55,35 @@ private[process] object Constructors { val bytes = byteChunk.toArray out.write(bytes) bytesWritten + bytes.length - }.refineOrDie { case e: IOException => - e - } + }.refineToOrDie[IOException] } } } def zsink(outputStream: OutputStream): ZSink[Any, IOException, Byte, Byte, Long] = fromOutputStream(outputStream) - /** - * Creates a stream from a `java.io.InputStream` - */ - def fromInputStream( + def fromProcessInputStream(process: JProcess)( is: => InputStream, chunkSize: => Int = ZStream.DefaultChunkSize - )(implicit trace: Trace): ZStream[Any, IOException, Byte] = ZStream.fromInputStream(is, chunkSize) + )(implicit trace: Trace): ZStream[Any, IOException, Byte] = + ZStream.succeed((is, chunkSize)).flatMap { case (is, chunkSize) => + ZStream.repeatZIOChunkOption { + for { + bufArray <- ZIO.succeed(Array.ofDim[Byte](chunkSize)) + bytesRead <- ZIO + .attemptBlockingCancelable(is.read(bufArray))(ZIO.attemptBlocking(process.destroy()).ignore) + .refineToOrDie[IOException] + .asSomeError + bytes <- if (bytesRead < 0) + ZIO.fail(None) + else if (bytesRead == 0) + ZIO.succeed(Chunk.empty) + else if (bytesRead < chunkSize) + ZIO.succeed(Chunk.fromArray(bufArray).take(bytesRead)) + else + ZIO.succeed(Chunk.fromArray(bufArray)) + } yield bytes + } + } } diff --git a/zio-process/native/src/main/scala/zio/process/FilePlatformSpecific.scala b/zio-process/native/src/main/scala/zio/process/FilePlatformSpecific.scala index 0497cedb..e6c1968f 100644 --- a/zio-process/native/src/main/scala/zio/process/FilePlatformSpecific.scala +++ b/zio-process/native/src/main/scala/zio/process/FilePlatformSpecific.scala @@ -1,8 +1,9 @@ package zio.process -object FilePlatformSpecific { +private[process] object FilePlatformSpecific { type File = java.io.File - def exists(file: File) = file.exists() + def fileOf(file: String): File = new File(file) + def exists(file: File): Boolean = file.exists() } diff --git a/zio-process/native/src/main/scala/zio/process/ProcessInput.scala b/zio-process/native/src/main/scala/zio/process/ProcessInput.scala index e04f69e0..52ac42cf 100644 --- a/zio-process/native/src/main/scala/zio/process/ProcessInput.scala +++ b/zio-process/native/src/main/scala/zio/process/ProcessInput.scala @@ -16,7 +16,7 @@ package zio.process import zio.stream.ZStream -import zio.{ Chunk, Queue } +import zio._ import java.io.ByteArrayInputStream import java.nio.charset.{ Charset, StandardCharsets } diff --git a/zio-process/native/src/main/scala/zio/process/ProcessPlatformSpecific.scala b/zio-process/native/src/main/scala/zio/process/ProcessPlatformSpecific.scala index 04edb62c..93eaeaa9 100644 --- a/zio-process/native/src/main/scala/zio/process/ProcessPlatformSpecific.scala +++ b/zio-process/native/src/main/scala/zio/process/ProcessPlatformSpecific.scala @@ -17,7 +17,7 @@ package zio.process import java.io.InputStream import java.io.OutputStream -import zio.ZIO +import zio._ import java.io.PushbackInputStream import scala.annotation.nowarn @@ -25,6 +25,16 @@ private[process] trait ProcessPlatformSpecific { self: Process => import ProcessPlatformSpecific._ + protected def waitFor: IO[CommandError, ExitCode] = + ZIO + .attemptBlockingCancelable(waitForUnsafe)( + ZIO.attemptBlocking(self.destroyUnsafe()).ignore + ) + .map(x => ExitCode(x)) + .refineOrDie { case CommandThrowable.IOError(e) => + e + } + protected def waitForUnsafe: Int = self.process.waitFor() protected def isAliveUnsafe: Boolean = self.process.isAlive() diff --git a/zio-process/native/src/test/scala/zio/process/SpecProperties.scala b/zio-process/native/src/test/scala/zio/process/SpecProperties.scala deleted file mode 100644 index 90674eec..00000000 --- a/zio-process/native/src/test/scala/zio/process/SpecProperties.scala +++ /dev/null @@ -1,8 +0,0 @@ -package zio.process - -trait SpecProperties { - val dir = "zio-process/shared/" - - import FilePlatformSpecific._ - def mkFile(file: String): File = new File(file) -} diff --git a/zio-process/shared/src/main/scala/zio/process/Command.scala b/zio-process/shared/src/main/scala/zio/process/Command.scala index b915cdd9..a514ec74 100644 --- a/zio-process/shared/src/main/scala/zio/process/Command.scala +++ b/zio-process/shared/src/main/scala/zio/process/Command.scala @@ -110,7 +110,6 @@ sealed trait Command extends CommandPlatformSpecific { } process <- build(c, piping).mapError(CommandThrowable.classify) _ <- connectStdin(process, c.stdin) - } yield process case c: Command.Piped => diff --git a/zio-process/shared/src/main/scala/zio/process/Process.scala b/zio-process/shared/src/main/scala/zio/process/Process.scala index 8a385f74..c73372fa 100644 --- a/zio-process/shared/src/main/scala/zio/process/Process.scala +++ b/zio-process/shared/src/main/scala/zio/process/Process.scala @@ -15,8 +15,7 @@ */ package zio.process -import zio.ZIO.{ attemptBlockingCancelable, attemptBlockingInterrupt } -import zio.{ ExitCode, UIO, ZIO } +import zio._ import java.io.InputStream import ProcessPlatformSpecific._ @@ -26,12 +25,12 @@ final case class Process(private[process] val process: JProcess) extends Process /** * Access the standard output stream. */ - val stdout: ProcessStream = ProcessStream(getInputStream, get) + val stdout: ProcessStream = ProcessStream(process, getInputStream, get) /** * Access the standard error stream. */ - val stderr: ProcessStream = ProcessStream(getErrorStream, get) + val stderr: ProcessStream = ProcessStream(process, getErrorStream, get) /** * Access the standard output as an Java `InputStream`. @@ -41,16 +40,14 @@ final case class Process(private[process] val process: JProcess) extends Process /** * Access the underlying Java Process wrapped in a blocking ZIO. */ - def execute[T](f: JProcess => T): ZIO[Any, CommandError, T] = - attemptBlockingInterrupt(f(process)).refineOrDie { case CommandThrowable.IOError(e) => e } + def execute[T](f: JProcess => T): IO[CommandError, T] = + ZIO.attemptBlockingInterrupt(f(process)).refineOrDie { case CommandThrowable.IOError(e) => e } /** * Return the exit code of this process. */ - def exitCode: ZIO[Any, CommandError, ExitCode] = - attemptBlockingCancelable(ExitCode(waitForUnsafe))(ZIO.succeed(destroyUnsafe())).refineOrDie { - case CommandThrowable.IOError(e) => e - } + def exitCode: IO[CommandError, ExitCode] = + waitFor /** * Tests whether the process is still alive (not terminated or completed). @@ -62,36 +59,32 @@ final case class Process(private[process] val process: JProcess) extends Process * * This method requires JDK +9. */ - def pid: ZIO[Any, CommandError, Long] = attemptBlockingInterrupt(pidUnsafe).refineOrDie { + def pid: IO[CommandError, Long] = ZIO.attemptBlockingInterrupt(pidUnsafe).refineOrDie { case CommandThrowable.IOError(e) => e } /** * Kills the process and will wait until completed. Equivalent to SIGTERM on Unix platforms. */ - def kill: ZIO[Any, CommandError, Unit] = - attemptBlockingInterrupt { - destroyUnsafe() - waitForUnsafe - () - }.refineOrDie { case CommandThrowable.IOError(e) => e } + def kill: IO[CommandError, Unit] = + (for { + _ <- ZIO.attemptBlockingInterrupt(destroyUnsafe()) + _ <- waitFor + } yield ()).refineOrDie { case CommandThrowable.IOError(e) => e } /** * Kills the process and will wait until completed. Equivalent to SIGKILL on Unix platforms. */ - def killForcibly: ZIO[Any, CommandError, Unit] = - attemptBlockingInterrupt { - destroyForciblyUnsafe - waitForUnsafe - () - }.refineOrDie { case CommandThrowable.IOError(e) => e } + def killForcibly: IO[CommandError, Unit] = + (for { + _ <- ZIO.attemptBlockingInterrupt(destroyForciblyUnsafe) + _ <- waitFor + } yield ()).refineOrDie { case CommandThrowable.IOError(e) => e } /** * Return the exit code of this process if it is zero. If non-zero, it will fail with `CommandError.NonZeroErrorCode`. */ - def successfulExitCode: ZIO[Any, CommandError, ExitCode] = - attemptBlockingCancelable(ExitCode(waitForUnsafe))(ZIO.succeed(destroyUnsafe())).refineOrDie { - case CommandThrowable.IOError(e) => e: CommandError - }.filterOrElseWith(_ == ExitCode.success)(exitCode => ZIO.fail(CommandError.NonZeroErrorCode(exitCode))) + def successfulExitCode: IO[CommandError, ExitCode] = + waitFor.filterOrElseWith(_ == ExitCode.success)(exitCode => ZIO.fail(CommandError.NonZeroErrorCode(exitCode))) } diff --git a/zio-process/shared/src/main/scala/zio/process/ProcessStream.scala b/zio-process/shared/src/main/scala/zio/process/ProcessStream.scala index 0edc7095..f62798df 100644 --- a/zio-process/shared/src/main/scala/zio/process/ProcessStream.scala +++ b/zio-process/shared/src/main/scala/zio/process/ProcessStream.scala @@ -16,14 +16,16 @@ package zio.process import zio.ZIO.attemptBlockingCancelable +import zio.process.ProcessPlatformSpecific.JProcess import zio.stream.{ ZPipeline, ZStream } -import zio.{ Chunk, ZIO } +import zio._ import java.io._ import java.nio.charset.{ Charset, StandardCharsets } import scala.collection.mutable.ArrayBuffer final case class ProcessStream( + private[process] val process: JProcess, private[process] val inputStream: InputStream, private[process] val outputStream: Option[OutputStream] = None ) { @@ -79,7 +81,7 @@ final case class ProcessStream( */ def stream: ZStream[Any, CommandError, Byte] = Constructors - .fromInputStream(inputStream) + .fromProcessInputStream(process)(inputStream) .ensuring(ZIO.succeed(inputStream.close())) .mapError(CommandError.IOError.apply) diff --git a/zio-process/shared/src/test/bash/echo-repeat-short.sh b/zio-process/shared/src/test/bash/echo-repeat-short.sh new file mode 100755 index 00000000..cf848441 --- /dev/null +++ b/zio-process/shared/src/test/bash/echo-repeat-short.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +for i in {1..3}; do echo "iteration: $i"; sleep 1; done \ No newline at end of file diff --git a/zio-process/shared/src/test/bash/kill-test/sample-child.sh b/zio-process/shared/src/test/bash/kill-test/sample-child.sh old mode 100644 new mode 100755 diff --git a/zio-process/shared/src/test/bash/kill-test/sample-parent.sh b/zio-process/shared/src/test/bash/kill-test/sample-parent.sh old mode 100644 new mode 100755 diff --git a/zio-process/shared/src/test/bash/stdin-echo.sh b/zio-process/shared/src/test/bash/stdin-echo.sh old mode 100644 new mode 100755 diff --git a/zio-process/shared/src/test/scala/zio/process/CommandSpec.scala b/zio-process/shared/src/test/scala/zio/process/CommandSpec.scala index 779067ce..20dce46d 100644 --- a/zio-process/shared/src/test/scala/zio/process/CommandSpec.scala +++ b/zio-process/shared/src/test/scala/zio/process/CommandSpec.scala @@ -3,12 +3,13 @@ package zio.process import zio.stream.ZPipeline import zio.test.Assertion._ import zio.test._ -import zio.{ durationInt, Chunk, ExitCode, Queue, System, ZIO } +import zio._ +import zio.process.FilePlatformSpecific.fileOf import java.nio.charset.StandardCharsets // TODO: Add aspects for different OSes? scala.util.Properties.isWin, etc. Also try to make this as OS agnostic as possible in the first place -object CommandSpec extends ZIOProcessBaseSpec with SpecProperties { +object CommandSpec extends ZIOProcessBaseSpec { def spec = suite("CommandSpec")( test("convert stdout to string") { @@ -45,14 +46,15 @@ object CommandSpec extends ZIOProcessBaseSpec with SpecProperties { assertZIO(zio)(equalTo("a b c")) } @@ TestAspect.jvmOnly, - test("accept string stdin") { // + test("accept string stdin") { val zio = Command("cat").stdin(ProcessInput.fromUTF8String("piped in")).string assertZIO(zio)(equalTo("piped in")) }, test("accept file stdin") { for { - lines <- Command("cat").stdin(ProcessInput.fromFile(mkFile(s"${dir}src/test/bash/echo-repeat.sh"))).lines + lines <- + Command("cat").stdin(ProcessInput.fromFile(fileOf(s"zio-process/shared/src/test/bash/echo-repeat.sh"))).lines } yield assertTrue(lines.head == "#!/bin/bash") }, test("support different encodings") { @@ -64,12 +66,12 @@ object CommandSpec extends ZIOProcessBaseSpec with SpecProperties { assertZIO(zio)(equalTo("piped in")) }, test("set workingDirectory") { - val zio = Command("ls").workingDirectory(mkFile(s"${dir}src/test/bash")).lines + val zio = Command("ls").workingDirectory(fileOf(s"zio-process/shared/src/test/bash")).lines assertZIO(zio)(contains("no-permissions.sh")) }, test("be able to fallback to a different program using typed error channel") { - val zio = Command("echo", "-n", "wrong").workingDirectory(mkFile("no-folder")).string.catchSome { + val zio = Command("echo", "-n", "wrong").workingDirectory(fileOf("no-folder")).string.catchSome { case CommandError.WorkingDirectoryMissing(_) => Command("echo", "-n", "test").string } @@ -85,25 +87,23 @@ object CommandSpec extends ZIOProcessBaseSpec with SpecProperties { assertZIO(zio.exit)(isInterrupted) }, - test("interrupt a process due to timeout") { - val zio = for { - fiber <- Command("sleep", "20").exitCode.timeout(5.seconds).fork - adjustFiber <- TestClock.adjust(5.seconds).fork - _ <- ZIO.sleep(5.seconds) - _ <- adjustFiber.join - result <- fiber.join - } yield result - - assertZIO(zio)(isNone) - } @@ TestAspect.ignore, // TODO: Until https://github.com/zio/zio/issues/3840 is fixed or there is a workaround test("capture stdout and stderr separately") { val zio = for { - process <- Command(s"${dir}src/test/bash/both-streams-test.sh").run + process <- Command(s"zio-process/shared/src/test/bash/both-streams-test.sh").run stdout <- process.stdout.string stderr <- process.stderr.string } yield (stdout, stderr) assertZIO(zio)(equalTo(("stdout1\nstdout2\n", "stderr1\nstderr2\n"))) + } @@ TestAspect.withLiveClock, + test("streaming entire result from stdout should wait for completion") { + val zio = for { + process <- Command(s"zio-process/shared/src/test/bash/echo-repeat-short.sh").run + _ <- ZIO.sleep(5.seconds) // TODO: This should not be needed. If you remove the sleep it fails. + stdout <- process.stdout.string + } yield stdout + + assertZIO(zio)(equalTo("iteration: 1\niteration: 2\niteration: 3\n")) }, test("return non-zero exit code in success channel") { val zio = Command("ls", "--non-existent-flag").exitCode @@ -116,20 +116,20 @@ object CommandSpec extends ZIOProcessBaseSpec with SpecProperties { assertZIO(zio.exit)(fails(isSubtype[CommandError.NonZeroErrorCode](anything))) }, test("permission denied is a typed error") { - val zio = Command(s"${dir}src/test/bash/no-permissions.sh").string + val zio = Command(s"zio-process/shared/src/test/bash/no-permissions.sh").string assertZIO(zio.exit)(fails(isSubtype[CommandError.PermissionDenied](anything))) } @@ TestAspect.exceptNative, test("redirectErrorStream should merge stderr into stdout") { for { - process <- Command(s"${dir}src/test/bash/both-streams-test.sh").redirectErrorStream(true).run + process <- Command(s"zio-process/shared/src/test/bash/both-streams-test.sh").redirectErrorStream(true).run stdout <- process.stdout.string stderr <- process.stderr.string } yield assertTrue(stdout == "stdout1\nstderr1\nstdout2\nstderr2\n", stderr.isEmpty) } @@ TestAspect.exceptJS, test("be able to kill a process that's running") { for { - process <- Command(s"${dir}src/test/bash/echo-repeat.sh").run + process <- Command(s"zio-process/shared/src/test/bash/echo-repeat.sh").run isAliveBeforeKill <- process.isAlive _ <- process.kill isAliveAfterKill <- process.isAlive @@ -137,7 +137,7 @@ object CommandSpec extends ZIOProcessBaseSpec with SpecProperties { }, test("typed error for non-existent working directory") { for { - exit <- Command("ls").workingDirectory(mkFile("/some/bad/path")).lines.exit + exit <- Command("ls").workingDirectory(fileOf("/some/bad/path")).lines.exit } yield assert(exit)(fails(isSubtype[CommandError.WorkingDirectoryMissing](anything))) }, test("end of stream also closes underlying process") { @@ -154,7 +154,7 @@ object CommandSpec extends ZIOProcessBaseSpec with SpecProperties { for { commandQueue <- Queue.unbounded[Chunk[Byte]] process <- Command("./stdin-echo.sh") - .workingDirectory(mkFile(s"${dir}src/test/bash")) + .workingDirectory(fileOf(s"zio-process/shared/src/test/bash")) .stdin(ProcessInput.fromQueue(commandQueue)) .run _ <- commandQueue.offer(Chunk.fromArray("line1\nline2\n".getBytes(StandardCharsets.UTF_8))) @@ -177,13 +177,40 @@ object CommandSpec extends ZIOProcessBaseSpec with SpecProperties { _ <- commandQueue.offer(Chunk.fromArray(s"process.exit(0)${sep}".getBytes(StandardCharsets.UTF_8))) _ <- fiber.join } yield assertCompletes - } @@ TestAspect.withLiveClock @@ TestAspect.exceptJS, + } @@ TestAspect.exceptJS, test("get pid of a running process") { for { process <- Command("ls").run pid <- process.pid } yield assertTrue(pid > 0L) - } - ) + }, + suite("interruption")( + test("interrupt a process due to timeout (exitCode)") { + for { + result <- Command("sleep", "60").exitCode.timeout(3.seconds) + } yield assertTrue(result.isEmpty) + }, + test("interrupt a process due to timeout (stream)") { + val stream = Command("sleep", "60").stream + + for { + result <- stream.runDrain.timeout(3.seconds) + } yield assertTrue(result.isEmpty) + }, + test("interrupt a process due to timeout (linesStream)") { + val stream = Command("sleep", "60").linesStream + + for { + result <- stream.runDrain.timeout(3.seconds) + } yield assertTrue(result.isEmpty) + }, + test("interrupt a process due to timeout (stdout stream)") { + for { + process <- Command("sleep", "60").run + result <- process.stdout.stream.runDrain.timeout(3.seconds) + } yield assertTrue(result.isEmpty) + } + ) @@ TestAspect.exceptNative @@ TestAspect.timeout(30.seconds) + ) @@ TestAspect.withLiveClock } diff --git a/zio-process/shared/src/test/scala/zio/process/ZIOProcessBaseSpec.scala b/zio-process/shared/src/test/scala/zio/process/ZIOProcessBaseSpec.scala index d68f69e3..1ef029e7 100644 --- a/zio-process/shared/src/test/scala/zio/process/ZIOProcessBaseSpec.scala +++ b/zio-process/shared/src/test/scala/zio/process/ZIOProcessBaseSpec.scala @@ -1,7 +1,7 @@ package zio.process import zio.test._ -import zio.{ durationInt, Chunk } +import zio._ trait ZIOProcessBaseSpec extends ZIOSpecDefault { override def aspects = Chunk(TestAspect.timeout(30.seconds))