Skip to content

Commit

Permalink
no longer relies on reap()
Browse files Browse the repository at this point in the history
  • Loading branch information
shikokuchuo committed Nov 30, 2023
1 parent a258c2f commit 43b1de2
Show file tree
Hide file tree
Showing 7 changed files with 17 additions and 18 deletions.
4 changes: 2 additions & 2 deletions DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Package: mirai
Type: Package
Title: Minimalist Async Evaluation Framework for R
Version: 0.11.2.9015
Version: 0.11.2.9016
Description: Lightweight parallel code execution and distributed computing.
Designed for simplicity, a 'mirai' evaluates an R expression asynchronously,
on local or network resources, resolving automatically upon completion.
Expand All @@ -22,7 +22,7 @@ Encoding: UTF-8
Depends:
R (>= 3.5)
Imports:
nanonext (>= 0.10.4.9021)
nanonext (>= 0.10.4.9023)
Enhances:
parallel,
promises
Expand Down
1 change: 0 additions & 1 deletion NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ importFrom(nanonext,opt)
importFrom(nanonext,parse_url)
importFrom(nanonext,pipe_notify)
importFrom(nanonext,random)
importFrom(nanonext,reap)
importFrom(nanonext,recv)
importFrom(nanonext,recv_aio_signal)
importFrom(nanonext,request)
Expand Down
4 changes: 2 additions & 2 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# mirai 0.11.2.9015 (development)
# mirai 0.11.2.9016 (development)

* Implements `register()` for registering custom serialization and unserialization functions when using daemons.
* Introduces `call_mirai_()`, a user-interruptible version of `call_mirai()` suitable for interactive use.
Expand All @@ -7,7 +7,7 @@
+ '.signal' argument removed - now all 'mirai' signal if daemons are set up.
* `everywhere()` now returns invisible NULL in the case the specified compute profile is not set up, rather than error.
* Improved error messages and handling for daemon/dispatcher connection errors.
* Requires nanonext >= [0.10.4.9021].
* Requires nanonext >= [0.10.4.9023].

# mirai 0.11.2

