Skip to content

Commit

Permalink
make new dispatcher the default
Browse files Browse the repository at this point in the history
  • Loading branch information
shikokuchuo committed Dec 1, 2024
1 parent 4031b64 commit 6a9435f
Show file tree
Hide file tree
Showing 11 changed files with 356 additions and 345 deletions.
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Package: mirai
Type: Package
Title: Minimalist Async Evaluation Framework for R
Version: 1.3.1.9009
Version: 1.3.1.9010
Description: Designed for simplicity, a 'mirai' evaluates an R expression
asynchronously in a parallel process, locally or distributed over the
network, with the result automatically available upon completion. Modern
Expand Down
2 changes: 1 addition & 1 deletion NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ export(collect_mirai)
export(daemon)
export(daemons)
export(dispatcher)
export(dispatcher2)
export(dispatcher_v1)
export(everywhere)
export(host_url)
export(is_error_value)
Expand Down
9 changes: 5 additions & 4 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
# mirai 1.3.1.9009 (development)
# mirai 1.3.1.9010 (development)

#### New Features

* Introduces `daemons(dispatcher = "next")`, a newer and more efficient dispatcher. Supports mirai cancellation.
* `daemon()` gains the new argument 'dispatcher', which should be set to `TRUE` when using dispatcher and `FALSE` otherwise.
* `daemons(dispatcher = "default")` provides a new and more efficient architecture for dispatcher. Although 'process' is no longer an option, this will still work and retains the previous behaviour of the v1 dispatcher.
* `stop_mirai()` is upgraded to cancel remote mirai tasks when using the new dispatcher, returning a logical value indicating whether cancellation was successful.
* `daemon()` gains the new argument 'dispatcher', which should be set to `TRUE` when connecting to dispatcher and `FALSE` when connecting directly to host.

#### Updates

* Experimental threaded dispatcher `daemons(dispatcher = "thread")` has been retired (as this was based on the old dispatcher architecture and future development will focus on the current design). Specifying 'dispatcher = thread' is deprecated, but will point to 'dispatcher = process' for the time being.
* Experimental threaded dispatcher `daemons(dispatcher = "thread")` has been retired (as this was based on the old dispatcher architecture and future development will focus on the current design). Specifying 'dispatcher = thread' is defunct, but will point to 'dispatcher = process' for the time being.
* `daemon()` '...' argument had been moved up to prevent partial matching on any of the optional arguments.
* Requires `nanonext` >= [1.3.2.9009].

