Skip to content

Commit

Permalink
Upgrade to ZIO 1.0.0-RC19 (#13)
Browse files Browse the repository at this point in the history
* Upgrade to ZIO 1.0.0-RC19

* Fix test

* Fix docs
  • Loading branch information
reibitto authored May 16, 2020
1 parent bcdda77 commit 9f0b40d
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 35 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ ThisBuild / publishTo := sonatypePublishToBundle.value
addCommandAlias("fmt", "all scalafmtSbt scalafmt test:scalafmt")
addCommandAlias("check", "all scalafmtSbtCheck scalafmtCheck test:scalafmtCheck")

val zioVersion = "1.0.0-RC18-2"
val zioVersion = "1.0.0-RC19"
libraryDependencies ++= Seq(
"dev.zio" %% "zio" % zioVersion,
"dev.zio" %% "zio-streams" % zioVersion,
Expand Down
4 changes: 1 addition & 3 deletions docs/overview/basics.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,14 @@ When you don't care about the output (or there is no output), you can return jus
command.exitCode
```

### Stream of bytes (chunked)
### Stream of bytes

If you need lower-level access to the output's stream of bytes, you can access them directly like so:

```scala mdoc:silent
command.stream
```

The bytes are chunked for performance in the form of `StreamChunk[Throwable, Byte]`

### Access stdout and stderr separately

There are times where you need to process the output of stderr as well.
Expand Down
4 changes: 2 additions & 2 deletions docs/overview/piping.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ import zio.process._
```scala mdoc:silent
for {
processes <- Command("ps", "-ef").stream
javaProcesses <- Command("grep", "java").stdin(ProcessInput.fromStreamChunk(processes)).stream
processIds <- Command("awk", "{print $2}").stdin(ProcessInput.fromStreamChunk(javaProcesses)).lines
javaProcesses <- Command("grep", "java").stdin(ProcessInput.fromStream(processes)).stream
processIds <- Command("awk", "{print $2}").stdin(ProcessInput.fromStream(javaProcesses)).lines
} yield processIds
```

Expand Down
10 changes: 5 additions & 5 deletions src/main/scala/zio/process/Command.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import java.lang.ProcessBuilder.Redirect
import java.nio.charset.Charset

import zio.blocking.Blocking
import zio.stream.{ StreamChunk, ZSink, ZStream }
import zio.stream.{ ZSink, ZStream }
import zio.{ IO, RIO, Task, UIO, ZIO }

import scala.jdk.CollectionConverters._
Expand Down Expand Up @@ -137,7 +137,7 @@ sealed trait Command {
case ProcessInput(Some(input)) =>
for {
outputStream <- process.execute(_.getOutputStream)
_ <- input.chunks
_ <- input
.run(ZSink.fromOutputStream(outputStream))
.ensuring(UIO(outputStream.close()))
.forkDaemon
Expand All @@ -155,9 +155,9 @@ sealed trait Command {
for {
stream <- tail.init.foldLeft(head.stream) {
case (s, command) =>
s.flatMap(input => command.stdin(ProcessInput.fromStreamChunk(input)).stream)
s.flatMap(input => command.stdin(ProcessInput.fromStream(input)).stream)
}
result <- tail.last.stdin(ProcessInput.fromStreamChunk(stream)).run
result <- tail.last.stdin(ProcessInput.fromStream(stream)).run
} yield result
}
}
Expand Down Expand Up @@ -203,7 +203,7 @@ sealed trait Command {
/**
* Runs the command returning the output as a chunked stream of bytes.
*/
def stream: RIO[Blocking, StreamChunk[Throwable, Byte]] =
def stream: RIO[Blocking, ZStream[Blocking, Throwable, Byte]] =
run.map(_.stdout.stream)

/**
Expand Down
13 changes: 7 additions & 6 deletions src/main/scala/zio/process/ProcessInput.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ import java.io.ByteArrayInputStream
import java.nio.charset.{ Charset, StandardCharsets }

import zio.Chunk
import zio.stream.{ Stream, StreamChunk }
import zio.blocking.Blocking
import zio.stream.{ Stream, ZStream }

final case class ProcessInput(source: Option[StreamChunk[Throwable, Byte]])
final case class ProcessInput(source: Option[ZStream[Blocking, Throwable, Byte]])

object ProcessInput {
val inherit: ProcessInput = ProcessInput(None)
Expand All @@ -33,20 +34,20 @@ object ProcessInput {
ProcessInput(Some(Stream.fromInputStream(new ByteArrayInputStream(bytes))))

/**
* Returns a ProcessInput from a stream of chunked bytes.
* Returns a ProcessInput from a stream of bytes.
*/
def fromStreamChunk(stream: StreamChunk[Throwable, Byte]): ProcessInput =
def fromStream(stream: ZStream[Blocking, Throwable, Byte]): ProcessInput =
ProcessInput(Some(stream))

/**
* Returns a ProcessInput from a String with the given charset.
*/
def fromString(text: String, charset: Charset): ProcessInput =
ProcessInput(Some(StreamChunk.fromChunks(Chunk.fromArray(text.getBytes(charset)))))
ProcessInput(Some(ZStream.fromChunks(Chunk.fromArray(text.getBytes(charset)))))

/**
* Returns a ProcessInput from a UTF-8 String.
*/
def fromUTF8String(text: String): ProcessInput =
ProcessInput(Some(StreamChunk.fromChunks(Chunk.fromArray(text.getBytes(StandardCharsets.UTF_8)))))
ProcessInput(Some(ZStream.fromChunks(Chunk.fromArray(text.getBytes(StandardCharsets.UTF_8)))))
}
15 changes: 7 additions & 8 deletions src/main/scala/zio/process/ProcessStream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ package zio.process
import java.io.{ BufferedReader, ByteArrayOutputStream, InputStream, InputStreamReader }
import java.nio.charset.{ Charset, StandardCharsets }

import zio.{ RIO, UIO, ZManaged }
import zio.blocking.{ effectBlockingCancelable, Blocking }
import zio.stream.{ Stream, StreamChunk, ZSink, ZStream }
import zio.stream.{ ZStream, ZTransducer }
import zio.{ RIO, UIO, ZManaged }

import scala.collection.mutable.ArrayBuffer

Expand Down Expand Up @@ -52,16 +52,15 @@ final case class ProcessStream(private val inputStream: InputStream) {
* Return the output of this process as a stream of lines (default encoding of UTF-8).
*/
def linesStream: ZStream[Blocking, Throwable, String] =
stream.chunks
.aggregate(ZSink.utf8DecodeChunk)
.aggregate(ZSink.splitLines)
.mapConcatChunk(identity)
stream
.aggregate(ZTransducer.utf8Decode)
.aggregate(ZTransducer.splitLines)

/**
* Return the output of this process as a chunked stream of bytes.
*/
def stream: StreamChunk[Throwable, Byte] =
Stream.fromInputStream(inputStream)
def stream: ZStream[Blocking, Throwable, Byte] =
ZStream.fromInputStream(inputStream)

/**
* Return the entire output of this process as a string (default encoding of UTF-8).
Expand Down
22 changes: 12 additions & 10 deletions src/test/scala/zio/process/CommandSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ package zio.process
import java.io.{ File, IOException }
import java.nio.charset.StandardCharsets

import zio.stream.ZSink
import zio.ZIO
import zio.duration._
import zio.stream.ZTransducer
import zio.test.Assertion._
import zio.test._
import zio.duration._
import zio.test.environment.TestClock

// TODO: Add aspects for different OSes? scala.util.Properties.isWin, etc. Also try to make this as OS agnostic as possible in the first place
Expand All @@ -29,10 +30,9 @@ object CommandSpec extends ZIOProcessBaseSpec {
testM("work with stream directly") {
val zio = for {
stream <- Command("echo", "-n", "1\n2\n3").stream
lines <- stream.chunks
.aggregate(ZSink.utf8DecodeChunk)
.aggregate(ZSink.splitLines)
.mapConcatChunk(identity)
lines <- stream
.aggregate(ZTransducer.utf8Decode)
.aggregate(ZTransducer.splitLines)
.runCollect
} yield lines

Expand All @@ -51,7 +51,7 @@ object CommandSpec extends ZIOProcessBaseSpec {
testM("accept streaming stdin") {
val zio = for {
stream <- Command("echo", "-n", "a", "b", "c").stream
result <- Command("cat").stdin(ProcessInput.fromStreamChunk(stream)).string
result <- Command("cat").stdin(ProcessInput.fromStream(stream)).string
} yield result

assertM(zio)(equalTo("a b c"))
Expand Down Expand Up @@ -85,9 +85,11 @@ object CommandSpec extends ZIOProcessBaseSpec {
},
testM("interrupt a process due to timeout") {
val zio = for {
fiber <- Command("sleep", "20").exitCode.timeout(5.seconds).fork
_ <- TestClock.adjust(5.seconds)
result <- fiber.join
fiber <- Command("sleep", "20").exitCode.timeout(5.seconds).fork
adjustFiber <- TestClock.adjust(5.seconds).fork
_ <- ZIO.sleep(5.seconds)
_ <- adjustFiber.join
result <- fiber.join
} yield result

assertM(zio)(isNone)
Expand Down

0 comments on commit 9f0b40d

Please sign in to comment.