-
-
Notifications
You must be signed in to change notification settings - Fork 388
/
Copy pathcreate_forked_task.R
77 lines (72 loc) · 2.4 KB
/
create_forked_task.R
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
library(shiny)
# Also uses parallel, shinyjs, tools
# Create a long-running task, executed in a forked process. (Doesn't work on
# Windows, which cannot fork.)
#
# Arguments:
# - expr: Expression that performs the work. It is not evaluated in the
#   calling process; it is forced inside the forked child.
# - poll_interval: Milliseconds to wait between checks for completion.
#   Defaults to 100.
#
# The return value is a promise-like object with three methods:
# - completed(): FALSE while the task is running, TRUE once it has
#   succeeded, failed, or been cancelled. Reactive, so when the state
#   changes any reactive readers will invalidate.
# - result(): Use this to get the return value. While execution is in
#   progress, performs a req(FALSE). If the task succeeded, returns its
#   value. If it failed (including a child that exited without delivering
#   a value), throws the error. If it was cancelled, raises a validation
#   error. Reactive, so when the state changes any reactive readers will
#   invalidate.
# - cancel(): Call this to prematurely terminate the task. Does nothing
#   unless the task is still running.
create_forked_task <- function(expr, poll_interval = 100) {
  # None of these checks touch `expr`, so it stays unevaluated here.
  stopifnot(
    is.numeric(poll_interval),
    length(poll_interval) == 1L,
    !is.na(poll_interval),
    poll_interval > 0
  )
  if (identical(.Platform$OS.type, "windows")) {
    stop(
      "create_forked_task() needs a forked process, which is not ",
      "available on Windows",
      call. = FALSE
    )
  }

  # `state` is one of "running", "success", "error" or "cancel". It is a
  # reactive binding: reading it inside a reactive context takes a
  # dependency, and assigning to it invalidates those readers.
  makeReactiveBinding("state")
  state <- "running"
  # Return value on success, condition object on error. Not reactive;
  # readers are driven by `state`.
  outcome <- NULL

  # Launch the task in a forked process. This always returns immediately,
  # and we get back a handle we can use to monitor or kill the job.
  # The value is wrapped in a tagged container so that a task which
  # legitimately returns NULL can be told apart from a child that died
  # without sending anything back.
  task_handle <- parallel::mcparallel({
    structure(list(value = expr), class = "forked_task_value")
  })

  # Poll until the job delivers something, then stop polling for good.
  poller <- observe({
    collected <- parallel::mccollect(task_handle, wait = FALSE)
    if (is.null(collected)) {
      # Nothing available yet; look again after the polling interval.
      invalidateLater(poll_interval)
    } else {
      poller$destroy()
      payload <- if (is.list(collected) && length(collected) == 1L) {
        collected[[1L]]
      } else {
        NULL
      }
      # Store the outcome before flipping `state`, so the value is already
      # in place when the state change invalidates reactive readers.
      if (inherits(payload, "forked_task_value")) {
        outcome <<- payload[["value"]]
        state <<- "success"
      } else if (inherits(payload, "try-error")) {
        cond <- attr(payload, "condition", exact = TRUE)
        if (is.null(cond)) {
          cond <- simpleError(paste(as.character(payload), collapse = "\n"))
        }
        outcome <<- cond
        state <<- "error"
      } else {
        # The child went away without sending a wrapped value or an error.
        outcome <<- simpleError(
          "The forked task exited without returning a result"
        )
        state <<- "error"
      }
    }
  })

  list(
    completed = function() {
      state != "running"
    },
    result = function() {
      switch(state,
        # If running, abort the current context silently. We've taken a
        # reactive dependency on `state`, so the context will invalidate
        # when the state changes.
        running = req(FALSE),
        success = outcome,
        error = stop(outcome),
        cancel = validate(need(FALSE, "The operation was cancelled"))
      )
    },
    cancel = function() {
      if (state == "running") {
        state <<- "cancel"
        poller$destroy()
        tools::pskill(task_handle$pid, tools::SIGTERM)
        # NOTE(review): the negative pid presumably targets the child's
        # process group; confirm tools::pskill honours negative pids.
        tools::pskill(-task_handle$pid, tools::SIGTERM)
        # Non-blocking collect after the kill; presumably lets parallel
        # clean up the terminated child -- TODO confirm.
        parallel::mccollect(task_handle, wait = FALSE)
      }
    }
  )
}