Skip to content

Commit

Permalink
reverts exitlinger for ephemeral daemons for robustness
Browse files Browse the repository at this point in the history
  • Loading branch information
shikokuchuo committed Oct 2, 2023
1 parent 18aeeab commit 1f8b3e5
Show file tree
Hide file tree
Showing 6 changed files with 17 additions and 15 deletions.
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Package: mirai
Type: Package
Title: Minimalist Async Evaluation Framework for R
Version: 0.10.0.9019
Version: 0.10.0.9020
Description: Lightweight parallel code execution and distributed computing.
Designed for simplicity, a 'mirai' evaluates an R expression asynchronously,
on local or network resources, resolving automatically upon completion.
Expand Down
4 changes: 2 additions & 2 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# mirai 0.10.0.9019 (development)
# mirai 0.10.0.9020 (development)

* Implements an alternative communications backend for R, adding methods for the 'parallel' base package.
+ Fulfils a request by R Core at R Project Sprint 2023, and requires R >= 4.4 (currently R-devel).
Expand All @@ -8,7 +8,7 @@
+ Argument signature of 'args' for `launch_remote()` changed to accommodate.
* `daemons()` adds argument 'resilience' to control the behaviour, when not using dispatcher, of whether to retry failed tasks on other daemons.
* `mirai()` adds logical argument '.signal' for whether to signal the condition variable within the compute profile upon resolution of the 'mirai'.
* `daemon()` argument 'exitlinger' retired as daemons now synchronise with the host/dispatcher and exit as soon as possible.
* `daemon()` argument 'exitlinger' retired as daemons now synchronise with the host/dispatcher and exit as soon as possible. Note: a default 'exitlinger' period still applies for ephemeral daemons.

This comment has been minimized.

Copy link
@wlandau

wlandau Oct 2, 2023

Do you know if this will affect crew? Not sure whether crew's daemons would qualify as persistent or ephemeral.

This comment has been minimized.

Copy link
@shikokuchuo

shikokuchuo Oct 2, 2023

Author Owner

No it does not. Turns out it is not so trivial to solve universally...

This comment has been minimized.

Copy link
@shikokuchuo

shikokuchuo Oct 2, 2023

Author Owner

I've been doing additional robustness testing in prep for a CRAN release, Lucky to have caught this.

* Optimises scheduling at dispatcher: tasks are no longer assigned to a daemon if it is exiting due to specified time/task-outs.
* An 'errorValue' 19 'Connection reset' is now returned for a 'mirai' if the connection to either dispatcher or an ephemeral daemon drops, for example if they have crashed, rather than remaining unresolved.
* Invalid type of '...' arguments specified to `daemons()` or `dispatcher()` now raise an error early rather than attempting to launch daemons that fail.
Expand Down
11 changes: 5 additions & 6 deletions R/daemon.R
Original file line number Diff line number Diff line change
Expand Up @@ -143,24 +143,23 @@ daemon <- function(url, asyncdial = FALSE, maxtasks = Inf, idletime = Inf,
#' Implements an ephemeral executor for the remote process.
#'
#' @inheritParams daemon
#' @param exitlinger [default 2000L] time in milliseconds to linger before
#' exiting to allow the socket to complete sends currently in progress.
#'
#' @return Logical TRUE, invisibly.
#' @return Invisible NULL.
#'
#' @keywords internal
#' @export
#'
.daemon <- function(url) {
.daemon <- function(url, exitlinger) {

sock <- socket(protocol = "rep", dial = url, autostart = NA)
on.exit(reap(sock))
cv <- cv()
pipe_notify(sock, cv = cv, add = FALSE, remove = TRUE, flag = FALSE)
._mirai_. <- recv(sock, mode = 1L, block = TRUE)
data <- tryCatch(eval(expr = ._mirai_.[[".expr"]], envir = ._mirai_., enclos = NULL),
error = mk_mirai_error, interrupt = mk_interrupt_error)
send(sock, data = data, mode = 1L, block = TRUE)
data <- recv_aio_signal(sock, cv = cv, mode = 8L)
wait(cv)
msleep(exitlinger)

}

Expand Down
2 changes: 1 addition & 1 deletion R/mirai.R
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ mirai <- function(.expr, ..., .args = list(), .timeout = NULL, .signal = FALSE,
url <- auto_tokenized_url()
sock <- req_socket(url, resend = 0L)
if (length(.timeout)) launch_and_sync_daemon(sock = sock, url) else launch_daemon(url)
aio <- request(.context(sock), data = envir, send_mode = 1L, recv_mode = 1L, timeout = .timeout, ack = TRUE)
aio <- request(.context(sock), data = envir, send_mode = 1L, recv_mode = 1L, timeout = .timeout)
`attr<-`(.subset2(aio, "aio"), "sock", sock)

}
Expand Down
7 changes: 5 additions & 2 deletions man/dot-daemon.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions tests/tests.R
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,14 @@ nanotest(!unresolved(b))
nanotest(!unresolved(b2$data))
nanotest(is.environment(b) && is.character(b$data))
nanotest(is.environment(b2) && is.character(b2$data))
Sys.sleep(1L)
Sys.sleep(2.5)
nanotestp(m)
nanotestp(b)
`lang obj` <- quote(m + n + 2L)
args <- list(m = 2L, n = 4L)
m <- mirai(.expr = `lang obj`, .args = args, .timeout = 2000L)
nanotest(call_mirai(m)$data == 8L || is_error_value(m$data))
Sys.sleep(1L)
Sys.sleep(2.5)
nanotest(daemons(url = value <- mirai:::auto_tokenized_url(), dispatcher = FALSE) == value)
nanotest(grepl("://", launch_remote(status()$daemons), fixed = TRUE))
nanotestz(daemons(0L))
Expand All @@ -92,7 +92,7 @@ if (.Platform[["OS.type"]] != "windows") {
nanotest(is_mirai(m <- mirai(TRUE)))
nanotest(is.character(launch_remote("ws://[::1]:5555", command = "echo", args = list(c("Test out:", ".", ">/dev/null")), rscript = "/usr/lib/R/bin/Rscript")))
nanotest(is.character(launch_remote("tcp://localhost:5555", command = "echo", args = ssh_args(names = c("remotehost", "remotenode"), tunnel = TRUE))))
Sys.sleep(1L)
Sys.sleep(2.5)
nanotestn(launch_local(mirai:::auto_tokenized_url(), .compute = "test"))
Sys.sleep(1L)
nanotest(daemons(n = 2L, url = value <- "ws://:0", dispatcher = FALSE) != value)
Expand Down

0 comments on commit 1f8b3e5

Please sign in to comment.