Expand Down
77 changes: 37 additions & 40 deletions R/daemons.R
Original file line number Diff line number Diff line change
Expand Up @@ -108,14 +108,10 @@
#'
#' @section Dispatcher:
#'
#' By default \code{dispatcher = "process"} launches a background process
#' By default \code{dispatcher = "default"} launches a background process
#' running \code{\link{dispatcher}}. Dispatcher connects to daemons on behalf of
#' the host and ensures optimal FIFO scheduling of tasks.
#'
#' Specifying \code{dispatcher = "next"} launches a background process
#' running \code{\link{dispatcher2}}. This is a newer more efficient
#' architecture and will replace "process" over time.
#'
#' Specifying \code{dispatcher = "none"}, uses the default behaviour without
#' additional dispatcher logic. In this case daemons connect directly to the
#' host and tasks are distributed in a round-robin fashion. Optimal scheduling
Expand Down Expand Up @@ -275,7 +271,7 @@
#'
#' @export
#'
daemons <- function(n, url = NULL, remote = NULL, dispatcher = c("process", "next", "none"),
daemons <- function(n, url = NULL, remote = NULL, dispatcher = c("default", "none"),
..., force = TRUE, seed = NULL, tls = NULL, pass = NULL, .compute = "default") {

missing(n) && missing(url) && return(status(.compute))
Expand All @@ -289,18 +285,24 @@ daemons <- function(n, url = NULL, remote = NULL, dispatcher = c("process", "nex
switch(
parse_dispatcher(dispatcher),
{
n <- if (missing(n)) length(url) else if (is.numeric(n) && n >= 1L) as.integer(n) else stop(._[["n_one"]])
url <- url[1L]
tls <- configure_tls(url, tls, pass, envir, returnconfig = FALSE)
cv <- cv()
dots <- parse_dots(...)
output <- attr(dots, "output")
urld <- local_url()
urlc <- sprintf("%s%s", urld, "c")
sock <- req_socket(urld)
sockc <- req_socket(urlc)
res <- launch_sync_dispatcher(sock, sockc, wa5(urld, dots, n, urlc, url), output, tls, pass)
res <- launch_sync_dispatcher(sock, sock, wa52(urld, dots, url), output, tls, pass)
is.object(res) && stop(._[["sync_dispatcher"]])
store_dispatcher(sockc, res, cv, envir)
store_dispatcher(sock, res, cv, envir)
`[[<-`(envir, "msgid", 0L)
n <- 0L
},
{
tls <- configure_tls(url, tls, pass, envir)
sock <- req_socket(url, tls = tls)
check_store_url(sock, envir)
n <- 0L
},
{
n <- if (missing(n)) length(url) else if (is.numeric(n) && n >= 1L) as.integer(n) else stop(._[["n_one"]])
Expand All @@ -309,17 +311,12 @@ daemons <- function(n, url = NULL, remote = NULL, dispatcher = c("process", "nex
dots <- parse_dots(...)
output <- attr(dots, "output")
urld <- local_url()
urlc <- sprintf("%s%s", urld, "c")
sock <- req_socket(urld)
res <- launch_sync_dispatcher(sock, sock, wa52(urld, dots, n, url), output, tls, pass)
sockc <- req_socket(urlc)
res <- launch_sync_dispatcher(sock, sockc, wa5(urld, dots, n, urlc, url), output, tls, pass)
is.object(res) && stop(._[["sync_dispatcher"]])
store_dispatcher(sock, res, cv, envir)
`[[<-`(envir, "msgid", 0L)
},
{
tls <- configure_tls(url, tls, pass, envir)
sock <- req_socket(url, tls = tls)
check_store_url(sock, envir)
n <- 0L
store_dispatcher(sockc, res, cv, envir)
},
stop(._[["dispatcher_args"]])
)
Expand Down Expand Up @@ -356,16 +353,6 @@ daemons <- function(n, url = NULL, remote = NULL, dispatcher = c("process", "nex
output <- attr(dots, "output")
switch(
parse_dispatcher(dispatcher),
{
cv <- cv()
sock <- req_socket(urld)
urlc <- sprintf("%s%s", urld, "c")
sockc <- req_socket(urlc)
res <- launch_sync_dispatcher(sock, sockc, wa4(urld, dots, envir[["stream"]], n, urlc), output)
is.object(res) && stop(._[["sync_dispatcher"]])
store_dispatcher(sockc, res, cv, envir)
for (i in seq_len(n)) next_stream(envir)
},
{
cv <- cv()
sock <- req_socket(urld)
Expand All @@ -380,6 +367,16 @@ daemons <- function(n, url = NULL, remote = NULL, dispatcher = c("process", "nex
launch_sync_daemons(seq_len(n), sock, urld, dots, envir, output) || stop(._[["sync_daemons"]])
`[[<-`(envir, "urls", urld)
},
{
cv <- cv()
sock <- req_socket(urld)
urlc <- sprintf("%s%s", urld, "c")
sockc <- req_socket(urlc)
res <- launch_sync_dispatcher(sock, sockc, wa4(urld, dots, envir[["stream"]], n, urlc), output)
is.object(res) && stop(._[["sync_dispatcher"]])
store_dispatcher(sockc, res, cv, envir)
for (i in seq_len(n)) next_stream(envir)
},
stop(._[["dispatcher_args"]])
)
`[[<-`(.., .compute, `[[<-`(`[[<-`(envir, "sock", sock), "n", n))
Expand Down Expand Up @@ -505,7 +502,7 @@ status <- function(.compute = "default") {
is.list(.compute) && return(status(attr(.compute, "id")))
envir <- ..[[.compute]]
is.null(envir) && return(list(connections = 0L, daemons = 0L))
length(envir[["msgid"]]) && return(dispatcher2_status(envir))
length(envir[["msgid"]]) && return(dispatcher_status(envir))
list(connections = as.integer(stat(envir[["sock"]], "pipes")),
daemons = if (is.null(envir[["sockc"]])) envir[["urls"]] else query_status(envir))

Expand Down Expand Up @@ -587,8 +584,8 @@ req_socket <- function(url, tls = NULL, resend = 0L)

parse_dispatcher <- function(x) {
x <- x[1L]
if (x == "process") 1L else if (x == "next") 2L else if (x == "none") 3L else
if (is.logical(x)) 1L + (!x) * 2L else if (x == "thread") 1L else 4L
if (x == "default") 1L else if (x == "none") 2L else if (x == "process" || x == "thread") 3L else
if (is.logical(x)) 2L + x else 4L
}

parse_dots <- function(...) {
Expand All @@ -614,16 +611,16 @@ wa32 <- function(url, dots, rs, tls = NULL)
shQuote(sprintf("mirai::daemon(\"%s\"%s%s,rs=c(%s),dispatcher=TRUE)", url, dots, parse_tls(tls), paste0(rs, collapse = ",")))

wa4 <- function(urld, dots, rs, n, urlc)
shQuote(sprintf(".libPaths(c(\"%s\",.libPaths()));mirai::dispatcher(\"%s\",n=%d,rs=c(%s),monitor=\"%s\"%s)", libp(), urld, n, paste0(rs, collapse= ","), urlc, dots))
shQuote(sprintf(".libPaths(c(\"%s\",.libPaths()));mirai::dispatcher_v1(\"%s\",n=%d,rs=c(%s),monitor=\"%s\"%s)", libp(), urld, n, paste0(rs, collapse= ","), urlc, dots))

wa42 <- function(urld, dots, rs, n)
shQuote(sprintf(".libPaths(c(\"%s\",.libPaths()));mirai::dispatcher2(\"%s\",n=%d,rs=c(%s)%s)", libp(), urld, n, paste0(rs, collapse= ","), dots))
shQuote(sprintf(".libPaths(c(\"%s\",.libPaths()));mirai::dispatcher(\"%s\",n=%d,rs=c(%s)%s)", libp(), urld, n, paste0(rs, collapse= ","), dots))

wa5 <- function(urld, dots, n, urlc, url)
shQuote(sprintf(".libPaths(c(\"%s\",.libPaths()));mirai::dispatcher(\"%s\",c(\"%s\"),n=%d,monitor=\"%s\"%s)", libp(), urld, paste0(url, collapse = "\",\""), n, urlc, dots))
shQuote(sprintf(".libPaths(c(\"%s\",.libPaths()));mirai::dispatcher_v1(\"%s\",c(\"%s\"),n=%d,monitor=\"%s\"%s)", libp(), urld, paste0(url, collapse = "\",\""), n, urlc, dots))

wa52 <- function(urld, dots, n, url)
shQuote(sprintf(".libPaths(c(\"%s\",.libPaths()));mirai::dispatcher2(\"%s\",c(\"%s\"),n=%d%s)", libp(), urld, paste0(url, collapse = "\",\""), n, dots))
wa52 <- function(urld, dots, url)
shQuote(sprintf(".libPaths(c(\"%s\",.libPaths()));mirai::dispatcher(\"%s\",url=\"%s\"%s)", libp(), urld, url, dots))

launch_daemon <- function(args, output)
system2(.command, args = c("-e", args), stdout = output, stderr = output, wait = FALSE)
Expand Down Expand Up @@ -679,10 +676,10 @@ query_status <- function(envir) {
)
}

dispatcher2_status <- function(envir) {
dispatcher_status <- function(envir) {
status <- query_dispatcher(envir[["sock"]], c(0L, 0L))
list(connections = status[1L],
tasks = c(awaiting = status[2L],
mirai = c(awaiting = status[2L],
executing = status[3L],
completed = envir[["msgid"]] - status[2L] - status[3L]))
}
Expand Down
Loading

0 comments on commit 6a9435f

Please sign in to comment.