diff --git a/build.sbt b/build.sbt index b5b6560d..a923aeda 100644 --- a/build.sbt +++ b/build.sbt @@ -27,7 +27,7 @@ ThisBuild / publishTo := sonatypePublishToBundle.value addCommandAlias("fmt", "all scalafmtSbt scalafmt test:scalafmt") addCommandAlias("check", "all scalafmtSbtCheck scalafmtCheck test:scalafmtCheck") -val zioVersion = "1.0.0-RC18-2" +val zioVersion = "1.0.0-RC19" libraryDependencies ++= Seq( "dev.zio" %% "zio" % zioVersion, "dev.zio" %% "zio-streams" % zioVersion, diff --git a/docs/overview/basics.md b/docs/overview/basics.md index 0ceef64f..313441ed 100644 --- a/docs/overview/basics.md +++ b/docs/overview/basics.md @@ -59,7 +59,7 @@ When you don't care about the output (or there is no output), you can return jus command.exitCode ``` -### Stream of bytes (chunked) +### Stream of bytes If you need lower-level access to the output's stream of bytes, you can access them directly like so: @@ -67,8 +67,6 @@ If you need lower-level access to the output's stream of bytes, you can access t command.stream ``` -The bytes are chunked for performance in the form of `StreamChunk[Throwable, Byte]` - ### Access stdout and stderr separately There are times where you need to process the output of stderr as well. diff --git a/docs/overview/piping.md b/docs/overview/piping.md index 2c08d277..4e583ee8 100644 --- a/docs/overview/piping.md +++ b/docs/overview/piping.md @@ -15,8 +15,8 @@ import zio.process._ ```scala mdoc:silent for { processes <- Command("ps", "-ef").stream - javaProcesses <- Command("grep", "java").stdin(ProcessInput.fromStreamChunk(processes)).stream - processIds <- Command("awk", "{print $2}").stdin(ProcessInput.fromStreamChunk(javaProcesses)).lines + javaProcesses <- Command("grep", "java").stdin(ProcessInput.fromStream(processes)).stream + processIds <- Command("awk", "{print $2}").stdin(ProcessInput.fromStream(javaProcesses)).lines } yield processIds ``` diff --git a/src/main/scala/zio/process/Command.scala b/src/main/scala/zio/process/Command.scala index c60e9caf..b38fbe69 100644 --- a/src/main/scala/zio/process/Command.scala +++ b/src/main/scala/zio/process/Command.scala @@ -20,7 +20,7 @@ import java.lang.ProcessBuilder.Redirect import java.nio.charset.Charset import zio.blocking.Blocking -import zio.stream.{ StreamChunk, ZSink, ZStream } +import zio.stream.{ ZSink, ZStream } import zio.{ IO, RIO, Task, UIO, ZIO } import scala.jdk.CollectionConverters._ @@ -137,7 +137,7 @@ sealed trait Command { case ProcessInput(Some(input)) => for { outputStream <- process.execute(_.getOutputStream) - _ <- input.chunks + _ <- input .run(ZSink.fromOutputStream(outputStream)) .ensuring(UIO(outputStream.close())) .forkDaemon @@ -155,9 +155,9 @@ sealed trait Command { for { stream <- tail.init.foldLeft(head.stream) { case (s, command) => - s.flatMap(input => command.stdin(ProcessInput.fromStreamChunk(input)).stream) + s.flatMap(input => command.stdin(ProcessInput.fromStream(input)).stream) } - result <- tail.last.stdin(ProcessInput.fromStreamChunk(stream)).run + result <- tail.last.stdin(ProcessInput.fromStream(stream)).run } yield result } } @@ -203,7 +203,7 @@ sealed trait Command { /** * Runs the command returning the output as a chunked stream of bytes. */ - def stream: RIO[Blocking, StreamChunk[Throwable, Byte]] = + def stream: RIO[Blocking, ZStream[Blocking, Throwable, Byte]] = run.map(_.stdout.stream) /** diff --git a/src/main/scala/zio/process/ProcessInput.scala b/src/main/scala/zio/process/ProcessInput.scala index 6f1fead2..425a8466 100644 --- a/src/main/scala/zio/process/ProcessInput.scala +++ b/src/main/scala/zio/process/ProcessInput.scala @@ -19,9 +19,10 @@ import java.io.ByteArrayInputStream import java.nio.charset.{ Charset, StandardCharsets } import zio.Chunk -import zio.stream.{ Stream, StreamChunk } +import zio.blocking.Blocking +import zio.stream.{ Stream, ZStream } -final case class ProcessInput(source: Option[StreamChunk[Throwable, Byte]]) +final case class ProcessInput(source: Option[ZStream[Blocking, Throwable, Byte]]) object ProcessInput { val inherit: ProcessInput = ProcessInput(None) @@ -33,20 +34,20 @@ object ProcessInput { ProcessInput(Some(Stream.fromInputStream(new ByteArrayInputStream(bytes)))) /** - * Returns a ProcessInput from a stream of chunked bytes. + * Returns a ProcessInput from a stream of bytes. */ - def fromStreamChunk(stream: StreamChunk[Throwable, Byte]): ProcessInput = + def fromStream(stream: ZStream[Blocking, Throwable, Byte]): ProcessInput = ProcessInput(Some(stream)) /** * Returns a ProcessInput from a String with the given charset. */ def fromString(text: String, charset: Charset): ProcessInput = - ProcessInput(Some(StreamChunk.fromChunks(Chunk.fromArray(text.getBytes(charset))))) + ProcessInput(Some(ZStream.fromChunks(Chunk.fromArray(text.getBytes(charset))))) /** * Returns a ProcessInput from a UTF-8 String. */ def fromUTF8String(text: String): ProcessInput = - ProcessInput(Some(StreamChunk.fromChunks(Chunk.fromArray(text.getBytes(StandardCharsets.UTF_8))))) + ProcessInput(Some(ZStream.fromChunks(Chunk.fromArray(text.getBytes(StandardCharsets.UTF_8))))) } diff --git a/src/main/scala/zio/process/ProcessStream.scala b/src/main/scala/zio/process/ProcessStream.scala index 7b02954c..6313ff02 100644 --- a/src/main/scala/zio/process/ProcessStream.scala +++ b/src/main/scala/zio/process/ProcessStream.scala @@ -18,9 +18,9 @@ package zio.process import java.io.{ BufferedReader, ByteArrayOutputStream, InputStream, InputStreamReader } import java.nio.charset.{ Charset, StandardCharsets } -import zio.{ RIO, UIO, ZManaged } import zio.blocking.{ effectBlockingCancelable, Blocking } -import zio.stream.{ Stream, StreamChunk, ZSink, ZStream } +import zio.stream.{ ZStream, ZTransducer } +import zio.{ RIO, UIO, ZManaged } import scala.collection.mutable.ArrayBuffer @@ -52,16 +52,15 @@ final case class ProcessStream(private val inputStream: InputStream) { * Return the output of this process as a stream of lines (default encoding of UTF-8). */ def linesStream: ZStream[Blocking, Throwable, String] = - stream.chunks - .aggregate(ZSink.utf8DecodeChunk) - .aggregate(ZSink.splitLines) - .mapConcatChunk(identity) + stream + .aggregate(ZTransducer.utf8Decode) + .aggregate(ZTransducer.splitLines) /** * Return the output of this process as a chunked stream of bytes. */ - def stream: StreamChunk[Throwable, Byte] = - Stream.fromInputStream(inputStream) + def stream: ZStream[Blocking, Throwable, Byte] = + ZStream.fromInputStream(inputStream) /** * Return the entire output of this process as a string (default encoding of UTF-8). diff --git a/src/test/scala/zio/process/CommandSpec.scala b/src/test/scala/zio/process/CommandSpec.scala index 3ab10e2b..49d92e0a 100644 --- a/src/test/scala/zio/process/CommandSpec.scala +++ b/src/test/scala/zio/process/CommandSpec.scala @@ -3,10 +3,11 @@ package zio.process import java.io.{ File, IOException } import java.nio.charset.StandardCharsets -import zio.stream.ZSink +import zio.ZIO +import zio.duration._ +import zio.stream.ZTransducer import zio.test.Assertion._ import zio.test._ -import zio.duration._ import zio.test.environment.TestClock // TODO: Add aspects for different OSes? scala.util.Properties.isWin, etc. Also try to make this as OS agnostic as possible in the first place @@ -29,10 +30,9 @@ object CommandSpec extends ZIOProcessBaseSpec { testM("work with stream directly") { val zio = for { stream <- Command("echo", "-n", "1\n2\n3").stream - lines <- stream.chunks - .aggregate(ZSink.utf8DecodeChunk) - .aggregate(ZSink.splitLines) - .mapConcatChunk(identity) + lines <- stream + .aggregate(ZTransducer.utf8Decode) + .aggregate(ZTransducer.splitLines) .runCollect } yield lines @@ -51,7 +51,7 @@ object CommandSpec extends ZIOProcessBaseSpec { testM("accept streaming stdin") { val zio = for { stream <- Command("echo", "-n", "a", "b", "c").stream - result <- Command("cat").stdin(ProcessInput.fromStreamChunk(stream)).string + result <- Command("cat").stdin(ProcessInput.fromStream(stream)).string } yield result assertM(zio)(equalTo("a b c")) @@ -85,9 +85,11 @@ object CommandSpec extends ZIOProcessBaseSpec { }, testM("interrupt a process due to timeout") { val zio = for { - fiber <- Command("sleep", "20").exitCode.timeout(5.seconds).fork - _ <- TestClock.adjust(5.seconds) - result <- fiber.join + fiber <- Command("sleep", "20").exitCode.timeout(5.seconds).fork + adjustFiber <- TestClock.adjust(5.seconds).fork + _ <- ZIO.sleep(5.seconds) + _ <- adjustFiber.join + result <- fiber.join } yield result assertM(zio)(isNone)