Skip to content

Commit

Permalink
improves vectorized mirai cancellation
Browse files Browse the repository at this point in the history
  • Loading branch information
shikokuchuo committed Dec 29, 2024
1 parent d09c988 commit 517b70f
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 7 deletions.
1 change: 1 addition & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ importFrom(nanonext,call_aio_)
importFrom(nanonext,collect_aio)
importFrom(nanonext,collect_aio_)
importFrom(nanonext,cv)
importFrom(nanonext,cv_signal)
importFrom(nanonext,cv_value)
importFrom(nanonext,dial)
importFrom(nanonext,is_error_value)
Expand Down
5 changes: 5 additions & 0 deletions R/dispatcher.R
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,11 @@ dispatcher <- function(host, url = NULL, n = NULL, ..., tls = NULL, pass = NULL,
value <- .subset2(res, "value")
id <- as.character(.subset2(res, "aio"))
res <- recv_aio(psock, mode = 8L, cv = cv)
if (outq[[id]][["msgid"]] < 0) {
outq[[id]][["msgid"]] <- 0L
cv_signal(cv)
next
}
send(outq[[id]][["ctx"]], value, mode = 2L, block = TRUE)
outq[[id]][["msgid"]] <- 0L
}
Expand Down
8 changes: 4 additions & 4 deletions R/mirai-package.R
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@
#' (\href{https://orcid.org/0000-0002-0750-061X}{ORCID})
#'
#' @importFrom nanonext .advance call_aio call_aio_ collect_aio collect_aio_
#' .context cv cv_value dial .interrupt is_error_value .keep listen lock .mark
#' mclock monitor msleep nng_error opt opt<- parse_url pipe_notify random
#' read_monitor reap recv recv_aio request send serial_config socket stat
#' stop_aio tls_config unresolved .unresolved until wait write_cert
#' .context cv cv_signal cv_value dial .interrupt is_error_value .keep listen
#' lock .mark mclock monitor msleep nng_error opt opt<- parse_url pipe_notify
#' random read_monitor reap recv recv_aio request send serial_config socket
#' stat stop_aio tls_config unresolved .unresolved until wait write_cert
#'
"_PACKAGE"

Expand Down
7 changes: 6 additions & 1 deletion R/mirai.R
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,12 @@ collect_mirai <- collect_aio
#' @export
#'
stop_mirai <- function(x) {
is.list(x) && return(rev(as.logical(lapply(rev(unclass(x)), stop_mirai))))
is.list(x) && {
vec <- logical(length(x))
for (i in length(x):1)
vec[i] <- stop_mirai(x[[i]])
return(vec)
}
.compute <- attr(x, "profile")
envir <- if (is.character(.compute)) ..[[.compute]]
stop_aio(x)
Expand Down
1 change: 1 addition & 0 deletions mirai.Rproj
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
Version: 1.0
ProjectId: 6c04a62e-266d-423f-8aef-506f99a7fe8d

RestoreWorkspace: Default
SaveWorkspace: Default
Expand Down
5 changes: 3 additions & 2 deletions tests/tests.R
Original file line number Diff line number Diff line change
Expand Up @@ -292,10 +292,11 @@ connection && Sys.getenv("NOT_CRAN") == "true" && {
test_equal(m1$data, 20L)
test_class("errorValue", mirai(res)[])
m <- mirai_map(1:10, function(x) { Sys.sleep(2); y <<- TRUE })
Sys.sleep(0.1)
s <- stop_mirai(m)
test_equal(sum(unlist(m[])), 200L)
if (Sys.info()[["sysname"]] == "Linux") test_identical(s, !logical(10L))
if (Sys.info()[["sysname"]] == "Linux") test_class("errorValue", mirai(y)[])
test_class("errorValue", mirai(y)[])
test_identical(s, !logical(10L))
test_equal(status()$connections, 1L)
test_equal(length(nextget("urls")), 1L)
test_class("miraiLaunchCmd", launch_remote(1))
Expand Down

0 comments on commit 517b70f

Please sign in to comment.