Merge pull request #170 from shikokuchuo/v2
New hybrid dispatcher concept (enables mirai cancellation)
shikokuchuo authored Dec 30, 2024
2 parents 2020f0a + 3e842f1 commit 424fabf
Showing 41 changed files with 1,175 additions and 962 deletions.
6 changes: 4 additions & 2 deletions DESCRIPTION
@@ -1,7 +1,7 @@
Package: mirai
Type: Package
Title: Minimalist Async Evaluation Framework for R
-Version: 1.3.1.9000
+Version: 1.3.1.9022
Description: Designed for simplicity, a 'mirai' evaluates an R expression
asynchronously in a parallel process, locally or distributed over the
network, with the result automatically available upon completion. Modern
@@ -23,6 +23,8 @@ Authors@R:
role = "ctb",
email = "[email protected]"),
person(given = "Hibiki AI Limited",
role = "cph"),
person(given = "Posit Software, PBC",
role = "cph"))
License: GPL (>= 3)
BugReports: https://github.com/shikokuchuo/mirai/issues
@@ -31,7 +33,7 @@ Encoding: UTF-8
Depends:
R (>= 3.6)
Imports:
-nanonext (>= 1.3.0)
+nanonext (>= 1.4.0)
Enhances:
parallel,
promises
7 changes: 5 additions & 2 deletions NAMESPACE
@@ -52,33 +52,36 @@ export(serial_config)
export(ssh_config)
export(status)
export(stop_cluster)
export(stop_daemon)
export(stop_mirai)
export(unresolved)
importFrom(nanonext,"opt<-")
importFrom(nanonext,.advance)
importFrom(nanonext,.context)
importFrom(nanonext,.dispatcher)
importFrom(nanonext,.interrupt)
importFrom(nanonext,.keep)
importFrom(nanonext,.mark)
importFrom(nanonext,.online)
importFrom(nanonext,.unresolved)
importFrom(nanonext,call_aio)
importFrom(nanonext,call_aio_)
importFrom(nanonext,collect_aio)
importFrom(nanonext,collect_aio_)
importFrom(nanonext,cv)
importFrom(nanonext,cv_signal)
importFrom(nanonext,cv_value)
importFrom(nanonext,dial)
importFrom(nanonext,is_error_value)
importFrom(nanonext,listen)
importFrom(nanonext,lock)
importFrom(nanonext,mclock)
importFrom(nanonext,monitor)
importFrom(nanonext,msleep)
importFrom(nanonext,nng_error)
importFrom(nanonext,opt)
importFrom(nanonext,parse_url)
importFrom(nanonext,pipe_notify)
importFrom(nanonext,random)
importFrom(nanonext,read_monitor)
importFrom(nanonext,reap)
importFrom(nanonext,recv)
importFrom(nanonext,recv_aio)
26 changes: 25 additions & 1 deletion NEWS.md
@@ -1,4 +1,28 @@
-# mirai 1.3.1.9000 (development)
+# mirai 1.3.1.9022 (development)

#### New Architecture

* Distributed computing now uses a single URL at which all daemons connect (with or without dispatcher).
- Allows a more efficient `tcp://` or `tls+tcp://` connection in most cases instead of websockets.
- Daemons may be added or removed at any time without limit.

#### New Features

* `daemons(dispatcher = TRUE)` provides a new and more efficient architecture for dispatcher. This argument reverts to a logical value, although 'process' is still accepted and retains the previous behaviour of the v1 dispatcher.
* Upgrades `stop_mirai()` to cancel remote mirai tasks when using the new dispatcher, returning a logical value indicating whether cancellation was successful.
* Adds `stop_daemon()` to reduce the number of connected daemons (without interrupting any executing tasks).
* `daemon()` gains the new argument 'dispatcher', which should be set to `TRUE` when connecting to dispatcher and `FALSE` when connecting directly to host.

#### Updates

* `status()` using the new dispatcher is updated to provide more concise information.
* `everywhere()` now returns a list of mirai, which may be waited for and inspected (thanks @dgkf #164).
* `launch_local()` and `launch_remote()` simplified to take the argument 'n' instead of 'url' for how many daemons to launch. `launch_local()` now returns the number of daemons launched rather than invisible NULL.
* `ssh_config()` simplified to take the argument 'port' instead of 'host'. For SSH tunnelling, this is the port that will be used, and the hostname is now required to be '127.0.0.1' (no longer accepting 'localhost').
* `daemon()` '...' argument has been moved up to prevent partial matching on any of the optional arguments.
* `saisei()` is defunct as no longer required, but still available for use with the old v1 dispatcher.
* Experimental threaded dispatcher `daemons(dispatcher = "thread")` has been retired (as this was based on the old dispatcher architecture and future development will focus on the current design). Specifying 'dispatcher = thread' is defunct, but will point to 'dispatcher = process' for the time being.
* Requires `nanonext` >= 1.4.0.
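
A minimal sketch of how the dispatcher and cancellation entries above might be exercised together, assuming a build of mirai at or after this commit is installed. The helper name `demo_cancellation` and the 60-second sleep are illustrative and not part of the package. The call is guarded by `interactive()`, as in the package's own examples, because it launches local daemon processes.

```r
# Illustrative sketch only: `demo_cancellation` is a hypothetical helper,
# not a function provided by mirai.
demo_cancellation <- function() {
  # Start 2 local daemons using the new dispatcher
  mirai::daemons(2, dispatcher = TRUE)
  # Always reset daemons when the helper exits
  on.exit(mirai::daemons(0), add = TRUE)

  # Submit a long-running task, then request cancellation
  m <- mirai::mirai(Sys.sleep(60))
  cancelled <- mirai::stop_mirai(m)

  # Per the NEWS entry, a logical value is returned under the new dispatcher
  cancelled
}

# Only run in interactive sessions, and only if mirai is available
if (interactive() && requireNamespace("mirai", quietly = TRUE)) {
  print(demo_cancellation())
}
```

Wrapping the calls in a function with `on.exit()` is a design choice for the sketch: it ensures the daemons are shut down even if an intermediate step errors.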

# mirai 1.3.1

185 changes: 179 additions & 6 deletions R/daemon.R
@@ -32,6 +32,131 @@
#' @param url the character host or dispatcher URL to dial into, including the
#' port to connect to (and optionally for websockets, a path), e.g.
#' 'tcp://hostname:5555' or 'ws://10.75.32.70:5555/path'.
#' @param ... reserved but not currently used.
#' @param dispatcher [default FALSE] logical value, which should be set to TRUE
#' if using dispatcher and FALSE otherwise.
#' @param asyncdial [default FALSE] whether to perform dials asynchronously. The
#' default FALSE will error if a connection is not immediately possible (for
#' instance if \code{\link{daemons}} has yet to be called on the host, or the
#' specified port is not open etc.). Specifying TRUE continues retrying
#' (indefinitely) if not immediately successful, which is more resilient but
#' can mask potential connection issues.
#' @param autoexit [default TRUE] logical value, whether the daemon should exit
#' automatically when its socket connection ends. If a signal from the
#' \pkg{tools} package, such as \code{tools::SIGINT}, or an equivalent integer
#' value is supplied, this signal is additionally raised on exit (see
#' 'Persistence' section below).
#' @param cleanup [default TRUE] logical value, whether to perform cleanup of
#' the global environment and restore attached packages and options to an
#' initial state after each evaluation.
#' @param output [default FALSE] logical value, to output generated stdout /
#' stderr if TRUE, or else discard if FALSE. Specify as TRUE in the
#' \sQuote{...} argument to \code{\link{daemons}} or
#' \code{\link{launch_local}} to provide redirection of output to the host
#' process (applicable only for local daemons).
#' @param tls [default NULL] required for secure TLS connections over
#' 'tls+tcp://' or 'wss://'. \strong{Either} the character path to a file
#' containing X.509 certificate(s) in PEM format, comprising the certificate
#' authority certificate chain starting with the TLS certificate and ending
#' with the CA certificate, \strong{or} a length 2 character vector comprising
#' [i] the certificate authority certificate chain and [ii] the empty string
#' \code{''}.
#' @param rs [default NULL] the initial value of .Random.seed. This is set
#' automatically using L'Ecuyer-CMRG RNG streams generated by the host process
#' and should not be independently supplied.
#'
#' @return Invisible NULL.
#'
#' @section Persistence:
#'
#' The \sQuote{autoexit} argument governs persistence settings for the daemon.
#' The default TRUE ensures that it will exit cleanly once its socket connection
#' has ended.
#'
#' Instead of TRUE, supplying a signal from the \pkg{tools} package, such as
#' \code{tools::SIGINT}, or an equivalent integer value, sets this signal to be
#' raised when the socket connection ends. For instance, supplying SIGINT allows
#' a potentially more immediate exit by interrupting any ongoing evaluation
#' rather than letting it complete.
#'
#' Setting to FALSE allows the daemon to persist indefinitely even when there is
#' no longer a socket connection. This allows a host session to end and a new
#' session to connect at the URL where the daemon is dialled in. Daemons must be
#' terminated with \code{daemons(NULL)} in this case, which sends explicit exit
#' signals to all connected daemons.
#'
#' @export
#'
daemon <- function(url, dispatcher = FALSE, ..., asyncdial = FALSE, autoexit = TRUE,
cleanup = TRUE, output = FALSE, tls = NULL, rs = NULL) {

missing(dispatcher) && return(
v1_daemon(
url = url, asyncdial = asyncdial, autoexit = autoexit, cleanup = cleanup,
output = output, ..., tls = tls, rs = rs
)
)
cv <- cv()
sock <- socket(protocol = if (dispatcher) "poly" else "rep")
on.exit(reap(sock))
`[[<-`(., "sock", sock)
autoexit && pipe_notify(sock, cv = cv, remove = TRUE, flag = as.integer(autoexit))
if (length(tls)) tls <- tls_config(client = tls)
dial_and_sync_socket(sock, url, asyncdial = asyncdial, tls = tls)

if (is.numeric(rs)) `[[<-`(.GlobalEnv, ".Random.seed", as.integer(rs))
if (!output) {
devnull <- file(nullfile(), open = "w", blocking = FALSE)
sink(file = devnull)
sink(file = devnull, type = "message")
on.exit({
sink(type = "message")
sink()
close(devnull)
}, add = TRUE)
}
snapshot()

if (dispatcher)
repeat {
aio <- recv_aio(sock, mode = 1L, cv = cv)
wait(cv) || break
m <- collect_aio(aio)
is.object(m) && next
cancel <- recv_aio(sock, mode = 8L, cv = NA)
data <- eval_mirai(m)
stop_aio(cancel)
send(sock, data, mode = 1L, block = TRUE)
if (cleanup) do_cleanup()
} else
repeat {
ctx <- .context(sock)
aio <- recv_aio(ctx, mode = 1L, cv = cv)
wait(cv) || break
m <- collect_aio(aio)
data <- eval_mirai(m)
send(ctx, data, mode = 1L, block = TRUE)
if (cleanup) do_cleanup()
}

}
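
As a rough illustration of the 'autoexit' handling in `daemon()` above, the base R sketch below mirrors the expression `autoexit && pipe_notify(..., flag = as.integer(autoexit))`. The helper `exit_flag` is hypothetical and only shows how the flag value would be derived from each kind of input; it does not call nanonext.

```r
# Hypothetical helper (not part of mirai) mirroring how the 'flag' value is
# derived from 'autoexit' in the daemon() body shown in this diff.
exit_flag <- function(autoexit) {
  # FALSE short-circuits the `&&`: no pipe notification is registered
  if (!autoexit) return(NULL)
  # TRUE becomes 1L; a signal such as tools::SIGINT keeps its integer value
  as.integer(autoexit)
}

exit_flag(TRUE)           # 1L
exit_flag(tools::SIGINT)  # the integer value of SIGINT
exit_flag(FALSE)          # NULL
```

A non-zero signal value is truthy in R, which is why supplying `tools::SIGINT` both enables the notification and carries the signal number.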

#' Daemon Instance (Legacy v1)
#'
#' Starts up an execution daemon to receive \code{\link{mirai}} requests. Awaits
#' data, evaluates an expression in an environment containing the supplied data,
#' and returns the value to the host caller. Daemon settings may be controlled
#' by \code{\link{daemons}} and this function should not need to be invoked
#' directly, unless deploying manually on remote resources.
#'
#' The network topology is such that daemons dial into the host or dispatcher,
#' which listens at the \sQuote{url} address. In this way, network resources may
#' be added or removed dynamically and the host or dispatcher automatically
#' distributes tasks to all available daemons.
#'
#' @param url the character host or dispatcher URL to dial into, including the
#' port to connect to (and optionally for websockets, a path), e.g.
#' 'tcp://hostname:5555' or 'ws://10.75.32.70:5555/path'.
#' @param asyncdial [default FALSE] whether to perform dials asynchronously. The
#' default FALSE will error if a connection is not immediately possible (for
#' instance if \code{\link{daemons}} has yet to be called on the host, or the
@@ -107,11 +232,11 @@
#' Caution: do not reset options but not loaded packages if packages set options
#' on load.
#'
-#' @export
+#' @noRd
#'
-daemon <- function(url, asyncdial = FALSE, autoexit = TRUE, cleanup = TRUE,
-output = FALSE, maxtasks = Inf, idletime = Inf, walltime = Inf,
-timerstart = 0L, ..., tls = NULL, rs = NULL) {
+v1_daemon <- function(url, asyncdial = FALSE, autoexit = TRUE, cleanup = TRUE,
+output = FALSE, maxtasks = Inf, idletime = Inf, walltime = Inf,
+timerstart = 0L, ..., tls = NULL, rs = NULL) {

cv <- cv()
sock <- socket(protocol = "rep")
@@ -193,17 +318,59 @@ daemon <- function(url, asyncdial = FALSE, autoexit = TRUE, cleanup = TRUE,

}

#' Stop Daemon
#'
#' Sends exit signals to reduce the number of connected daemons. Does not
#' interrupt any \sQuote{mirai} tasks in execution - exits will happen once
#' these complete (if applicable). This function is only effective when using
#' dispatcher.
#'
#' @param n [default 1L] integer number of daemons to stop.
#' @inheritParams mirai
#'
#' @return Integer number of exit signals sent (the smaller of \sQuote{n} and
#' the number of actually connected daemons).
#'
#' @examples
#' if (interactive()) {
#' # Only run examples in interactive R sessions
#'
#' daemons(4)
#' status()
#' stop_daemon(2)
#' mirai(TRUE)[]
#' status()
#' daemons(0)
#'
#' }
#'
#' @export
#'
stop_daemon <- function(n = 1L, .compute = "default") {
envir <- if (is.character(.compute)) ..[[.compute]]
length(envir[["msgid"]]) || return(0L)
d <- query_dispatcher(envir[["sock"]], c(0L, 0L))[1L]
stopped <- min(n, d)
for (i in seq_len(stopped))
query_dispatcher(envir[["sock"]], c(0L, -1L))
stopped
}

# internals --------------------------------------------------------------------

handle_mirai_error <- function(e) invokeRestart("mirai_error", e, sys.calls())

handle_mirai_interrupt <- function(e) invokeRestart("mirai_interrupt")

eval_mirai <- function(._mirai_.) {
-list2env(._mirai_.[["._mirai_globals_."]], envir = .GlobalEnv)
withRestarts(
withCallingHandlers(
-eval(._mirai_.[[".expr"]], envir = ._mirai_., enclos = .GlobalEnv),
+{
+on.exit(.interrupt(FALSE))
+.interrupt()
+list2env(._mirai_.[["._mirai_globals_."]], envir = .GlobalEnv)
+eval(._mirai_.[[".expr"]], envir = ._mirai_., enclos = .GlobalEnv)
+},
error = handle_mirai_error,
interrupt = handle_mirai_interrupt
),
@@ -232,5 +399,11 @@ perform_cleanup <- function(cleanup) {
if (cleanup[4L]) gc(verbose = FALSE)
}

do_cleanup <- function() {
rm(list = (vars <- names(.GlobalEnv))[!vars %in% .[["vars"]]], envir = .GlobalEnv)
lapply((new <- search())[!new %in% .[["se"]]], detach, character.only = TRUE)
options(.[["op"]])
}

register <- function(x) `opt<-`(.[["sock"]], "serial", x)
snapshot <- function() `[[<-`(`[[<-`(`[[<-`(., "op", .Options), "se", search()), "vars", names(.GlobalEnv))
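
The `snapshot()` / `do_cleanup()` pair above records state once and restores it after each evaluation. The following is a simplified base R sketch of that pattern, not the package's implementation: the names `state`, `take_snapshot`, `restore_snapshot` and `work` are hypothetical, and a separate environment stands in for `.GlobalEnv` so that the example is safe to run anywhere.

```r
# Hypothetical stand-in for the package-internal state environment `.`
state <- new.env()

# Record options, the search path and the variable names present in `envir`
take_snapshot <- function(envir) {
  state$op <- options()
  state$se <- search()
  state$vars <- names(envir)
  invisible(state)
}

# Remove variables and attached packages added since the snapshot,
# then reset options to their recorded values
restore_snapshot <- function(envir) {
  vars <- names(envir)
  rm(list = vars[!vars %in% state$vars], envir = envir)
  new <- search()
  lapply(new[!new %in% state$se], detach, character.only = TRUE)
  options(state$op)
  invisible(NULL)
}

work <- new.env()                # stands in for .GlobalEnv
assign("keep", 1, envir = work)  # exists before the snapshot
take_snapshot(work)

old_digits <- getOption("digits")
assign("tmp", 2, envir = work)   # created by a "task"
options(digits = 3L)             # changed by a "task"

restore_snapshot(work)
exists("tmp", envir = work, inherits = FALSE)   # FALSE
identical(getOption("digits"), old_digits)      # TRUE
```

Note that resetting with `options(state$op)` restores recorded values but does not delete options that were newly created after the snapshot.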