Skip to content

Commit

Permalink
refresh docs
Browse files Browse the repository at this point in the history
  • Loading branch information
shikokuchuo committed Jan 7, 2025
1 parent 5660bfc commit dfdf8bb
Show file tree
Hide file tree
Showing 7 changed files with 144 additions and 351 deletions.
217 changes: 63 additions & 154 deletions R/daemon.R
Original file line number Diff line number Diff line change
Expand Up @@ -146,160 +146,6 @@ daemon <- function(url, dispatcher = FALSE, ..., asyncdial = FALSE, autoexit = T

}

#' Daemon Instance (Legacy v1)
#'
#' Starts up an execution daemon to receive \code{\link{mirai}} requests. Awaits
#' data, evaluates an expression in an environment containing the supplied data,
#' and returns the value to the host caller. Daemon settings may be controlled
#' by \code{\link{daemons}} and this function should not need to be invoked
#' directly, unless deploying manually on remote resources.
#'
#' The network topology is such that daemons dial into the host or dispatcher,
#' which listens at the \sQuote{url} address. In this way, network resources may
#' be added or removed dynamically and the host or dispatcher automatically
#' distributes tasks to all available daemons.
#'
#' @param url the character host or dispatcher URL to dial into, including the
#' port to connect to (and optionally for websockets, a path), e.g.
#' 'tcp://hostname:5555' or 'ws://10.75.32.70:5555/path'.
#' @param asyncdial [default FALSE] whether to perform dials asynchronously. The
#' default FALSE will error if a connection is not immediately possible (for
#' instance if \code{\link{daemons}} has yet to be called on the host, or the
#' specified port is not open etc.). Specifying TRUE continues retrying
#' (indefinitely) if not immediately successful, which is more resilient but
#' can mask potential connection issues.
#' @param autoexit [default TRUE] logical value, whether the daemon should exit
#' automatically when its socket connection ends. If a signal from the
#' \pkg{tools} package, such as \code{tools::SIGINT}, or an equivalent integer
#' value is supplied, this signal is additionally raised on exit (see
#' 'Persistence' section below).
#' @param cleanup [default TRUE] logical value, whether to perform cleanup of
#' the global environment and restore attached packages and options to an
#' initial state after each evaluation. For more granular control, also
#' accepts an integer value (see \sQuote{Cleanup Options} section below).
#' @param output [default FALSE] logical value, to output generated stdout /
#' stderr if TRUE, or else discard if FALSE. Specify as TRUE in the
#' \sQuote{...} argument to \code{\link{daemons}} or
#' \code{\link{launch_local}} to provide redirection of output to the host
#' process (applicable only for local daemons).
#' @param maxtasks [default Inf] the maximum number of tasks to execute (task
#' limit) before exiting.
#' @param idletime [default Inf] maximum idle time, since completion of the last
#' task (in milliseconds) before exiting.
#' @param walltime [default Inf] soft walltime, or the minimum amount of real
#' time taken (in milliseconds) before exiting.
#' @param timerstart [default 0L] number of completed tasks after which to start
#' the timer for \sQuote{idletime} and \sQuote{walltime}. 0L implies timers
#' are started upon launch.
#' @param ... reserved but not currently used.
#' @param tls [default NULL] required for secure TLS connections over
#' 'tls+tcp://' or 'wss://'. \strong{Either} the character path to a file
#' containing X.509 certificate(s) in PEM format, comprising the certificate
#' authority certificate chain starting with the TLS certificate and ending
#' with the CA certificate, \strong{or} a length 2 character vector comprising
#' [i] the certificate authority certificate chain and [ii] the empty string
#' \code{''}.
#' @param rs [default NULL] the initial value of .Random.seed. This is set
#' automatically using L'Ecuyer-CMRG RNG streams generated by the host process
#' and should not be independently supplied.
#'
#' @return Invisible NULL.
#'
#' @section Persistence:
#'
#' The \sQuote{autoexit} argument governs persistence settings for the daemon.
#' The default TRUE ensures that it will exit cleanly once its socket connection
#' has ended.
#'
#' Instead of TRUE, supplying a signal from the \pkg{tools} package, such as
#' \code{tools::SIGINT}, or an equivalent integer value, sets this signal to be
#' raised when the socket connection ends. For instance, supplying SIGINT allows
#' a potentially more immediate exit by interrupting any ongoing evaluation
#' rather than letting it complete.
#'
#' Setting to FALSE allows the daemon to persist indefinitely even when there is
#' no longer a socket connection. This allows a host session to end and a new
#' session to connect at the URL where the daemon is dialled in. Daemons must be
#' terminated with \code{daemons(NULL)} in this case, which sends explicit exit
#' instructions to all connected daemons.
#'
#' @section Cleanup Options:
#'
#' The \sQuote{cleanup} argument also accepts an integer value, which operates
#' an additive bitmask: perform cleanup of the global environment (1L), reset
#' attached packages to an initial state (2L), restore options to an initial
#' state (4L), and perform garbage collection (8L).
#'
#' As an example, to perform cleanup of the global environment and garbage
#' collection, specify 9L (1L + 8L). The default argument value of TRUE performs
#' all actions apart from garbage collection and is equivalent to a value of 7L.
#'
#' Caution: do not reset options but not loaded packages if packages set options
#' on load.
#'
#' @noRd
#'
v1_daemon <- function(url, asyncdial = FALSE, autoexit = TRUE, cleanup = TRUE,
output = FALSE, maxtasks = Inf, idletime = Inf, walltime = Inf,
timerstart = 0L, ..., tls = NULL, rs = NULL) {

cv <- cv()
sock <- socket("rep")
on.exit(reap(sock))
`[[<-`(., "sock", sock)
autoexit && pipe_notify(sock, cv = cv, remove = TRUE, flag = as.integer(autoexit))
if (length(tls)) tls <- tls_config(client = tls)
dial_and_sync_socket(sock, url, asyncdial = asyncdial, tls = tls)

if (is.numeric(rs)) `[[<-`(.GlobalEnv, ".Random.seed", as.integer(rs))
if (idletime > walltime) idletime <- walltime else if (idletime == Inf) idletime <- NULL
cleanup <- parse_cleanup(cleanup)
if (!output) {
devnull <- file(nullfile(), open = "w", blocking = FALSE)
sink(file = devnull)
sink(file = devnull, type = "message")
on.exit({
sink(type = "message")
sink()
close(devnull)
}, add = TRUE)
}
snapshot()
count <- 0L
start <- mclock()

repeat {

ctx <- .context(sock)
aio <- recv_aio(ctx, mode = 1L, timeout = idletime, cv = cv)
wait(cv) || break
m <- collect_aio(aio)
is.object(m) && {
count < timerstart && {
start <- mclock()
next
}
break
}
data <- eval_mirai(m)
count <- count + 1L

(count >= maxtasks || count > timerstart && mclock() - start >= walltime) && {
.mark()
send(ctx, data, mode = 1L, block = TRUE)
aio <- recv_aio(ctx, mode = 8L, cv = cv)
wait(cv)
break
}

send(ctx, data, mode = 1L, block = TRUE)
perform_cleanup(cleanup)
if (count <= timerstart) start <- mclock()

}

}

