diff --git a/DESCRIPTION b/DESCRIPTION index b8e3c64c3..ca2446394 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,7 +1,7 @@ Package: mirai Type: Package Title: Minimalist Async Evaluation Framework for R -Version: 1.2.0.9017 +Version: 1.2.0.9018 Description: Designed for simplicity, a 'mirai' evaluates an R expression asynchronously in a parallel process, locally or distributed over the network, with the result automatically available upon completion. Modern diff --git a/NEWS.md b/NEWS.md index 29f7924a8..618ec1da1 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,4 +1,4 @@ -# mirai 1.2.0.9017 (development) +# mirai 1.2.0.9018 (development) * `daemons(dispatcher = NA)` now provides access to threaded dispatcher (experimental). This implements dispatcher using a thread rather than an external process and is faster and more efficient. * `daemons()` behavioural changes: diff --git a/R/daemons.R b/R/daemons.R index 6d79cf24b..281904fb4 100644 --- a/R/daemons.R +++ b/R/daemons.R @@ -313,7 +313,7 @@ daemons <- function(n, url = NULL, remote = NULL, dispatcher = TRUE, ..., force sock <- req_socket(urld) sockc <- req_socket(urlc) res <- launch_sync_dispatcher(sock, sockc, wa5(urld, dots, n, urlc, url), output, tls, pass) - is.object(res) && stop(._[["sync_timeout"]]) + is.object(res) && stop(._[["sync_dispatcher"]]) store_dispatcher(sockc = sockc, res = res, cv = cv, envir = envir) } else { sock <- req_socket(url, tls = if (length(tls)) tls_config(server = tls, pass = pass)) @@ -364,12 +364,12 @@ daemons <- function(n, url = NULL, remote = NULL, dispatcher = TRUE, ..., force urlc <- sprintf("%s%s", urld, "c") sockc <- req_socket(urlc) res <- launch_sync_dispatcher(sock, sockc, wa4(urld, dots, envir[["stream"]], n, urlc), output) - is.object(res) && stop(._[["sync_timeout"]]) + is.object(res) && stop(._[["sync_dispatcher"]]) store_dispatcher(sockc = sockc, res = res, cv = cv, envir = envir) for (i in seq_len(n)) next_stream(envir) } else { sock <- req_socket(urld) - launch_sync_daemons(seq_len(n), sock, urld, dots, envir, output) || stop(._[["sync_timeout"]]) + launch_sync_daemons(seq_len(n), sock, urld, dots, envir, output) || stop(._[["sync_daemons"]]) `[[<-`(envir, "urls", urld) } `[[<-`(.., .compute, `[[<-`(`[[<-`(envir, "sock", sock), "n", n)) @@ -629,7 +629,7 @@ req_socket <- function(url, tls = NULL, resend = 0L) parse_dots <- function(...) { ...length() || return("") dots <- list(...) - dots <- dots[as.logical(lapply(dots, function(x) is.numeric(x) | is.logical(x)))] + dots <- dots[as.logical(lapply(dots, function(x) is.logical(x) || is.numeric(x)))] dnames <- names(dots) out <- sprintf(",%s", paste(dnames, dots, sep = "=", collapse = ",")) pos <- dnames == "output" diff --git a/R/dispatcher.R b/R/dispatcher.R index 36b6211d5..1aa6390cf 100644 --- a/R/dispatcher.R +++ b/R/dispatcher.R @@ -93,7 +93,7 @@ dispatcher <- function(host, url = NULL, n = NULL, ..., retry = FALSE, token = F pipe_notify(sockc, cv = cv, remove = TRUE, flag = TRUE) dial_and_sync_socket(sock = sockc, url = monitor) cmessage <- recv(sockc, mode = 2L, block = .limit_long) - is.object(cmessage) && stop(._[["sync_timeout"]]) + is.object(cmessage) && stop(._[["sync_dispatcher"]]) if (nzchar(cmessage[2L])) Sys.setenv(R_DEFAULT_PACKAGES = cmessage[2L]) else Sys.unsetenv("R_DEFAULT_PACKAGES") @@ -159,7 +159,7 @@ dispatcher <- function(host, url = NULL, n = NULL, ..., retry = FALSE, token = F if (auto) for (i in seq_n) - until(cv, .limit_long) || stop(._[["sync_timeout"]]) + until(cv, .limit_long) || stop(._[["sync_daemons"]]) if (ctrchannel) { send(sockc, c(Sys.getpid(), servernames), mode = 2L) diff --git a/R/launchers.R b/R/launchers.R index 228d174e2..6b422c7a2 100644 --- a/R/launchers.R +++ b/R/launchers.R @@ -123,7 +123,7 @@ launch_remote <- function(url, remote = remote_config(), ..., tls = NULL, .compu envir <- ..[[.compute]] dots <- parse_dots(...) if (is.null(tls)) tls <- envir[["tls"]] - url <- process_url(url, .compute = .compute) + url <- process_url(url, .compute = .compute, local = FALSE) ulen <- length(url) command <- remote[["command"]] @@ -433,11 +433,13 @@ find_dot <- function(args) { sel } -process_url <- function(url, .compute) { +process_url <- function(url, .compute, local = TRUE) { if (is.numeric(url)) { vec <- ..[[.compute]][["urls"]] - is.null(vec) && stop(._[["daemons_unset"]], call. = FALSE) - all(url >= 1L, url <= length(vec)) || stop(._[["url_spec"]], call. = FALSE) + is.null(vec) && + stop(sprintf(._[["daemons_unset"]], if (local) "local" else "remote"), call. = FALSE) + all(url >= 1L, url <= length(vec)) || + stop(sprintf(._[["url_spec"]], if (local) "local" else "remote"), call. = FALSE) url <- vec[url] } else { lapply(url, parse_url) diff --git a/R/mirai-package.R b/R/mirai-package.R index 062b47bf2..27080dec6 100644 --- a/R/mirai-package.R +++ b/R/mirai-package.R @@ -88,7 +88,7 @@ arglen = "'args' and/or 'url' must be of length 1 or the same length", cluster_inactive = "cluster is no longer active", correct_context = "'host' must be specified if not using directly in a function argument", - daemons_unset = "launch_*(): a numeric value for 'url' requires daemons to be set", + daemons_unset = "launch_%s(): a numeric value for 'url' requires daemons to be set", dot_required = "remote_config(): '.' must be an element of the character vector(s) supplied to 'args'", missing_expression = "missing expression, perhaps wrap in {}?", missing_url = "at least one URL must be supplied for 'url' or 'n' must be at least 1", @@ -102,8 +102,9 @@ requires_daemons = "mirai_map(): launching one local daemon as none previously set", requires_local = "ssh_config(): SSH tunnelling requires 'url' hostname to be '127.0.0.1' or 'localhost'", single_url = "only one 'url' should be specified", - sync_timeout = "initial sync with dispatcher/daemon timed out after 10s", - url_spec = "launch_*(): numeric value for 'url' is out of bounds" + sync_daemons = "initial sync with daemon(s) timed out after 10s", + sync_dispatcher = "initial sync with dispatcher timed out after 10s", + url_spec = "launch_%s(): numeric value for 'url' is out of bounds" ), hash = TRUE ) diff --git a/tests/tests.R b/tests/tests.R index cff28857a..432871633 100644 --- a/tests/tests.R +++ b/tests/tests.R @@ -213,7 +213,7 @@ connection && .Platform[["OS.type"]] != "windows" && Sys.getenv("NOT_CRAN") == " nanotestz(daemons(NULL)) nanotesto(daemons(url = "ws://:0", correctype = 0L, token = TRUE)) nanotestz(daemons(0L)) - nanotestz(with(daemons(url = "tcp://:0", correcttype = 1, token = TRUE), {8L - 9L + 1L})) + nanotestz(with(daemons(url = "tcp://:0", correcttype = c(1, 0), token = TRUE), {8L - 9L + 1L})) nanotest(daemons(n = 2, "ws://:0") == 2L) nanotest(is.integer(nextget("pid"))) nanotest(length(nextget("urls")) == 2L)