Skip to content

Commit

Permalink
vectorize stop_mirai() (as documented)
Browse files Browse the repository at this point in the history
  • Loading branch information
shikokuchuo committed Dec 4, 2024
1 parent baccc90 commit 5fe0b60
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 9 deletions.
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Package: mirai
Type: Package
Title: Minimalist Async Evaluation Framework for R
Version: 1.3.1.9018
Version: 1.3.1.9019
Description: Designed for simplicity, a 'mirai' evaluates an R expression
asynchronously in a parallel process, locally or distributed over the
network, with the result automatically available upon completion. Modern
Expand Down
6 changes: 3 additions & 3 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
# mirai 1.3.1.9018 (development)
# mirai 1.3.1.9019 (development)

#### Architecture Change
#### New Architecture

* Distributed computing now uses a single URL at which all daemons connect (with or without dispatcher).
- Allows using a more efficient `tcp://` or `tls+tcp://` address instead of websockets.
- Allows a more efficient `tcp://` or `tls+tcp://` connection in most cases instead of websockets.
- Daemons may be added or removed at any time without limit.

#### New Features
Expand Down
3 changes: 2 additions & 1 deletion R/dispatcher.R
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ dispatcher <- function(host, url = NULL, n = NULL, ..., tls = NULL, pass = NULL,

psock <- socket(protocol = "poly")
on.exit(reap(psock), add = TRUE, after = TRUE)
`opt<-`(psock, "send-buffer", 1L)
m <- monitor(psock, cv)
listen(psock, url = url, tls = tls, error = TRUE)

Expand Down Expand Up @@ -155,7 +156,7 @@ dispatcher <- function(host, url = NULL, n = NULL, ..., tls = NULL, pass = NULL,
found <- FALSE
for (i in seq_along(outq))
if (outq[[i]][["msgid"]] == id) {
send(psock, .connectionReset, mode = 1L, pipe = outq[[i]][["pipe"]], block = TRUE)
send(psock, .cancelRequest, mode = 1L, pipe = outq[[i]][["pipe"]], block = TRUE)
outq[[i]][["msgid"]] <- -1L
found <- TRUE
break
Expand Down
9 changes: 7 additions & 2 deletions R/mirai.R
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,9 @@ collect_mirai <- collect_aio
#' completed or previously cancelled). Will always return \code{FALSE} if not
#' using dispatcher.
#'
#' Or a vector of logical values if supplying a list of \sQuote{mirai}, such
#' as those returned by \code{\link{mirai_map}}.
#'
#' @examples
#' if (interactive()) {
#' # Only run examples in interactive R sessions
Expand All @@ -421,8 +424,9 @@ collect_mirai <- collect_aio
#' @export
#'
stop_mirai <- function(x) {
is.list(x) && return(rev(as.logical(lapply(rev(unclass(x)), stop_mirai))))
.compute <- attr(x, "profile")
envir <- if (!is.null(.compute)) ..[[.compute]]
envir <- if (is.character(.compute)) ..[[.compute]]
stop_aio(x)
invisible(length(envir[["msgid"]]) && query_dispatcher(envir[["sock"]], c(0L, attr(x, "msgid"))))
}
Expand Down Expand Up @@ -628,6 +632,7 @@ mk_mirai_error <- function(e, sc) {

.miraiInterrupt <- `class<-`("", c("miraiInterrupt", "errorValue", "try-error"))
.connectionReset <- `class<-`(19L, c("errorValue", "try-error"))
.register <- expression(mirai:::register(.serial), NULL)
.cancelRequest <- `class<-`(0L, "c")
.register <- expression(mirai:::register(.serial))
.snapshot <- expression(on.exit(mirai:::snapshot(), add = TRUE))
.block <- expression(on.exit(nanonext::msleep(500L), add = TRUE))
3 changes: 3 additions & 0 deletions man/stop_mirai.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 9 additions & 2 deletions tests/tests.R
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,11 @@ connection && Sys.getenv("NOT_CRAN") == "true" && {
test_equal(m2$data, 20L)
test_equal(m1$data, 20L)
test_class("errorValue", mirai(res)[])
m <- mirai_map(1:10, function(x) { Sys.sleep(2); y <<- TRUE })
s <- stop_mirai(m)
test_equal(sum(unlist(m[])), 200L)
if (.Platform[["OS.type"]] != "windows") test_identical(s, !logical(10L))
if (.Platform[["OS.type"]] != "windows") test_class("errorValue", mirai(y)[])
test_equal(status()$connections, 1L)
test_equal(length(nextget("urls")), 1L)
test_class("miraiLaunchCmd", launch_remote(1))
Expand Down Expand Up @@ -323,10 +328,12 @@ connection && Sys.getenv("NOT_CRAN") == "true" && {
test_equal(daemons(url = host_url(ws = TRUE, tls = TRUE), dispatcher = "thread", output = TRUE), 1L)
test_equal(nextget("n"), 1L)
test_equal(length(nextget("urls")), 1L)
test_null(saisei(i = 0L))
test_null(saisei(i = 10L))
test_class("matrix", status()$daemons)
test_null(saisei(i = 0L))
test_print(saisei(i = 1L))
Sys.sleep(0.1)
test_print(saisei(i = 1L, force = TRUE))
Sys.sleep(0.1)
test_zero(daemons(0))
}
Sys.sleep(1L)

0 comments on commit 5fe0b60

Please sign in to comment.