#' dot Daemon
#'
#' Ephemeral executor for the remote process. User code must not call this.
Expand Down Expand Up @@ -373,3 +219,66 @@ do_cleanup <- function() {
}

snapshot <- function() `[[<-`(`[[<-`(`[[<-`(., "op", .Options), "se", search()), "vars", names(.GlobalEnv))

# Legacy compatibility functions ----------------------------------------------

v1_daemon <- function(url, asyncdial = FALSE, autoexit = TRUE, cleanup = TRUE,
output = FALSE, maxtasks = Inf, idletime = Inf, walltime = Inf,
timerstart = 0L, ..., tls = NULL, rs = NULL) {

cv <- cv()
sock <- socket("rep")
on.exit(reap(sock))
`[[<-`(., "sock", sock)
autoexit && pipe_notify(sock, cv = cv, remove = TRUE, flag = as.integer(autoexit))
if (length(tls)) tls <- tls_config(client = tls)
dial_and_sync_socket(sock, url, asyncdial = asyncdial, tls = tls)

if (is.numeric(rs)) `[[<-`(.GlobalEnv, ".Random.seed", as.integer(rs))
if (idletime > walltime) idletime <- walltime else if (idletime == Inf) idletime <- NULL
cleanup <- parse_cleanup(cleanup)
if (!output) {
devnull <- file(nullfile(), open = "w", blocking = FALSE)
sink(file = devnull)
sink(file = devnull, type = "message")
on.exit({
sink(type = "message")
sink()
close(devnull)
}, add = TRUE)
}
snapshot()
count <- 0L
start <- mclock()

repeat {

ctx <- .context(sock)
aio <- recv_aio(ctx, mode = 1L, timeout = idletime, cv = cv)
wait(cv) || break
m <- collect_aio(aio)
is.object(m) && {
count < timerstart && {
start <- mclock()
next
}
break
}
data <- eval_mirai(m)
count <- count + 1L

(count >= maxtasks || count > timerstart && mclock() - start >= walltime) && {
.mark()
send(ctx, data, mode = 1L, block = TRUE)
aio <- recv_aio(ctx, mode = 8L, cv = cv)
wait(cv)
break
}

send(ctx, data, mode = 1L, block = TRUE)
perform_cleanup(cleanup)
if (count <= timerstart) start <- mclock()

}

}
Loading

0 comments on commit dfdf8bb

Please sign in to comment.