From 517b70f0cfa8cb70917998b9db5b8b35a94ba3c8 Mon Sep 17 00:00:00 2001 From: shikokuchuo <53399081+shikokuchuo@users.noreply.github.com> Date: Sun, 29 Dec 2024 16:14:35 +0000 Subject: [PATCH] improves vectorized mirai cancellation --- NAMESPACE | 1 + R/dispatcher.R | 5 +++++ R/mirai-package.R | 8 ++++---- R/mirai.R | 7 ++++++- mirai.Rproj | 1 + tests/tests.R | 5 +++-- 6 files changed, 20 insertions(+), 7 deletions(-) diff --git a/NAMESPACE b/NAMESPACE index 8df0a8ca1..75bfab94d 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -67,6 +67,7 @@ importFrom(nanonext,call_aio_) importFrom(nanonext,collect_aio) importFrom(nanonext,collect_aio_) importFrom(nanonext,cv) +importFrom(nanonext,cv_signal) importFrom(nanonext,cv_value) importFrom(nanonext,dial) importFrom(nanonext,is_error_value) diff --git a/R/dispatcher.R b/R/dispatcher.R index 61317d6ff..44beb41e6 100644 --- a/R/dispatcher.R +++ b/R/dispatcher.R @@ -201,6 +201,11 @@ dispatcher <- function(host, url = NULL, n = NULL, ..., tls = NULL, pass = NULL, value <- .subset2(res, "value") id <- as.character(.subset2(res, "aio")) res <- recv_aio(psock, mode = 8L, cv = cv) + if (outq[[id]][["msgid"]] < 0) { + outq[[id]][["msgid"]] <- 0L + cv_signal(cv) + next + } send(outq[[id]][["ctx"]], value, mode = 2L, block = TRUE) outq[[id]][["msgid"]] <- 0L } diff --git a/R/mirai-package.R b/R/mirai-package.R index b7f6b0b64..58cfd66cd 100644 --- a/R/mirai-package.R +++ b/R/mirai-package.R @@ -46,10 +46,10 @@ #' (\href{https://orcid.org/0000-0002-0750-061X}{ORCID}) #' #' @importFrom nanonext .advance call_aio call_aio_ collect_aio collect_aio_ -#' .context cv cv_value dial .interrupt is_error_value .keep listen lock .mark -#' mclock monitor msleep nng_error opt opt<- parse_url pipe_notify random -#' read_monitor reap recv recv_aio request send serial_config socket stat -#' stop_aio tls_config unresolved .unresolved until wait write_cert +#' .context cv cv_signal cv_value dial .interrupt is_error_value .keep listen +#' lock .mark mclock monitor msleep nng_error opt opt<- parse_url pipe_notify +#' random read_monitor reap recv recv_aio request send serial_config socket +#' stat stop_aio tls_config unresolved .unresolved until wait write_cert #' "_PACKAGE" diff --git a/R/mirai.R b/R/mirai.R index 29c91243c..93e5a79c5 100644 --- a/R/mirai.R +++ b/R/mirai.R @@ -424,7 +424,12 @@ collect_mirai <- collect_aio #' @export #' stop_mirai <- function(x) { - is.list(x) && return(rev(as.logical(lapply(rev(unclass(x)), stop_mirai)))) + is.list(x) && { + vec <- logical(length(x)) + for (i in length(x):1) + vec[i] <- stop_mirai(x[[i]]) + return(vec) + } .compute <- attr(x, "profile") envir <- if (is.character(.compute)) ..[[.compute]] stop_aio(x) diff --git a/mirai.Rproj b/mirai.Rproj index 497f8bfcf..7fff8a58c 100644 --- a/mirai.Rproj +++ b/mirai.Rproj @@ -1,4 +1,5 @@ Version: 1.0 +ProjectId: 6c04a62e-266d-423f-8aef-506f99a7fe8d RestoreWorkspace: Default SaveWorkspace: Default diff --git a/tests/tests.R b/tests/tests.R index 2d73275e2..eaa3d7e4f 100644 --- a/tests/tests.R +++ b/tests/tests.R @@ -292,10 +292,11 @@ connection && Sys.getenv("NOT_CRAN") == "true" && { test_equal(m1$data, 20L) test_class("errorValue", mirai(res)[]) m <- mirai_map(1:10, function(x) { Sys.sleep(2); y <<- TRUE }) + Sys.sleep(0.1) s <- stop_mirai(m) test_equal(sum(unlist(m[])), 200L) - if (Sys.info()[["sysname"]] == "Linux") test_identical(s, !logical(10L)) - if (Sys.info()[["sysname"]] == "Linux") test_class("errorValue", mirai(y)[]) + test_class("errorValue", mirai(y)[]) + test_identical(s, !logical(10L)) test_equal(status()$connections, 1L) test_equal(length(nextget("urls")), 1L) test_class("miraiLaunchCmd", launch_remote(1))