Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make all process operators interruptible #430

Open
wants to merge 9 commits into
base: series/2.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .sbtopts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-J-Xmx4G
-J-XX:MaxMetaspaceSize=1G
-J-Dfile.encoding=UTF-8
-J-Dsbt.io.implicit.relative.glob.conversion=allow
2 changes: 2 additions & 0 deletions project/BuildHelper.scala
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ object BuildHelper {
def stdSettings(prjName: String) = Seq(
name := s"$prjName",
fork := true,
run / baseDirectory := file("."),
Test / baseDirectory := file("."),
crossScalaVersions := Seq(Scala212, Scala213),
ThisBuild / scalaVersion := Scala213,
scalacOptions := stdOptions ++ extraOptions(scalaVersion.value),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ package zio.process

import FilePlatformSpecific._
import ProcessPlatformSpecific._
import zio.{ NonEmptyChunk, ZIO }
import zio._
import scala.annotation.nowarn
import scala.scalajs.js
import js.JSConverters._
Expand All @@ -38,7 +38,7 @@ private[process] trait CommandPlatformSpecific {
stdinStream <- c.stdin match {
case FromStream(stream, _) =>
stream.toInputStream.map(Some(_))
case JavaStream(in, _) => ZIO.succeed(Some(in))
case JavaStream(in, _) => ZIO.some(in)
case _ => ZIO.none
}
promise <- ZIO.attempt {
Expand Down Expand Up @@ -97,7 +97,11 @@ private[process] trait CommandPlatformSpecific {
process.stdin.write(new js.typedarray.Uint8Array(arr.take(read).map(_.toShort).toJSArray))
)
else if (read == -1)
if (process.stdin != null) if (!process.stdin.writableEnded) { process.stdin.end(); () }
if (process.stdin != null && !process.stdin.writableEnded) {
process.stdin.end()
()
}

}

write(true)
Expand All @@ -113,7 +117,10 @@ private[process] trait CommandPlatformSpecific {
"end",
{ () =>
connectStdin(s)
if (process.stdin != null) if (!process.stdin.writableEnded) process.stdin.end()

if (process.stdin != null && !process.stdin.writableEnded)
process.stdin.end()

resolve(zioProcess)
}
)
Expand All @@ -122,7 +129,9 @@ private[process] trait CommandPlatformSpecific {
process.on("spawn", () => connectStdin(s))
in.on(
"end",
() => if (process.stdin != null) if (!process.stdin.writableEnded) process.stdin.end()
() =>
if (process.stdin != null && !process.stdin.writableEnded)
process.stdin.end()
)
val node = js.Dynamic.global.require("process")
process.on("spawn", () => node.nextTick(() => resolve(zioProcess)))
Expand Down
18 changes: 9 additions & 9 deletions zio-process/js/src/main/scala/zio/process/Constructors.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@ import zio.stream.ZStream

import java.io.OutputStream
import zio.Trace

import java.io.IOException
import zio.ZIO
import zio.Chunk
import zio.Scope
import zio.process.ProcessPlatformSpecific.JProcess

import java.io.InputStream

private[process] object Constructors {
Expand Down Expand Up @@ -52,19 +55,14 @@ private[process] object Constructors {
val bytes = byteChunk.toArray
out.write(bytes)
bytesWritten + bytes.length
}.refineOrDie { case e: IOException =>
e
}
}.refineToOrDie[IOException]
}
}
}

def zsink(outputStream: OutputStream): ZSink[Any, IOException, Byte, Byte, Long] = fromOutputStream(outputStream)

/**
* Creates a stream from a `java.io.InputStream`
*/
def fromInputStream(
def fromProcessInputStream(process: JProcess)(
is: => InputStream,
chunkSize: => Int = ZStream.DefaultChunkSize
)(implicit trace: Trace): ZStream[Any, IOException, Byte] =
Expand All @@ -73,8 +71,10 @@ private[process] object Constructors {
for {
bufArray <- ZIO.succeed(Array.ofDim[Byte](chunkSize))
bytesRead <- ZIO
.attemptBlockingCancelable(is.read(bufArray))(ZIO.succeed(is.close()))
.refineToOrDie[java.io.IOException]
.attemptBlockingCancelable(is.read(bufArray))(
ZIO.succeed(is.close()).ensuring(ZIO.attemptBlocking(process.kill()).ignore)
)
.refineToOrDie[IOException]
.asSomeError
bytes <- if (bytesRead < 0)
ZIO.fail(None)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ private[process] object FilePlatformSpecific {
type File = String
type Path = String

def fileOf(file: String): File = file

def getAbsolute(file: File): js.Any = {
val path = js.Dynamic.global.require("path")
val nodejs = js.Dynamic.global.require("process")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package zio.process

import zio.stream.ZStream
import zio.{ Chunk, Queue }
import zio._
import FilePlatformSpecific._
import java.io.ByteArrayInputStream
import java.nio.charset.{ Charset, StandardCharsets }
Expand All @@ -41,7 +41,7 @@ object ProcessInput {
ProcessInput.JavaStream(
ProcessPlatformSpecific.JSInputStream(
fs.createReadStream(FilePlatformSpecific.getAbsolute(file)).asInstanceOf[JS.Readable],
true
pause = true
),
flushChunksEagerly = false
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,11 @@
*/
package zio.process

import zio.ZIO
import zio._

import java.io.{ InputStream, OutputStream }
import scala.scalajs.js.typedarray.Uint8Array
import scala.scalajs.js
import zio.Chunk
import js.JSConverters._

private[process] trait ProcessPlatformSpecific extends ProcessInterface { self: Process =>
Expand All @@ -29,7 +28,13 @@ private[process] trait ProcessPlatformSpecific extends ProcessInterface { self:

private var killed = false

protected def waitForUnsafe: Int = self.process.exitCode
def waitFor: IO[CommandError, ExitCode] =
ProcessPlatformSpecific.wait(stdoutInternal).map(_ => ExitCode(self.process.exitCode)).refineOrDie {
case CommandThrowable.IOError(e) => e
}

protected def waitForUnsafe: Int =
self.process.exitCode

protected def isAliveUnsafe: Boolean = !killed
protected def destroyUnsafe(): Unit = {
Expand Down Expand Up @@ -139,6 +144,7 @@ private[process] object ProcessPlatformSpecific {
}

}

override def readAllBytes(): Array[Byte] = {
val arr = buffer.toArray
buffer = Chunk.empty
Expand Down
9 changes: 0 additions & 9 deletions zio-process/js/src/test/bash/both-streams-test.sh

This file was deleted.

3 changes: 0 additions & 3 deletions zio-process/js/src/test/bash/echo-repeat.sh

This file was deleted.

5 changes: 0 additions & 5 deletions zio-process/js/src/test/bash/kill-test/sample-child.sh

This file was deleted.

7 changes: 0 additions & 7 deletions zio-process/js/src/test/bash/kill-test/sample-parent.sh

This file was deleted.

3 changes: 0 additions & 3 deletions zio-process/js/src/test/bash/no-permissions.sh

This file was deleted.

6 changes: 0 additions & 6 deletions zio-process/js/src/test/bash/stdin-echo.sh

This file was deleted.

This file was deleted.

30 changes: 24 additions & 6 deletions zio-process/jvm/src/main/scala/zio/process/Constructors.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,42 @@
*/
package zio.process

import zio.process.ProcessPlatformSpecific.JProcess
import zio.stream.ZSink
import zio.stream.ZStream

import java.io.OutputStream
import java.io.InputStream
import zio.Trace
import zio._

import java.io.IOException

private[process] object Constructors {

def zsink(outputStream: OutputStream) = ZSink.fromOutputStream(outputStream)

/**
* Creates a stream from a `java.io.InputStream`
*/
def fromInputStream(
def fromProcessInputStream(process: JProcess)(
is: => InputStream,
chunkSize: => Int = ZStream.DefaultChunkSize
)(implicit trace: Trace): ZStream[Any, IOException, Byte] = ZStream.fromInputStream(is, chunkSize)
)(implicit trace: Trace): ZStream[Any, IOException, Byte] =
ZStream.succeed((is, chunkSize)).flatMap { case (is, chunkSize) =>
ZStream.repeatZIOChunkOption {
for {
bufArray <- ZIO.succeed(Array.ofDim[Byte](chunkSize))
bytesRead <- ZIO
.attemptBlockingCancelable(is.read(bufArray))(ZIO.attemptBlocking(process.destroy()).ignore)
.refineToOrDie[IOException]
.asSomeError
bytes <- if (bytesRead < 0)
ZIO.fail(None)
else if (bytesRead == 0)
ZIO.succeed(Chunk.empty)
else if (bytesRead < chunkSize)
ZIO.succeed(Chunk.fromArray(bufArray).take(bytesRead))
else
ZIO.succeed(Chunk.fromArray(bufArray))
} yield bytes
}
}

}
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package zio.process

object FilePlatformSpecific {
private[process] object FilePlatformSpecific {
type File = java.io.File

def exists(file: File) = file.exists()
def fileOf(file: String): File = new File(file)

def exists(file: File): Boolean = file.exists()
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package zio.process

import zio.stream.ZStream
import zio.{ Chunk, Queue }
import zio._

import java.io.ByteArrayInputStream
import java.nio.charset.{ Charset, StandardCharsets }
Expand Down
Loading
Loading