Skip to content

Commit

Permalink
removes stop_daemon()
Browse files Browse the repository at this point in the history
  • Loading branch information
shikokuchuo committed Dec 30, 2024
1 parent 7b589c5 commit 824f738
Show file tree
Hide file tree
Showing 6 changed files with 10 additions and 96 deletions.
1 change: 0 additions & 1 deletion NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ export(serial_config)
export(ssh_config)
export(status)
export(stop_cluster)
export(stop_daemon)
export(stop_mirai)
export(unresolved)
importFrom(nanonext,"opt<-")
Expand Down
1 change: 0 additions & 1 deletion NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

* `daemons(dispatcher = TRUE)` provides a new and more efficient architecture for dispatcher. This argument reverts to a logical value, although 'process' is still accepted and retains the previous behaviour of the v1 dispatcher.
* Upgrades `stop_mirai()` to cancel remote mirai tasks when using the new dispatcher, returning a logical value indicating whether cancellation was successful.
* Adds `stop_daemon()` to reduce the number of connected daemons (without interrupting any executing tasks).
* `daemon()` gains the new argument 'dispatcher', which should be set to `TRUE` when connecting to dispatcher and `FALSE` when connecting directly to host.

#### Updates
Expand Down
38 changes: 0 additions & 38 deletions R/daemon.R
Original file line number Diff line number Diff line change
Expand Up @@ -318,44 +318,6 @@ v1_daemon <- function(url, asyncdial = FALSE, autoexit = TRUE, cleanup = TRUE,

}

#' Stop Daemon
#'
#' Sends exit signals to reduce the number of connected daemons. Does not
#' interrupt any \sQuote{mirai} tasks in execution - exits will happen once
#' these complete (if applicable). This function is only effective when using
#' dispatcher.
#'
#' @param n [default 1L] integer number of daemons to stop.
#' @inheritParams mirai
#'
#' @return Integer number of exit signals sent (the smaller of \sQuote{n} and
#' the number of actually connected daemons).
#'
#' @examples
#' if (interactive()) {
#' # Only run examples in interactive R sessions
#'
#' daemons(4)
#' status()
#' stop_daemon(2)
#' mirai(TRUE)[]
#' status()
#' daemons(0)
#'
#' }
#'
#' @export
#'
stop_daemon <- function(n = 1L, .compute = "default") {
envir <- if (is.character(.compute)) ..[[.compute]]
length(envir[["msgid"]]) || return(0L)
d <- query_dispatcher(envir[["sock"]], c(0L, 0L))[1L]
stopped <- min(n, d)
for (i in seq_len(stopped))
query_dispatcher(envir[["sock"]], c(0L, -1L))
stopped
}

# internals --------------------------------------------------------------------

handle_mirai_error <- function(e) invokeRestart("mirai_error", e, sys.calls())
Expand Down
24 changes: 9 additions & 15 deletions R/dispatcher.R
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ dispatcher <- function(host, url = NULL, n = NULL, ..., tls = NULL, pass = NULL,
listen(psock, url = url, tls = tls, error = TRUE)

msgid <- 0L
status <- NULL
inq <- outq <- list()
envir <- new.env(hash = FALSE)
if (is.numeric(rs)) `[[<-`(envir, "stream", as.integer(rs))
Expand Down Expand Up @@ -154,38 +153,33 @@ dispatcher <- function(host, url = NULL, n = NULL, ..., tls = NULL, pass = NULL,
if (value[1L] == 0L) {
id <- readBin(value, "integer", n = 2L)[2L]
if (id == 0L) {
status <- c(
found <- c(
length(outq),
length(inq),
sum(as.logical(unlist(lapply(outq, .subset2, "msgid"), use.names = FALSE)))
)
} else if (id > 0) {
status <- FALSE
} else {
found <- FALSE
for (i in seq_along(outq))
if (outq[[i]][["msgid"]] == id) {
send(psock, .cancelRequest, mode = 1L, pipe = outq[[i]][["pipe"]], block = TRUE)
outq[[i]][["msgid"]] <- -1L
status <- TRUE
found <- TRUE
break
}
if (!status)
if (!found)
for (i in seq_along(inq))
if (inq[[i]][["msgid"]] == id) {
inq[[i]] <- NULL
status <- TRUE
found <- TRUE
break
}
} else {
status <- TRUE
inq <- c(list(list(ctx = ctx, req = ._scm_. , msgid = -1L)), inq)
}
send(ctx, status, mode = 2L, block = TRUE)
}
if (is.null(status)) {
send(ctx, found, mode = 2L, block = TRUE)

} else {
msgid <- msgid + 1L
inq[[length(inq) + 1L]] <- list(ctx = ctx, req = value, msgid = msgid)
} else {
status <- NULL
}
ctx <- .context(sock)
req <- recv_aio(ctx, mode = 8L, cv = cv)
Expand Down
38 changes: 0 additions & 38 deletions man/stop_daemon.Rd

This file was deleted.

4 changes: 1 addition & 3 deletions tests/tests.R
Original file line number Diff line number Diff line change
Expand Up @@ -225,9 +225,7 @@ connection && .Platform[["OS.type"]] != "windows" && Sys.getenv("NOT_CRAN") == "
test_equal(daemons()[["connections"]], 2L)
test_type("list", res <- mirai_map(c(1,1), rnorm)[.progress])
test_true(res[[1L]] != res[[2L]])
test_equal(stop_daemon(4L), 2L)
Sys.sleep(0.1)
test_zero(daemons()[["connections"]])
test_equal(2L, daemons()[["connections"]])
test_zero(daemons(0L))
Sys.sleep(1L)
test_zero(daemons(url = "tls+tcp://127.0.0.1:0", dispatcher = TRUE))
Expand Down

0 comments on commit 824f738

Please sign in to comment.