From 6a9435faa623d1f4f669c336ac8ca4b26b3566ec Mon Sep 17 00:00:00 2001 From: shikokuchuo <53399081+shikokuchuo@users.noreply.github.com> Date: Sun, 1 Dec 2024 12:24:47 +0000 Subject: [PATCH] make new dispatcher the default --- DESCRIPTION | 2 +- NAMESPACE | 2 +- NEWS.md | 9 +- R/daemons.R | 77 +++++----- R/dispatcher.R | 359 ++++++++++++++++++++++--------------------- R/mirai-package.R | 2 +- man/daemons.Rd | 8 +- man/dispatcher.Rd | 62 ++------ man/dispatcher_v1.Rd | 73 +++++++++ man/saisei.Rd | 1 + tests/tests.R | 106 ++++++------- 11 files changed, 356 insertions(+), 345 deletions(-) create mode 100644 man/dispatcher_v1.Rd diff --git a/DESCRIPTION b/DESCRIPTION index 52f23585..cb70e42d 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,7 +1,7 @@ Package: mirai Type: Package Title: Minimalist Async Evaluation Framework for R -Version: 1.3.1.9009 +Version: 1.3.1.9010 Description: Designed for simplicity, a 'mirai' evaluates an R expression asynchronously in a parallel process, locally or distributed over the network, with the result automatically available upon completion. Modern diff --git a/NAMESPACE b/NAMESPACE index 891d5878..5b665f2b 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -30,7 +30,7 @@ export(collect_mirai) export(daemon) export(daemons) export(dispatcher) -export(dispatcher2) +export(dispatcher_v1) export(everywhere) export(host_url) export(is_error_value) diff --git a/NEWS.md b/NEWS.md index 48d033d5..57c5f9f6 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,13 +1,14 @@ -# mirai 1.3.1.9009 (development) +# mirai 1.3.1.9010 (development) #### New Features -* Introduces `daemons(dispatcher = "next")`, a newer and more efficient dispatcher. Supports mirai cancellation. -* `daemon()` gains the new argument 'dispatcher', which should be set to `TRUE` when using dispatcher and `FALSE` otherwise. +* `daemons(dispatcher = "default")` provides a new and more efficient architecture for dispatcher. Although 'process' is no longer an option, this will still work and retains the previous behaviour of the v1 dispatcher. +* `stop_mirai()` is upgraded to cancel remote mirai tasks when using the new dispatcher, returning a logical value indicating whether cancellation was successful. +* `daemon()` gains the new argument 'dispatcher', which should be set to `TRUE` when connecting to dispatcher and `FALSE` when connecting directly to host. #### Updates -* Experimental threaded dispatcher `daemons(dispatcher = "thread")` has been retired (as this was based on the old dispatcher architecture and future development will focus on the current design). Specifying 'dispatcher = thread' is deprecated, but will point to 'dispatcher = process' for the time being. +* Experimental threaded dispatcher `daemons(dispatcher = "thread")` has been retired (as this was based on the old dispatcher architecture and future development will focus on the current design). Specifying 'dispatcher = thread' is defunct, but will point to 'dispatcher = process' for the time being. * `daemon()` '...' argument had been moved up to prevent partial matching on any of the optional arguments. * Requires `nanonext` >= [1.3.2.9009]. diff --git a/R/daemons.R b/R/daemons.R index 08b19ce5..1c7ef9cd 100644 --- a/R/daemons.R +++ b/R/daemons.R @@ -108,14 +108,10 @@ #' #' @section Dispatcher: #' -#' By default \code{dispatcher = "process"} launches a background process +#' By default \code{dispatcher = "default"} launches a background process #' running \code{\link{dispatcher}}. Dispatcher connects to daemons on behalf of #' the host and ensures optimal FIFO scheduling of tasks. #' -#' Specifying \code{dispatcher = "next"} launches a background process -#' running \code{\link{dispatcher2}}. This is a newer more efficient -#' architecture and will replace "process" over time. -#' #' Specifying \code{dispatcher = "none"}, uses the default behaviour without #' additional dispatcher logic. In this case daemons connect directly to the #' host and tasks are distributed in a round-robin fashion. Optimal scheduling @@ -275,7 +271,7 @@ #' #' @export #' -daemons <- function(n, url = NULL, remote = NULL, dispatcher = c("process", "next", "none"), +daemons <- function(n, url = NULL, remote = NULL, dispatcher = c("default", "none"), ..., force = TRUE, seed = NULL, tls = NULL, pass = NULL, .compute = "default") { missing(n) && missing(url) && return(status(.compute)) @@ -289,18 +285,24 @@ daemons <- function(n, url = NULL, remote = NULL, dispatcher = c("process", "nex switch( parse_dispatcher(dispatcher), { - n <- if (missing(n)) length(url) else if (is.numeric(n) && n >= 1L) as.integer(n) else stop(._[["n_one"]]) + url <- url[1L] tls <- configure_tls(url, tls, pass, envir, returnconfig = FALSE) cv <- cv() dots <- parse_dots(...) output <- attr(dots, "output") urld <- local_url() - urlc <- sprintf("%s%s", urld, "c") sock <- req_socket(urld) - sockc <- req_socket(urlc) - res <- launch_sync_dispatcher(sock, sockc, wa5(urld, dots, n, urlc, url), output, tls, pass) + res <- launch_sync_dispatcher(sock, sock, wa52(urld, dots, url), output, tls, pass) is.object(res) && stop(._[["sync_dispatcher"]]) - store_dispatcher(sockc, res, cv, envir) + store_dispatcher(sock, res, cv, envir) + `[[<-`(envir, "msgid", 0L) + n <- 0L + }, + { + tls <- configure_tls(url, tls, pass, envir) + sock <- req_socket(url, tls = tls) + check_store_url(sock, envir) + n <- 0L }, { n <- if (missing(n)) length(url) else if (is.numeric(n) && n >= 1L) as.integer(n) else stop(._[["n_one"]]) @@ -309,17 +311,12 @@ daemons <- function(n, url = NULL, remote = NULL, dispatcher = c("process", "nex dots <- parse_dots(...) output <- attr(dots, "output") urld <- local_url() + urlc <- sprintf("%s%s", urld, "c") sock <- req_socket(urld) - res <- launch_sync_dispatcher(sock, sock, wa52(urld, dots, n, url), output, tls, pass) + sockc <- req_socket(urlc) + res <- launch_sync_dispatcher(sock, sockc, wa5(urld, dots, n, urlc, url), output, tls, pass) is.object(res) && stop(._[["sync_dispatcher"]]) - store_dispatcher(sock, res, cv, envir) - `[[<-`(envir, "msgid", 0L) - }, - { - tls <- configure_tls(url, tls, pass, envir) - sock <- req_socket(url, tls = tls) - check_store_url(sock, envir) - n <- 0L + store_dispatcher(sockc, res, cv, envir) }, stop(._[["dispatcher_args"]]) ) @@ -356,16 +353,6 @@ daemons <- function(n, url = NULL, remote = NULL, dispatcher = c("process", "nex output <- attr(dots, "output") switch( parse_dispatcher(dispatcher), - { - cv <- cv() - sock <- req_socket(urld) - urlc <- sprintf("%s%s", urld, "c") - sockc <- req_socket(urlc) - res <- launch_sync_dispatcher(sock, sockc, wa4(urld, dots, envir[["stream"]], n, urlc), output) - is.object(res) && stop(._[["sync_dispatcher"]]) - store_dispatcher(sockc, res, cv, envir) - for (i in seq_len(n)) next_stream(envir) - }, { cv <- cv() sock <- req_socket(urld) @@ -380,6 +367,16 @@ daemons <- function(n, url = NULL, remote = NULL, dispatcher = c("process", "nex launch_sync_daemons(seq_len(n), sock, urld, dots, envir, output) || stop(._[["sync_daemons"]]) `[[<-`(envir, "urls", urld) }, + { + cv <- cv() + sock <- req_socket(urld) + urlc <- sprintf("%s%s", urld, "c") + sockc <- req_socket(urlc) + res <- launch_sync_dispatcher(sock, sockc, wa4(urld, dots, envir[["stream"]], n, urlc), output) + is.object(res) && stop(._[["sync_dispatcher"]]) + store_dispatcher(sockc, res, cv, envir) + for (i in seq_len(n)) next_stream(envir) + }, stop(._[["dispatcher_args"]]) ) `[[<-`(.., .compute, `[[<-`(`[[<-`(envir, "sock", sock), "n", n)) @@ -505,7 +502,7 @@ status <- function(.compute = "default") { is.list(.compute) && return(status(attr(.compute, "id"))) envir <- ..[[.compute]] is.null(envir) && return(list(connections = 0L, daemons = 0L)) - length(envir[["msgid"]]) && return(dispatcher2_status(envir)) + length(envir[["msgid"]]) && return(dispatcher_status(envir)) list(connections = as.integer(stat(envir[["sock"]], "pipes")), daemons = if (is.null(envir[["sockc"]])) envir[["urls"]] else query_status(envir)) @@ -587,8 +584,8 @@ req_socket <- function(url, tls = NULL, resend = 0L) parse_dispatcher <- function(x) { x <- x[1L] - if (x == "process") 1L else if (x == "next") 2L else if (x == "none") 3L else - if (is.logical(x)) 1L + (!x) * 2L else if (x == "thread") 1L else 4L + if (x == "default") 1L else if (x == "none") 2L else if (x == "process" || x == "thread") 3L else + if (is.logical(x)) 2L + x else 4L } parse_dots <- function(...) { @@ -614,16 +611,16 @@ wa32 <- function(url, dots, rs, tls = NULL) shQuote(sprintf("mirai::daemon(\"%s\"%s%s,rs=c(%s),dispatcher=TRUE)", url, dots, parse_tls(tls), paste0(rs, collapse = ","))) wa4 <- function(urld, dots, rs, n, urlc) - shQuote(sprintf(".libPaths(c(\"%s\",.libPaths()));mirai::dispatcher(\"%s\",n=%d,rs=c(%s),monitor=\"%s\"%s)", libp(), urld, n, paste0(rs, collapse= ","), urlc, dots)) + shQuote(sprintf(".libPaths(c(\"%s\",.libPaths()));mirai::dispatcher_v1(\"%s\",n=%d,rs=c(%s),monitor=\"%s\"%s)", libp(), urld, n, paste0(rs, collapse= ","), urlc, dots)) wa42 <- function(urld, dots, rs, n) - shQuote(sprintf(".libPaths(c(\"%s\",.libPaths()));mirai::dispatcher2(\"%s\",n=%d,rs=c(%s)%s)", libp(), urld, n, paste0(rs, collapse= ","), dots)) + shQuote(sprintf(".libPaths(c(\"%s\",.libPaths()));mirai::dispatcher(\"%s\",n=%d,rs=c(%s)%s)", libp(), urld, n, paste0(rs, collapse= ","), dots)) wa5 <- function(urld, dots, n, urlc, url) - shQuote(sprintf(".libPaths(c(\"%s\",.libPaths()));mirai::dispatcher(\"%s\",c(\"%s\"),n=%d,monitor=\"%s\"%s)", libp(), urld, paste0(url, collapse = "\",\""), n, urlc, dots)) + shQuote(sprintf(".libPaths(c(\"%s\",.libPaths()));mirai::dispatcher_v1(\"%s\",c(\"%s\"),n=%d,monitor=\"%s\"%s)", libp(), urld, paste0(url, collapse = "\",\""), n, urlc, dots)) -wa52 <- function(urld, dots, n, url) - shQuote(sprintf(".libPaths(c(\"%s\",.libPaths()));mirai::dispatcher2(\"%s\",c(\"%s\"),n=%d%s)", libp(), urld, paste0(url, collapse = "\",\""), n, dots)) +wa52 <- function(urld, dots, url) + shQuote(sprintf(".libPaths(c(\"%s\",.libPaths()));mirai::dispatcher(\"%s\",url=\"%s\"%s)", libp(), urld, url, dots)) launch_daemon <- function(args, output) system2(.command, args = c("-e", args), stdout = output, stderr = output, wait = FALSE) @@ -679,10 +676,10 @@ query_status <- function(envir) { ) } -dispatcher2_status <- function(envir) { +dispatcher_status <- function(envir) { status <- query_dispatcher(envir[["sock"]], c(0L, 0L)) list(connections = status[1L], - tasks = c(awaiting = status[2L], + mirai = c(awaiting = status[2L], executing = status[3L], completed = envir[["msgid"]] - status[2L] - status[3L])) } diff --git a/R/dispatcher.R b/R/dispatcher.R index d4cb6996..502b9a42 100644 --- a/R/dispatcher.R +++ b/R/dispatcher.R @@ -29,26 +29,187 @@ #' tasks are only sent to daemons that can begin immediate execution of the #' task. #' -#' \code{dispatcher2} is the type \sQuote{next} dispatcher for daemons. -#' #' @inheritParams daemon #' @param host the character host URL to dial (where tasks are sent from), #' including the port to connect to (and optionally for websockets, a path), #' e.g. 'tcp://hostname:5555' or 'ws://10.75.32.70:5555/path'. -#' @param url (optional) the character URL or vector of URLs dispatcher should -#' listen at, including the port to connect to (and optionally for websockets, -#' a path), e.g. 'tcp://hostname:5555' or 'ws://10.75.32.70:5555/path'. -#' Specify 'tls+tcp://' or 'wss://' to use secure TLS connections. Tasks are -#' sent to daemons dialled into these URLs. If not supplied, \sQuote{n} local -#' inter-process URLs will be assigned automatically. -#' @param n (optional) if specified, the integer number of daemons to listen for. -#' Otherwise \sQuote{n} will be inferred from the number of URLs supplied in -#' \sQuote{url}. Where a single URL is supplied and \sQuote{n} > 1, \sQuote{n} -#' unique URLs will be automatically assigned for daemons to dial into. +#' @param url (optional) the character URL dispatcher should listen at, +#' including the port to connect to (and optionally for websockets, a path), +#' e.g. 'tcp://hostname:5555' or 'ws://10.75.32.70:5555/path'. +#' Specify 'tls+tcp://' or 'wss://' to use secure TLS connections. Daemons +#' should dial in to this URL. +#' @param n (optional) if specified, the integer number of daemons to launch. In +#' this case, a local url is automatically generated. #' @param ... (optional) additional arguments passed through to -#' \code{\link{daemon}}. These include \sQuote{asyncdial}, \sQuote{autoexit}, -#' \sQuote{cleanup}, \sQuote{maxtasks}, \sQuote{idletime}, \sQuote{walltime} -#' and \sQuote{timerstart}. +#' \code{\link{daemon}}. These include \sQuote{asyncdial}, \sQuote{autoexit}, +#' and \sQuote{cleanup}. +#' @param tls [default NULL] (required for secure TLS connections) +#' \strong{either} the character path to a file containing the PEM-encoded TLS +#' certificate and associated private key (may contain additional certificates +#' leading to a validation chain, with the TLS certificate first), \strong{or} +#' a length 2 character vector comprising [i] the TLS certificate (optionally +#' certificate chain) and [ii] the associated private key. +#' @param pass [default NULL] (required only if the private key supplied to +#' \sQuote{tls} is encrypted with a password) For security, should be provided +#' through a function that returns this value, rather than directly. +#' +#' @return Invisible NULL. +#' +#' @export +#' +dispatcher <- function(host, url = NULL, n = NULL, ..., tls = NULL, pass = NULL, rs = NULL) { + + n <- if (is.numeric(n)) as.integer(n) else length(url) + n > 0L || stop(._[["missing_url"]]) + + cv <- cv() + sock <- socket(protocol = "rep") + on.exit(reap(sock)) + pipe_notify(sock, cv = cv, remove = TRUE, flag = TRUE) + dial_and_sync_socket(sock, host) + + ctx <- .context(sock) + cmessage <- recv(ctx, mode = 2L, block = .limit_long) + is.object(cmessage) && stop(._[["sync_dispatcher"]]) + if (nzchar(cmessage[2L])) + Sys.setenv(R_DEFAULT_PACKAGES = cmessage[2L]) else + Sys.unsetenv("R_DEFAULT_PACKAGES") + + auto <- is.null(url) + if (auto) url <- local_url() + psock <- socket(protocol = "poly") + on.exit(reap(psock), add = TRUE, after = TRUE) + m <- monitor(psock, cv) + listen(psock, url = url, tls = tls, error = TRUE) + + msgid <- 0L + inq <- outq <- list() + + if (auto) { + + envir <- new.env(hash = FALSE) + if (is.numeric(rs)) `[[<-`(envir, "stream", as.integer(rs)) + dots <- parse_dots(...) + output <- attr(dots, "output") + for (i in seq_len(n)) + launch_daemon(wa32(url, dots, next_stream(envir)), output) + for (i in seq_len(n)) + until(cv, .limit_long) || stop(._[["sync_daemons"]]) + + changes <- read_monitor(m) + for (item in changes) + outq[[as.character(item)]] <- if (item > 0) list(pipe = item, msgid = 0L, ctx = NULL) + + } else { + + url <- check_url(psock) + if (nzchar(cmessage[4L]) && is.null(tls)) { + tls <- c(cmessage[4L], if (nzchar(cmessage[6L])) cmessage[6L]) + pass <- if (nzchar(cmessage[8L])) cmessage[8L] + } + if (length(tls)) + tls <- tls_config(server = tls, pass = pass) + + } + + pass <- NULL + send(ctx, c(Sys.getpid(), url), mode = 2L, block = TRUE) + + ctx <- .context(sock) + req <- recv_aio(ctx, mode = 8L, cv = cv) + res <- recv_aio(psock, mode = 8L, cv = cv) + + suspendInterrupts( + repeat { + + wait(cv) || break + + changes <- read_monitor(m) + if (length(changes)) { + for (item in changes) { + if (item > 0) { + outq[[as.character(item)]] <- list(pipe = item, msgid = 0L, ctx = NULL) + } else { + id <- as.character(-item) + if (outq[[id]][["msgid"]]) + send(outq[[id]][["ctx"]], .connectionReset, mode = 1L, block = TRUE) + outq[[id]] <- NULL + } + } + next + } + + if (!.unresolved(req)) { + value <- collect_aio(req) + + if (value[1L] == 0L) { + id <- readBin(value, "integer", n = 2L)[2L] + if (id) { + found <- FALSE + for (i in seq_along(outq)) + if (outq[[i]][["msgid"]] == id) { + send(psock, .miraiInterrupt, mode = 1L, pipe = outq[[i]][["pipe"]], block = TRUE) + outq[[i]][["msgid"]] <- 0L + found <- TRUE + break + } + if (found) { + found <- NA + } else { + for (i in seq_along(inq)) + if (inq[[i]][["msgid"]] == id) { + inq[[i]] <- NULL + found <- TRUE + break + } + } + send(ctx, found, mode = 2L, block = TRUE) + } else { + status <- c( + length(outq), + length(inq), + sum(as.logical(unlist(lapply(outq, .subset2, "msgid"), use.names = FALSE))) + ) + send(ctx, status, mode = 2L, block = TRUE) + } + ctx <- .context(sock) + req <- recv_aio(ctx, mode = 8L, cv = cv) + next + } + + msgid <- msgid + 1L + inq[[length(inq) + 1L]] <- list(ctx = ctx, req = value, msgid = msgid) + ctx <- .context(sock) + req <- recv_aio(ctx, mode = 8L, cv = cv) + + } else if (!.unresolved(res)) { + value <- collect_aio(res) + id <- as.character(.subset2(res, "aio")) + res <- recv_aio(psock, mode = 8L, cv = cv) + send(outq[[id]][["ctx"]], value, mode = 2L, block = TRUE) + outq[[id]][["msgid"]] <- 0L + } + + if (length(inq)) + for (i in seq_along(outq)) + if (!outq[[i]][["msgid"]]) { + send(psock, inq[[1L]][["req"]], mode = 2L, pipe = outq[[i]][["pipe"]], block = TRUE) + outq[[i]][["ctx"]] <- inq[[1L]][["ctx"]] + outq[[i]][["msgid"]] <- inq[[1L]][["msgid"]] + inq[[1L]] <- NULL + break + } + + } + ) + +} + +#' Dispatcher (v1) +#' +#' This is a deprecated legacy function only present to support existing uses. +#' +#' @inheritParams dispatcher #' @param retry [default FALSE] logical value, whether to automatically retry #' tasks where the daemon crashes or terminates unexpectedly on the next #' daemon instance to connect. If TRUE, the mirai will remain unresolved but @@ -59,23 +220,15 @@ #' @param token [default FALSE] if TRUE, appends a unique 24-character token to #' each URL path the dispatcher listens at (not applicable for TCP URLs which #' do not accept a path). -#' @param tls [default NULL] (required for secure TLS connections) -#' \strong{either} the character path to a file containing the PEM-encoded TLS -#' certificate and associated private key (may contain additional certificates -#' leading to a validation chain, with the TLS certificate first), \strong{or} -#' a length 2 character vector comprising [i] the TLS certificate (optionally -#' certificate chain) and [ii] the associated private key. -#' @param pass [default NULL] (required only if the private key supplied to -#' \sQuote{tls} is encrypted with a password) For security, should be provided -#' through a function that returns this value, rather than directly. #' @param monitor (for package internal use only) do not set this parameter. #' -#' @return Invisible NULL. +#' @return The regenerated character URL upon success, or else NULL. #' +#' @keywords internal #' @export #' -dispatcher <- function(host, url = NULL, n = NULL, ..., retry = FALSE, token = FALSE, - tls = NULL, pass = NULL, rs = NULL, monitor = NULL) { +dispatcher_v1 <- function(host, url = NULL, n = NULL, ..., retry = FALSE, token = FALSE, + tls = NULL, pass = NULL, rs = NULL, monitor = NULL) { n <- if (is.numeric(n)) as.integer(n) else length(url) n > 0L || stop(._[["missing_url"]]) @@ -250,157 +403,6 @@ dispatcher <- function(host, url = NULL, n = NULL, ..., retry = FALSE, token = F } -#' @rdname dispatcher -#' @export -#' -dispatcher2 <- function(host, url = NULL, n = NULL, ..., tls = NULL, pass = NULL, rs = NULL) { - - n <- if (is.numeric(n)) as.integer(n) else length(url) - n > 0L || stop(._[["missing_url"]]) - - cv <- cv() - sock <- socket(protocol = "rep") - on.exit(reap(sock)) - pipe_notify(sock, cv = cv, remove = TRUE, flag = TRUE) - dial_and_sync_socket(sock, host) - - ctx <- .context(sock) - cmessage <- recv(ctx, mode = 2L, block = .limit_long) - is.object(cmessage) && stop(._[["sync_dispatcher"]]) - if (nzchar(cmessage[2L])) - Sys.setenv(R_DEFAULT_PACKAGES = cmessage[2L]) else - Sys.unsetenv("R_DEFAULT_PACKAGES") - - auto <- is.null(url) - if (auto) url <- local_url() - psock <- socket(protocol = "poly") - on.exit(reap(psock), add = TRUE, after = TRUE) - m <- monitor(psock, cv) - listen(psock, url = url, tls = tls, error = TRUE) - - msgid <- 0L - inq <- outq <- list() - - if (auto) { - - envir <- new.env(hash = FALSE) - if (is.numeric(rs)) `[[<-`(envir, "stream", as.integer(rs)) - dots <- parse_dots(...) - output <- attr(dots, "output") - for (i in seq_len(n)) - launch_daemon(wa32(url, dots, next_stream(envir)), output) - for (i in seq_len(n)) - until(cv, .limit_long) || stop(._[["sync_daemons"]]) - - changes <- read_monitor(m) - for (item in changes) - outq[[as.character(item)]] <- if (item > 0) list(pipe = item, msgid = 0L, ctx = NULL) - - } else { - - url <- check_url(psock) - if (nzchar(cmessage[4L]) && is.null(tls)) { - tls <- c(cmessage[4L], if (nzchar(cmessage[6L])) cmessage[6L]) - pass <- if (nzchar(cmessage[8L])) cmessage[8L] - } - if (length(tls)) - tls <- tls_config(server = tls, pass = pass) - - } - - pass <- NULL - send(ctx, c(Sys.getpid(), url), mode = 2L, block = TRUE) - - ctx <- .context(sock) - req <- recv_aio(ctx, mode = 8L, cv = cv) - res <- recv_aio(psock, mode = 8L, cv = cv) - - suspendInterrupts( - repeat { - - wait(cv) || break - - changes <- read_monitor(m) - if (length(changes)) { - for (item in changes) { - if (item > 0) { - outq[[as.character(item)]] <- list(pipe = item, msgid = 0L, ctx = NULL) - } else { - id <- as.character(-item) - if (outq[[id]][["msgid"]]) - send(outq[[id]][["ctx"]], .connectionReset, mode = 1L, block = TRUE) - outq[[id]] <- NULL - } - } - next - } - - if (!.unresolved(req)) { - value <- collect_aio(req) - - if (value[1L] == 0L) { - id <- readBin(value, "integer", n = 2L)[2L] - if (id) { - found <- FALSE - for (i in seq_along(outq)) - if (outq[[i]][["msgid"]] == id) { - send(psock, .miraiInterrupt, mode = 1L, pipe = outq[[i]][["pipe"]], block = TRUE) - outq[[i]][["msgid"]] <- 0L - found <- TRUE - break - } - if (found) { - found <- NA - } else { - for (i in seq_along(inq)) - if (inq[[i]][["msgid"]] == id) { - inq[[i]] <- NULL - found <- TRUE - break - } - } - send(ctx, found, mode = 2L, block = TRUE) - } else { - status <- c( - length(outq), - length(inq), - sum(as.logical(unlist(lapply(outq, .subset2, "msgid"), use.names = FALSE))) - ) - send(ctx, status, mode = 2L, block = TRUE) - } - ctx <- .context(sock) - req <- recv_aio(ctx, mode = 8L, cv = cv) - next - } - - msgid <- msgid + 1L - inq[[length(inq) + 1L]] <- list(ctx = ctx, req = value, msgid = msgid) - ctx <- .context(sock) - req <- recv_aio(ctx, mode = 8L, cv = cv) - - } else if (!.unresolved(res)) { - value <- collect_aio(res) - id <- as.character(.subset2(res, "aio")) - res <- recv_aio(psock, mode = 8L, cv = cv) - send(outq[[id]][["ctx"]], value, mode = 2L, block = TRUE) - outq[[id]][["msgid"]] <- 0L - } - - if (length(inq)) - for (i in seq_along(outq)) - if (!outq[[i]][["msgid"]]) { - send(psock, inq[[1L]][["req"]], mode = 2L, pipe = outq[[i]][["pipe"]], block = TRUE) - outq[[i]][["ctx"]] <- inq[[1L]][["ctx"]] - outq[[i]][["msgid"]] <- inq[[1L]][["msgid"]] - inq[[1L]] <- NULL - break - } - - } - ) - -} - #' Saisei (Regenerate Token) #' #' When using daemons with dispatcher, regenerates the token for the URL a @@ -452,6 +454,7 @@ dispatcher2 <- function(host, url = NULL, n = NULL, ..., tls = NULL, pass = NULL #' #' } #' +#' @keywords internal #' @export #' saisei <- function(i, force = FALSE, .compute = "default") { diff --git a/R/mirai-package.R b/R/mirai-package.R index 717442a8..a53691f0 100644 --- a/R/mirai-package.R +++ b/R/mirai-package.R @@ -89,7 +89,7 @@ cluster_inactive = "cluster is no longer active", correct_context = "'host' must be specified if not using directly in a function argument", daemons_unset = "daemons must be set to use launchers", - dispatcher_args = "'dispatcher' must be one of 'process', 'thread' or 'none'", + dispatcher_args = "'dispatcher' must be either 'default' or 'none'", dot_required = "'.' must be an element of the character vector(s) supplied to 'args'", function_required = "'.f' must be of type function, not %s", missing_expression = "missing expression, perhaps wrap in {}?", diff --git a/man/daemons.Rd b/man/daemons.Rd index 0c378b56..0589804e 100644 --- a/man/daemons.Rd +++ b/man/daemons.Rd @@ -8,7 +8,7 @@ daemons( n, url = NULL, remote = NULL, - dispatcher = c("process", "next", "none"), + dispatcher = c("default", "none"), ..., force = TRUE, seed = NULL, @@ -126,14 +126,10 @@ connecting back to the host process, either directly or via dispatcher. \section{Dispatcher}{ -By default \code{dispatcher = "process"} launches a background process +By default \code{dispatcher = "default"} launches a background process running \code{\link{dispatcher}}. Dispatcher connects to daemons on behalf of the host and ensures optimal FIFO scheduling of tasks. -Specifying \code{dispatcher = "next"} launches a background process -running \code{\link{dispatcher2}}. This is a newer more efficient -architecture and will replace "process" over time. - Specifying \code{dispatcher = "none"}, uses the default behaviour without additional dispatcher logic. In this case daemons connect directly to the host and tasks are distributed in a round-robin fashion. Optimal scheduling diff --git a/man/dispatcher.Rd b/man/dispatcher.Rd index b0226541..92650b9f 100644 --- a/man/dispatcher.Rd +++ b/man/dispatcher.Rd @@ -2,65 +2,27 @@ % Please edit documentation in R/dispatcher.R \name{dispatcher} \alias{dispatcher} -\alias{dispatcher2} \title{Dispatcher} \usage{ -dispatcher( - host, - url = NULL, - n = NULL, - ..., - retry = FALSE, - token = FALSE, - tls = NULL, - pass = NULL, - rs = NULL, - monitor = NULL -) - -dispatcher2( - host, - url = NULL, - n = NULL, - ..., - tls = NULL, - pass = NULL, - rs = NULL -) +dispatcher(host, url = NULL, n = NULL, ..., tls = NULL, pass = NULL, rs = NULL) } \arguments{ \item{host}{the character host URL to dial (where tasks are sent from), including the port to connect to (and optionally for websockets, a path), e.g. 'tcp://hostname:5555' or 'ws://10.75.32.70:5555/path'.} -\item{url}{(optional) the character URL or vector of URLs dispatcher should -listen at, including the port to connect to (and optionally for websockets, -a path), e.g. 'tcp://hostname:5555' or 'ws://10.75.32.70:5555/path'. -Specify 'tls+tcp://' or 'wss://' to use secure TLS connections. Tasks are -sent to daemons dialled into these URLs. If not supplied, \sQuote{n} local -inter-process URLs will be assigned automatically.} +\item{url}{(optional) the character URL dispatcher should listen at, +including the port to connect to (and optionally for websockets, a path), +e.g. 'tcp://hostname:5555' or 'ws://10.75.32.70:5555/path'. +Specify 'tls+tcp://' or 'wss://' to use secure TLS connections. Daemons +should dial in to this URL.} -\item{n}{(optional) if specified, the integer number of daemons to listen for. -Otherwise \sQuote{n} will be inferred from the number of URLs supplied in -\sQuote{url}. Where a single URL is supplied and \sQuote{n} > 1, \sQuote{n} -unique URLs will be automatically assigned for daemons to dial into.} +\item{n}{(optional) if specified, the integer number of daemons to launch. In +this case, a local url is automatically generated.} \item{...}{(optional) additional arguments passed through to -\code{\link{daemon}}. These include \sQuote{asyncdial}, \sQuote{autoexit}, -\sQuote{cleanup}, \sQuote{maxtasks}, \sQuote{idletime}, \sQuote{walltime} -and \sQuote{timerstart}.} - -\item{retry}{[default FALSE] logical value, whether to automatically retry -tasks where the daemon crashes or terminates unexpectedly on the next -daemon instance to connect. If TRUE, the mirai will remain unresolved but -\code{\link{status}} will show \sQuote{online} as 0 and \sQuote{assigned} > -\sQuote{complete}. To cancel a task in this case, use -\code{saisei(force = TRUE)}. If FALSE, such tasks will be returned as -\sQuote{errorValue} 19 (Connection reset).} - -\item{token}{[default FALSE] if TRUE, appends a unique 24-character token to -each URL path the dispatcher listens at (not applicable for TCP URLs which -do not accept a path).} +\code{\link{daemon}}. These include \sQuote{asyncdial}, \sQuote{autoexit}, +and \sQuote{cleanup}.} \item{tls}{[default NULL] (required for secure TLS connections) \strong{either} the character path to a file containing the PEM-encoded TLS @@ -76,8 +38,6 @@ through a function that returns this value, rather than directly.} \item{rs}{[default NULL] the initial value of .Random.seed. This is set automatically using L'Ecuyer-CMRG RNG streams generated by the host process and should not be independently supplied.} - -\item{monitor}{(for package internal use only) do not set this parameter.} } \value{ Invisible NULL. @@ -94,6 +54,4 @@ host and daemons, ensuring that tasks received from the host are dispatched on a FIFO basis for processing. Tasks are queued at the dispatcher to ensure tasks are only sent to daemons that can begin immediate execution of the task. - -\code{dispatcher2} is the type \sQuote{next} dispatcher for daemons. } diff --git a/man/dispatcher_v1.Rd b/man/dispatcher_v1.Rd new file mode 100644 index 00000000..b6eb1336 --- /dev/null +++ b/man/dispatcher_v1.Rd @@ -0,0 +1,73 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/dispatcher.R +\name{dispatcher_v1} +\alias{dispatcher_v1} +\title{Dispatcher (v1)} +\usage{ +dispatcher_v1( + host, + url = NULL, + n = NULL, + ..., + retry = FALSE, + token = FALSE, + tls = NULL, + pass = NULL, + rs = NULL, + monitor = NULL +) +} +\arguments{ +\item{host}{the character host URL to dial (where tasks are sent from), +including the port to connect to (and optionally for websockets, a path), +e.g. 'tcp://hostname:5555' or 'ws://10.75.32.70:5555/path'.} + +\item{url}{(optional) the character URL dispatcher should listen at, +including the port to connect to (and optionally for websockets, a path), +e.g. 'tcp://hostname:5555' or 'ws://10.75.32.70:5555/path'. +Specify 'tls+tcp://' or 'wss://' to use secure TLS connections. Daemons +should dial in to this URL.} + +\item{n}{(optional) if specified, the integer number of daemons to launch. In +this case, a local url is automatically generated.} + +\item{...}{(optional) additional arguments passed through to +\code{\link{daemon}}. These include \sQuote{asyncdial}, \sQuote{autoexit}, +and \sQuote{cleanup}.} + +\item{retry}{[default FALSE] logical value, whether to automatically retry +tasks where the daemon crashes or terminates unexpectedly on the next +daemon instance to connect. If TRUE, the mirai will remain unresolved but +\code{\link{status}} will show \sQuote{online} as 0 and \sQuote{assigned} > +\sQuote{complete}. To cancel a task in this case, use +\code{saisei(force = TRUE)}. If FALSE, such tasks will be returned as +\sQuote{errorValue} 19 (Connection reset).} + +\item{token}{[default FALSE] if TRUE, appends a unique 24-character token to +each URL path the dispatcher listens at (not applicable for TCP URLs which +do not accept a path).} + +\item{tls}{[default NULL] (required for secure TLS connections) +\strong{either} the character path to a file containing the PEM-encoded TLS +certificate and associated private key (may contain additional certificates +leading to a validation chain, with the TLS certificate first), \strong{or} +a length 2 character vector comprising [i] the TLS certificate (optionally +certificate chain) and [ii] the associated private key.} + +\item{pass}{[default NULL] (required only if the private key supplied to +\sQuote{tls} is encrypted with a password) For security, should be provided +through a function that returns this value, rather than directly.} + +\item{rs}{[default NULL] the initial value of .Random.seed. This is set +automatically using L'Ecuyer-CMRG RNG streams generated by the host process +and should not be independently supplied.} + +\item{monitor}{(for package internal use only) do not set this parameter.} +} +\value{ +The regenerated character URL upon success, or else NULL. +} +\description{ +This is a deprecated legacy function only present to support existing uses. +} +\keyword{internal} diff --git a/man/saisei.Rd b/man/saisei.Rd index 7c973e24..bf9efefb 100644 --- a/man/saisei.Rd +++ b/man/saisei.Rd @@ -65,3 +65,4 @@ daemons(0) } } +\keyword{internal} diff --git a/tests/tests.R b/tests/tests.R index 51e4a423..89d92b56 100644 --- a/tests/tests.R +++ b/tests/tests.R @@ -24,10 +24,9 @@ test_error(mirai(a, .args = list(1)), "all items in '.args' must be named") test_error(mirai_map(1:2, "a function"), "must be of type function, not character") test_error(daemons(url = "URL"), "Invalid argument") test_error(daemons(-1), "zero or greater") -test_error(daemons(n = 0, url = "ws://localhost:0"), "1 or greater") test_error(daemons(raw(0L)), "must be numeric") -test_error(daemons(1, dispatcher = "p"), "must be one of") -test_error(daemons(url = local_url(), dispatcher = "t"), "must be one of") +test_error(daemons(1, dispatcher = "p"), "must be either") +test_error(daemons(url = local_url(), dispatcher = "t"), "must be either") test_error(dispatcher(client = "URL"), "at least one") test_error(daemon("URL"), "Invalid argument") test_error(launch_local(1L), "daemons must be set") @@ -220,50 +219,30 @@ connection && .Platform[["OS.type"]] != "windows" && Sys.getenv("NOT_CRAN") == " } Sys.sleep(1L) test_zero(daemons(NULL)) - test_equal(1L, daemons(url = "ws://:0", correctype = 0L, token = TRUE)) + test_zero(daemons(url = "ws://:0", correctype = 0L, token = TRUE)) test_zero(daemons(0L)) test_zero(with(daemons(url = "tcp://:0", correcttype = c(1, 0), token = TRUE), {8L - 9L + 1L})) - test_equal(daemons(n = 2, "ws://:0"), 2L) + test_zero(daemons(n = 2, "ws://:0")) test_type("integer", nextget("pid")) - test_equal(length(nextget("urls")), 2L) + test_equal(length(nextget("urls")), 1L) Sys.sleep(1L) - status <- status()[["daemons"]] - test_class("matrix", status) - test_type("character", dn1 <- dimnames(status)[[1L]]) - test_type("character", parse1 <- nanonext::parse_url(dn1[1L])) - test_type("character", parse2 <- nanonext::parse_url(dn1[2L])) - test_true((port <- as.integer(parse1[["port"]])) > 0L) - test_equal(as.integer(parse2[["port"]]), port) - test_equal(parse1[["path"]], "/1") - test_equal(parse2[["path"]], "/2") - test_zero(sum(status[, "online"])) - test_zero(sum(status[, "instance"])) - test_zero(sum(status[, "assigned"])) - test_zero(sum(status[, "complete"])) - test_type("character", saisei(i = 1L)) - test_null(saisei(i = 0L)) - test_type("character", saisei(i = 1L, force = TRUE)) - test_null(saisei(i = 10L)) + status <- daemons() + test_type("list", status) + test_zero(status[["connections"]]) + test_type("integer", status[["mirai"]]) + test_zero(status[["mirai"]][["awaiting"]]) + test_zero(status[["mirai"]][["executing"]]) + test_zero(status[["mirai"]][["completed"]]) test_zero(daemons(0)) - test_equal(daemons(url = c("tcp://127.0.0.1:45555", "tcp://127.0.0.1:0"), correcttype = NA), 2L) + test_zero(daemons(url = c("tcp://127.0.0.1:0", "tcp://127.0.0.1:45555"), correcttype = NA)) Sys.sleep(1L) - test_null(launch_local(nextget("urls", .compute = "default")[1L], maxtasks = 1L)) - test_null(launch_local(2, maxtasks = 1L)) + test_null(launch_local(nextget("urls", .compute = "default"))) + test_null(launch_local(1)) Sys.sleep(2L) - tstatus <- status()[["daemons"]] - test_class("matrix", tstatus) - test_type("character", tdn1 <- dimnames(tstatus)[[1L]]) - test_type("character", tparse1 <- nanonext::parse_url(tdn1[1L])) - test_type("character", tparse2 <- nanonext::parse_url(tdn1[2L])) - test_true(tparse1[["port"]] != tparse2[["port"]]) - test_equal(sum(tstatus[, "online"]), 2L) - test_equal(sum(tstatus[, "instance"]), 2L) - test_zero(sum(tstatus[, "assigned"])) - test_zero(sum(tstatus[, "complete"])) + test_equal(daemons()[["connections"]], 2L) test_type("list", res <- mirai_map(c(1,1), rnorm)[.progress]) test_true(res[[1L]] != res[[2L]]) - test_zero(daemons(0)) - test_equal(1L, daemons(url = "wss://127.0.0.1:0", token = TRUE, pass = "test")) + test_zero(daemons(url = "wss://127.0.0.1:0", pass = "test")) test_null(launch_local(1L)) Sys.sleep(1L) test_true(grepl("CERTIFICATE", launch_remote(1L), fixed = TRUE)) @@ -275,38 +254,19 @@ connection && .Platform[["OS.type"]] != "windows" && Sys.getenv("NOT_CRAN") == " test_null(saisei(1)) test_error(launch_local(0:1), "out of bounds") test_error(launch_remote(1:2), "out of bounds") - option <- 15L - Sys.setenv(R_DEFAULT_PACKAGES = "stats,utils") - test_equal(1L, daemons(1, dispatcher = TRUE, maxtasks = 10L, timerstart = 1L, walltime = 1000L, seed = 1546, token = TRUE, cleanup = option, autoexit = tools::SIGCONT)) - Sys.unsetenv("R_DEFAULT_PACKAGES") - Sys.sleep(1L) - mq <- mirai(runif(1L), .timeout = 1000) - test_true(is.numeric(mq[])) - mq <- mirai(Sys.sleep(1.5), .timeout = 500) - test_class("matrix", status()[["daemons"]]) - Sys.sleep(2L) test_zero(daemons(0)) - Sys.sleep(1L) test_tls <- function(cert) { file <- tempfile() on.exit(unlink(file)) cat(cert[["server"]], file = file) - daemons(url = "tls+tcp://127.0.0.1:0", tls = file) == 1L && daemons(0L) == 0L + daemons(url = "tls+tcp://127.0.0.1:0", tls = file) == 0L && daemons(0L) == 0L } test_true(test_tls(nanonext::write_cert(cn = "127.0.0.1"))) } -# threaded dispatcher tests +# mirai cancellation tests connection && Sys.getenv("NOT_CRAN") == "true" && { - test_equal(daemons(1, dispatcher = "thread"), 1L) - test_equal(nextget("n"), 1L) - test_true(startsWith(nextget("urls")[[1L]], mirai:::.urlscheme)) - test_class("matrix", status()$daemons) - test_true(mirai(TRUE)[]) - test_zero(daemons(0)) -} -# next dispatcher tests -connection && Sys.getenv("NOT_CRAN") == "true" && { - test_equal(daemons(1, dispatcher = "next", cleanup = FALSE), 1L) + Sys.sleep(1L) + test_equal(daemons(1, dispatcher = "default", cleanup = FALSE), 1L) m1 <- mirai({ Sys.sleep(1); res <<- "m1 done" }) m2 <- mirai({ Sys.sleep(1); res <<- "m2 done" }) Sys.sleep(0.1) @@ -321,7 +281,7 @@ connection && Sys.getenv("NOT_CRAN") == "true" && { test_zero(daemons(0)) Sys.sleep(1L) Sys.setenv(R_DEFAULT_PACKAGES = "stats,utils") - test_equal(1L, daemons(url = "tls+tcp://127.0.0.1:0", dispatcher = "next")) + test_zero(daemons(url = "tls+tcp://127.0.0.1:0", dispatcher = "default")) Sys.unsetenv("R_DEFAULT_PACKAGES") test_null(launch_local(1L)) Sys.sleep(1L) @@ -332,4 +292,26 @@ connection && Sys.getenv("NOT_CRAN") == "true" && { test_class("errorValue", mirai(q(), .timeout = 1000)[]) test_zero(daemons(0)) } +# legacy interface tests +connection && Sys.getenv("NOT_CRAN") == "true" && { + Sys.sleep(1L) + option <- 15L + Sys.setenv(R_DEFAULT_PACKAGES = "stats,utils") + test_equal(1L, daemons(1, dispatcher = TRUE, maxtasks = 10L, timerstart = 1L, walltime = 1000L, seed = 1546, token = TRUE, cleanup = option, autoexit = tools::SIGCONT)) + Sys.unsetenv("R_DEFAULT_PACKAGES") + Sys.sleep(1L) + mq <- mirai(runif(1L), .timeout = 1000) + test_true(is.numeric(mq[])) + mq <- mirai(Sys.sleep(1.5), .timeout = 500) + test_class("matrix", status()[["daemons"]]) + Sys.sleep(2L) + test_zero(daemons(0)) + Sys.sleep(1L) + test_equal(daemons(1, dispatcher = "thread"), 1L) + test_equal(nextget("n"), 1L) + test_true(startsWith(nextget("urls")[[1L]], mirai:::.urlscheme)) + test_class("matrix", status()$daemons) + test_true(mirai(TRUE)[]) + test_zero(daemons(0)) +} Sys.sleep(1L)