Create callback based on file descriptor monitoring #190

Merged on Nov 8, 2024 (102 commits)
Commits
56ce9a3
Fake later_fd
jcheng5 Oct 16, 2024
996c86f
adds select timeout
shikokuchuo Oct 17, 2024
9adaddf
event driven concept
shikokuchuo Oct 17, 2024
a32ea3b
fixes for Windows
shikokuchuo Oct 17, 2024
0753a8b
rename function and add examples
shikokuchuo Oct 17, 2024
b5a45ab
handle inf timeout
shikokuchuo Oct 17, 2024
1bb3e2f
fix timeout calc; align c level argument names
shikokuchuo Oct 18, 2024
ecbf743
update docs and examples
shikokuchuo Oct 18, 2024
23e3e53
update GHA
shikokuchuo Oct 18, 2024
bd2cca4
actually fix timeout calc
shikokuchuo Oct 18, 2024
e43eac3
optimizations + add tests
shikokuchuo Oct 18, 2024
5985ff0
use more idiomatic c++ rather than manual memory management
shikokuchuo Oct 18, 2024
36b1681
avoid std::make_unique for pre C++14 compat
shikokuchuo Oct 18, 2024
e06040b
remap error path to return NAs for invalid fds - allows easier handli…
shikokuchuo Oct 18, 2024
5bf9e11
treat negative timeouts specially as curl uses them to denote a defau…
shikokuchuo Oct 18, 2024
e12d6ae
solve more efficiently
shikokuchuo Oct 18, 2024
f4bfed1
update headers and add comments
shikokuchuo Oct 19, 2024
5684668
Correction for curl default timeout
shikokuchuo Oct 19, 2024
78ad736
global scope for callbackRegistryTable
shikokuchuo Oct 20, 2024
24cf1c6
do not use external api
shikokuchuo Oct 20, 2024
098dd2e
Add more comments
shikokuchuo Oct 20, 2024
d03060a
pre-create evaluation call for efficiency
shikokuchuo Oct 23, 2024
acf387c
do more on background thread
shikokuchuo Oct 23, 2024
ce0434b
register all events
shikokuchuo Oct 23, 2024
a967a81
upgrades select() to poll() / WSAPoll()
shikokuchuo Oct 23, 2024
6e0ae41
fix on windows
shikokuchuo Oct 23, 2024
daaaa43
expand API to take readfds, writefds, exceptfds
shikokuchuo Oct 23, 2024
25c21d8
update poll() error handling + pollfd creation optimization
shikokuchuo Oct 24, 2024
8fcd956
create pthread_attr once and cache
shikokuchuo Oct 24, 2024
3043ce4
implements cancellation
shikokuchuo Oct 24, 2024
61d3558
add cancellation test and cache call for higher efficiency
shikokuchuo Oct 24, 2024
08975e2
adds select() fallback for older R versions on Windows
shikokuchuo Oct 24, 2024
c8479c2
revert workflow file
shikokuchuo Oct 24, 2024
113e15e
update comments
shikokuchuo Oct 24, 2024
e4486fc
revert caching pthread_attr_t
shikokuchuo Oct 24, 2024
3b982bd
clearer poll() / select() results branches
shikokuchuo Oct 24, 2024
b448e25
tidy ups
shikokuchuo Oct 24, 2024
a0fa4a1
simplify by detaching from thread itself
shikokuchuo Oct 25, 2024
bf48672
use atomic flag for cancellation
shikokuchuo Oct 25, 2024
cd8c5e0
detach thread immediately after creation
shikokuchuo Oct 25, 2024
1ac44cc
modify poll return values
shikokuchuo Oct 25, 2024
d77ef9d
simplify poll logic
shikokuchuo Oct 25, 2024
1622227
update DESCRITION, NEWS, README and later_fd() docs
shikokuchuo Oct 25, 2024
3a46f35
simplify error path
shikokuchuo Oct 25, 2024
903b305
have cancellation function return invisibly for consistency with later()
shikokuchuo Oct 25, 2024
ae36f61
higher safety - do not access SEXPs at all from background thread
shikokuchuo Oct 26, 2024
d2e88e1
have wait thread periodically check for cancellation
shikokuchuo Oct 26, 2024
246317d
correct select() loop and optimize
shikokuchuo Oct 26, 2024
3501915
add more tests
shikokuchuo Oct 27, 2024
852e1d0
use rcpp function calling
shikokuchuo Oct 27, 2024
3626011
code style and clarity
shikokuchuo Oct 27, 2024
45b665f
create canceller in R
shikokuchuo Oct 27, 2024
989e33e
restrict scope of callbackRegistryTable
shikokuchuo Oct 27, 2024
d8eb8b7
add note on poll failures
shikokuchuo Oct 27, 2024
9ceaf14
split wait_thread() into two versions
shikokuchuo Oct 27, 2024
fb07bff
fix for windows
shikokuchuo Oct 27, 2024
a32b3ea
use separate results vector
shikokuchuo Oct 27, 2024
d8eb35f
use Timestamp class
shikokuchuo Oct 27, 2024
ec08515
update one test
shikokuchuo Oct 28, 2024
94f1b40
update one more test
shikokuchuo Oct 28, 2024
c46399c
allow older R versions to find WSAPoll() on Windows
shikokuchuo Oct 28, 2024
1529c5e
fix integer overflow for timer
shikokuchuo Oct 28, 2024
579134e
native later_fd concept
shikokuchuo Oct 28, 2024
4767c6f
register c callable
shikokuchuo Oct 28, 2024
cf6427a
add pollfd constructor
shikokuchuo Oct 28, 2024
c651489
completes refactor
shikokuchuo Oct 28, 2024
e7975cf
use header file to avoid repetition
shikokuchuo Oct 28, 2024
95adc95
use implicit SEXP conversion for xptr
shikokuchuo Oct 28, 2024
1bafc3b
efficiencies (thanks @jcheng5)
shikokuchuo Oct 28, 2024
1227d78
adds native later_fd() to later.h/later_api.h
shikokuchuo Oct 28, 2024
684806b
header order
shikokuchuo Oct 28, 2024
f89b745
typo in readme - array
shikokuchuo Oct 28, 2024
c76a4d3
have execLastFDNative() return void
shikokuchuo Oct 28, 2024
c601433
call detach from thread itself
shikokuchuo Oct 28, 2024
25d1ea7
move detach up
shikokuchuo Oct 28, 2024
be661aa
directly return SEXP from execLater_fd_impl()
shikokuchuo Oct 28, 2024
306c0b6
improve readme wording
shikokuchuo Oct 28, 2024
c8dadf5
Naming consistency: execLaterFDNative -> execLaterFdNative
shikokuchuo Oct 28, 2024
4945e79
Makes execLater_launch_thread() return int to execLater_fd_impl() - t…
shikokuchuo Oct 29, 2024
a7b224c
actually return bool and simplify
shikokuchuo Oct 29, 2024
1e22a0b
limit scope of internal functions
shikokuchuo Oct 29, 2024
9d4dce2
Merge pull request #1 from shikokuchuo/fd_native
shikokuchuo Oct 29, 2024
27e8bbe
Have execLaterFdNative() return int so C functions directly linking t…
shikokuchuo Oct 29, 2024
d812039
cap timeout at 3e10 seconds
shikokuchuo Oct 30, 2024
df35f4e
more efficient construction of Rcpp logical vector avoiding wrap()
shikokuchuo Oct 31, 2024
4b0f60e
simplify and push performance
shikokuchuo Oct 31, 2024
cc42e7f
encapsulate all creation logic in ThreadArgs class constructor
shikokuchuo Oct 31, 2024
ad06db4
further Rcpp-related optimizations
shikokuchuo Oct 31, 2024
7718eb6
more efficient interval polling logic
shikokuchuo Oct 31, 2024
95d83c0
use README example that actually runs
shikokuchuo Nov 2, 2024
0fa8ed3
allow callback to be run immediately when no file descriptors supplied
shikokuchuo Nov 4, 2024
a327a70
Apply suggestions from code review by @jcheng5
shikokuchuo Nov 6, 2024
1e9107c
num_fds no longer needed in ThreadArgs
shikokuchuo Nov 6, 2024
e0b21a9
rename native function
shikokuchuo Nov 6, 2024
f9dbb70
flag -> active; use atomic compare_exchange
shikokuchuo Nov 6, 2024
9b43fc9
move extern callbackRegistryTable declaration to header for maintaina…
shikokuchuo Nov 6, 2024
82cb321
make loop_empty() reflect later_fd callbacks - RAII approach
shikokuchuo Nov 7, 2024
b41203d
add some final cleanups and tests
shikokuchuo Nov 7, 2024
0d4699a
fold RefCounter into ThreadArgs itself; use more efficient doExecLate…
shikokuchuo Nov 7, 2024
2935304
revert use of doExecLater() - needs to be under mutex protection
shikokuchuo Nov 7, 2024
fd0d013
Add test skip for CRAN nosuggests
shikokuchuo Nov 8, 2024
37b6e86
move ThreadArgs registry to private
shikokuchuo Nov 8, 2024
4 changes: 3 additions & 1 deletion DESCRIPTION
@@ -5,6 +5,7 @@ Version: 1.3.2.9000
Authors@R: c(
person("Winston", "Chang", role = c("aut", "cre"), email = "[email protected]"),
person("Joe", "Cheng", role = c("aut"), email = "[email protected]"),
person("Charlie", "Gao", role = c("aut"), email = "[email protected]", comment = c(ORCID = "0000-0002-0750-061X")),
person(family = "Posit Software, PBC", role = "cph"),
person("Marcus", "Geelnard", role = c("ctb", "cph"), comment = "TinyCThread library, https://tinycthread.github.io/"),
person("Evan", "Nemerson", role = c("ctb", "cph"), comment = "TinyCThread library, https://tinycthread.github.io/")
@@ -20,9 +21,10 @@ Imports:
rlang
LinkingTo: Rcpp
Roxygen: list(markdown = TRUE)
-RoxygenNote: 7.3.1
+RoxygenNote: 7.3.2
Suggests:
knitr,
nanonext,
rmarkdown,
testthat (>= 2.1.0)
VignetteBuilder: knitr
1 change: 1 addition & 0 deletions NAMESPACE
@@ -8,6 +8,7 @@ export(destroy_loop)
export(exists_loop)
export(global_loop)
export(later)
export(later_fd)
export(loop_empty)
export(next_op_secs)
export(run_now)
2 changes: 2 additions & 0 deletions NEWS.md
@@ -1,5 +1,7 @@
# later (development version)

* Adds `later_fd()` which executes a function when a file descriptor is ready for reading or writing, at some indeterminate time in the future (subject to an optional timeout). This facilitates an event-driven approach to asynchronous or streaming downloads. (@shikokuchuo and @jcheng5, #190)

* Fixed #186: Improvements to package load time as `rlang` is now only loaded when used. This is a notable efficiency for packages with only a 'linking to' dependency on `later`. Also updates to native symbol registration from dynamic lookup. (@shikokuchuo and @wch, #187)

# later 1.3.2
8 changes: 8 additions & 0 deletions R/RcppExports.R
@@ -13,6 +13,14 @@ using_ubsan <- function() {
    .Call(`_later_using_ubsan`)
}

execLater_fd <- function(callback, readfds, writefds, exceptfds, timeoutSecs, loop_id) {
    .Call(`_later_execLater_fd`, callback, readfds, writefds, exceptfds, timeoutSecs, loop_id)
}

fd_cancel <- function(xptr) {
    .Call(`_later_fd_cancel`, xptr)
}

setCurrentRegistryId <- function(id) {
    invisible(.Call(`_later_setCurrentRegistryId`, id))
}
92 changes: 92 additions & 0 deletions R/later.R
@@ -280,6 +280,98 @@ create_canceller <- function(id, loop_id) {
}
}

#' Executes a function when a file descriptor is ready
#'
#' Schedule an R function or formula to run after an indeterminate amount of
#' time when file descriptors are ready for reading or writing, subject to an
#' optional timeout.
#'
#' If the system-level `poll` (`WSAPoll` on Windows) returns an error, the
#' callback is invoked with a vector of all `NA`s. This is indistinguishable
#' from the case where `poll` succeeds but every file descriptor has an error
#' condition pending.
#'
#' If no file descriptors are supplied, the callback is scheduled for immediate
#' execution and made on the empty logical vector `logical(0)`.
#'
#' @param func A function that takes a single argument, a logical vector that
#' indicates which file descriptors are ready (a concatenation of `readfds`,
#' `writefds` and `exceptfds`). This may be all `FALSE` if the
#' `timeout` argument is non-`Inf`. File descriptors with error conditions
#' pending are represented as `NA`, as are invalid file descriptors such as
#' those already closed.
#' @param readfds Integer vector of file descriptors, or Windows SOCKETs, to
#' monitor for being ready to read.
#' @param writefds Integer vector of file descriptors, or Windows SOCKETs, to
#' monitor for being ready to write.
#' @param exceptfds Integer vector of file descriptors, or Windows SOCKETs, to
#' monitor for error conditions pending.
#' @param timeout Number of seconds to wait before giving up, and calling `func`
#' with all `FALSE`. The default `Inf` implies waiting indefinitely.
#' Specifying `0` will check once without blocking, and supplying a negative
#' value defaults to a timeout of 1s.
#' @param loop A handle to an event loop. Defaults to the currently-active loop.
#'
#' @inherit later return note
#'
#' @examplesIf requireNamespace("nanonext", quietly = TRUE)
#' # create nanonext sockets
#' s1 <- nanonext::socket(listen = "inproc://nano")
#' s2 <- nanonext::socket(dial = "inproc://nano")
#' fd1 <- nanonext::opt(s1, "recv-fd")
#' fd2 <- nanonext::opt(s2, "recv-fd")
#'
#' # 1. timeout: prints FALSE, FALSE
#' later_fd(print, c(fd1, fd2), timeout = 0.1)
#' Sys.sleep(0.2)
#' run_now()
#'
#' # 2. fd1 ready: prints TRUE, FALSE
#' later_fd(print, c(fd1, fd2), timeout = 1)
#' res <- nanonext::send(s2, "msg")
#' Sys.sleep(0.1)
#' run_now()
#'
#' # 3. both ready: prints TRUE, TRUE
#' res <- nanonext::send(s1, "msg")
#' later_fd(print, c(fd1, fd2), timeout = 1)
#' Sys.sleep(0.1)
#' run_now()
#'
#' # 4. fd2 ready: prints FALSE, TRUE
#' res <- nanonext::recv(s1)
#' later_fd(print, c(fd1, fd2), timeout = 1)
#' Sys.sleep(0.1)
#' run_now()
#'
#' # 5. fds invalid: prints NA, NA
#' close(s2)
#' close(s1)
#' later_fd(print, c(fd1, fd2), timeout = 0)
#' Sys.sleep(0.1)
#' run_now()
#'
#' @export
later_fd <- function(func, readfds = integer(), writefds = integer(), exceptfds = integer(),
                     timeout = Inf, loop = current_loop()) {
  if (!is.function(func)) {
    func <- rlang::as_function(func)
  }
  xptr <- execLater_fd(func, readfds, writefds, exceptfds, timeout, loop$id)

  invisible(create_fd_canceller(xptr))
}

# Returns a function that will cancel a callback with the given external
# pointer. If the callback has already been executed or canceled, then the
# function has no effect.
create_fd_canceller <- function(xptr) {
  force(xptr)
  function() {
    invisible(fd_cancel(xptr))
  }
}

#' Execute scheduled operations
#'
#' Normally, operations scheduled with [later()] will not execute unless/until
20 changes: 19 additions & 1 deletion README.md
@@ -32,6 +32,17 @@ Or a formula (in this case, run as soon as control returns to the top-level):
```r
later::later(~print("Got here!"))
```
### File Descriptor Readiness

It is also possible to have a function run based on when file descriptors are ready for reading or writing, at some indeterminate time in the future.

Below, a logical vector is printed indicating which of file descriptors 21 or 22 were ready, subject to a timeout of 1s. Instead of just printing the result, the supplied function can also do something more useful such as reading from the descriptor.

```r
later::later_fd(print, c(21L, 22L), timeout = 1)
```

This is particularly useful for asynchronous or streaming data transfer over the network / internet, so that reads can be made from TCP sockets as soon as data is available. `later::later_fd()` pairs well with functions such as `curl::multi_fdset()` that return the relevant file descriptors to be monitored.

## Usage from C++

@@ -45,6 +56,13 @@ void later(void (*func)(void*), void* data, double secs)

The first argument is a pointer to a function that takes one `void*` argument and returns void. The second argument is a `void*` that will be passed to the function when it's called back. And the third argument is the number of seconds to wait (at a minimum) before invoking.

`later::later_fd` is also accessible from `later_api.h` and its prototype looks like this:

```cpp
void later_fd(void (*func)(int *, void *), void *data, int num_fds, struct pollfd *fds, double secs)
```
The first argument is a pointer to a function that takes two arguments: an `int*` array supplied by `later_fd()` when it calls back, and a `void*`. The `int*` array has length `num_fds`, and each element is `0` (not ready), `1` (ready) or `NA_INTEGER` (error condition) for the corresponding file descriptor. The second argument, `data`, is passed to the `void*` parameter of the function when it's called back. The third is the total number of file descriptors being passed, the fourth a pointer to an array of `struct pollfd`, and the fifth the number of seconds to wait before timing out.

To use the C++ interface, you'll need to add `later` to your `DESCRIPTION` file under both `LinkingTo` and `Imports`, and also make sure that your `NAMESPACE` file has an `import(later)` entry.

### Background tasks
@@ -121,4 +139,4 @@ void asyncMean(Rcpp::NumericVector data) {
}
```

-It's not very useful to execute tasks on background threads if you can't get access to the results back in R. We'll soon be introducing a complementary R package that provides a suitable "promise" or "future" abstraction.
+It's not very useful to execute tasks on background threads if you can't get access to the results back in R. The [promises](https://github.com/rstudio/promises) package complements later by providing a "promise" abstraction.
36 changes: 36 additions & 0 deletions inst/include/later.h
@@ -13,6 +13,10 @@
#endif

#ifdef _WIN32
#ifndef _WIN32_WINNT
#define _WIN32_WINNT 0x0600 // so R <= 4.1 can find WSAPoll() on Windows
#endif
#include <winsock2.h>
#define WIN32_LEAN_AND_MEAN
// Taken from http://tolstoy.newcastle.edu.au/R/e2/devel/06/11/1242.html
// Undefine the Realloc macro, which is defined by both R and by Windows stuff
@@ -21,6 +25,7 @@
#undef Free
#include <windows.h>
#else // _WIN32
#include <poll.h>
#include <pthread.h>
#endif // _WIN32

@@ -91,6 +96,37 @@ inline void later(void (*func)(void*), void* data, double secs) {
  later(func, data, secs, GLOBAL_LOOP);
}

inline void later_fd(void (*func)(int *, void *), void *data, int num_fds, struct pollfd *fds, double secs, int loop_id) {
  // See above note for later()

  // The function type for the real execLaterFdNative
  typedef void (*elfdnfun)(void (*)(int *, void *), void *, int, struct pollfd *, double, int);
  static elfdnfun elfdn = NULL;
  if (!elfdn) {
    // Initialize if necessary
    if (func) {
      // We're not initialized but someone's trying to actually schedule
      // some code to be executed!
      REprintf(
        "Warning: later::execLaterFdNative called in uninitialized state. "
        "If you're using <later.h>, please switch to <later_api.h>.\n"
      );
    }
    elfdn = (elfdnfun) R_GetCCallable("later", "execLaterFdNative");
  }

  // We didn't want to execute anything, just initialize
  if (!func) {
    return;
  }

  elfdn(func, data, num_fds, fds, secs, loop_id);
}

inline void later_fd(void (*func)(int *, void *), void *data, int num_fds, struct pollfd *fds, double secs) {
  later_fd(func, data, num_fds, fds, secs, GLOBAL_LOOP);
}


class BackgroundTask {

3 changes: 2 additions & 1 deletion inst/include/later_api.h
@@ -11,9 +11,10 @@ namespace {
// See comment in execLaterNative to learn why we need to do this
// in a statically initialized object
later::later(NULL, NULL, 0);
later::later_fd(NULL, NULL, 0, NULL, 0);
}
};

static LaterInitializer init;

} // namespace
110 changes: 110 additions & 0 deletions man/later_fd.Rd

Generated file; diff not rendered.

1 change: 1 addition & 0 deletions src/Makevars.win
@@ -1,4 +1,5 @@
PKG_CPPFLAGS = -DSTRICT_R_HEADERS
PKG_LIBS = -lWs2_32

#### Debugging flags ####
# Uncomment to enable thread assertions