Expand Down
4 changes: 2 additions & 2 deletions R/daemon.R
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ daemon <- function(url, autoexit = TRUE, cleanup = TRUE, output = FALSE,

cv <- cv()
sock <- socket(protocol = "rep")
on.exit(reap(sock))
on.exit(close(sock))
autoexit && pipe_notify(sock, cv = cv, add = FALSE, remove = TRUE, flag = TRUE)
if (length(tls)) tls <- tls_config(client = tls)
dial_and_sync_socket(sock = sock, url = url, asyncdial = !autoexit, tls = tls)
Expand Down Expand Up @@ -174,7 +174,7 @@ daemon <- function(url, autoexit = TRUE, cleanup = TRUE, output = FALSE,
.daemon <- function(url) {

sock <- socket(protocol = "rep", dial = url, autostart = NA)
on.exit(reap(sock))
on.exit(close(sock))
._mirai_. <- recv(sock, mode = 1L, block = TRUE)
data <- tryCatch(eval(expr = ._mirai_.[[".expr"]], envir = ._mirai_., enclos = NULL),
error = mk_mirai_error, interrupt = mk_interrupt_error)
Expand Down
4 changes: 2 additions & 2 deletions R/daemons.R
Original file line number Diff line number Diff line change
Expand Up @@ -331,8 +331,8 @@ daemons <- function(n, url = NULL, remote = NULL, dispatcher = TRUE, ...,
length(envir) || return(0L)

if (signal) send_signal(envir = envir)
reap(envir[["sock"]])
length(envir[["sockc"]]) && reap(envir[["sockc"]])
close(envir[["sock"]])
length(envir[["sockc"]]) && close(envir[["sockc"]])
..[[.compute]] <- NULL -> envir

} else if (is.null(envir)) {
Expand Down
16 changes: 8 additions & 8 deletions R/dispatcher.R
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ dispatcher <- function(host, url = NULL, n = NULL, ..., asyncdial = FALSE,

cv <- cv()
sock <- socket(protocol = "rep")
on.exit(reap(sock))
on.exit(close(sock))
pipe_notify(sock, cv = cv, add = FALSE, remove = TRUE, flag = TRUE)
dial_and_sync_socket(sock = sock, url = host, asyncdial = asyncdial)

Expand Down Expand Up @@ -135,12 +135,12 @@ dispatcher <- function(host, url = NULL, n = NULL, ..., asyncdial = FALSE,
queue[[i]] <- create_req(ctx = .context(sock), cv = cv)
}

on.exit(lapply(servers, reap), add = TRUE, after = TRUE)
on.exit(lapply(servers, close), add = TRUE, after = TRUE)

ctrchannel <- is.character(monitor)
if (ctrchannel) {
sockc <- socket(protocol = "rep")
on.exit(reap(sockc), add = TRUE, after = FALSE)
on.exit(close(sockc), add = TRUE, after = FALSE)
dial_and_sync_socket(sock = sockc, url = monitor, asyncdial = asyncdial)
recv(sockc, mode = 6L, block = .timelimit) && stop(.messages[["sync_timeout"]])
send_aio(sockc, c(Sys.getpid(), servernames), mode = 2L)
Expand All @@ -165,15 +165,15 @@ dispatcher <- function(host, url = NULL, n = NULL, ..., asyncdial = FALSE,
i <- .subset2(cmessage, "value")
if (i) {
if (i > 0L && !activevec[[i]]) {
reap(attr(servers[[i]], "listener")[[1L]])
close(attr(servers[[i]], "listener")[[1L]])
attr(servers[[i]], "listener") <- NULL
data <- servernames[i] <- if (auto) auto_tokenized_url() else new_tokenized_url(basenames[i])
instance[i] <- -abs(instance[i])
listen(servers[[i]], url = data, tls = tls, error = TRUE)

} else if (i < 0L) {
i <- -i
reap(servers[[i]])
close(servers[[i]])
servers[[i]] <- nsock <- req_socket(NULL)
pipe_notify(nsock, cv = active[[i]], cv2 = cv, add = TRUE, remove = TRUE, flag = FALSE)
lock(nsock, cv = active[[i]])
Expand All @@ -200,15 +200,15 @@ dispatcher <- function(host, url = NULL, n = NULL, ..., asyncdial = FALSE,
send_aio(queue[[i]][["ctx"]], data = req, mode = 2L)
q <- queue[[i]][["daemon"]]
if (req[3L]) {
reap(attr(servers[[q]], "listener")[[1L]])
close(attr(servers[[q]], "listener")[[1L]])
attr(servers[[q]], "listener") <- NULL
gc(verbose = FALSE)
listen(servers[[q]], url = servernames[q], tls = tls, error = FALSE)
listen(servers[[q]], url = servernames[q], tls = tls, error = TRUE)
} else {
serverfree[q] <- TRUE
}
complete[q] <- complete[q] + 1L
queue[[i]] <- create_req(ctx = .context(sock), cv = cv) -> req
queue[[i]] <- create_req(ctx = .context(sock), cv = cv)
}

free <- which(serverfree & activevec)
Expand Down
2 changes: 1 addition & 1 deletion R/mirai-package.R
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
#'
#' @importFrom nanonext base64dec call_aio call_aio_ .context cv cv_value dial
#' is_error_value listen lock mclock msleep nextmode opt opt<- parse_url
#' pipe_notify random reap recv recv_aio_signal request request_signal send
#' pipe_notify random recv recv_aio_signal request request_signal send
#' send_aio socket stat stop_aio strcat tls_config unresolved until wait
#' write_cert
#' @importFrom parallel nextRNGStream stopCluster
Expand Down

0 comments on commit 43b1de2

Please sign in to comment.