Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create callback based on file descriptor monitoring #190

Merged
merged 102 commits into from
Nov 8, 2024
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
102 commits
Select commit Hold shift + click to select a range
56ce9a3
Fake later_fd
jcheng5 Oct 16, 2024
996c86f
adds select timeout
shikokuchuo Oct 17, 2024
9adaddf
event driven concept
shikokuchuo Oct 17, 2024
a32ea3b
fixes for Windows
shikokuchuo Oct 17, 2024
0753a8b
rename function and add examples
shikokuchuo Oct 17, 2024
b5a45ab
handle inf timeout
shikokuchuo Oct 17, 2024
1bb3e2f
fix timeout calc; align c level argument names
shikokuchuo Oct 18, 2024
ecbf743
update docs and examples
shikokuchuo Oct 18, 2024
23e3e53
update GHA
shikokuchuo Oct 18, 2024
bd2cca4
actually fix timeout calc
shikokuchuo Oct 18, 2024
e43eac3
optimizations + add tests
shikokuchuo Oct 18, 2024
5985ff0
use more idiomatic c++ rather than manual memory management
shikokuchuo Oct 18, 2024
36b1681
avoid std::make_unique for pre C++14 compat
shikokuchuo Oct 18, 2024
e06040b
remap error path to return NAs for invalid fds - allows easier handli…
shikokuchuo Oct 18, 2024
5bf9e11
treat negative timeouts specially as curl uses them to denote a defau…
shikokuchuo Oct 18, 2024
e12d6ae
solve more efficiently
shikokuchuo Oct 18, 2024
f4bfed1
update headers and add comments
shikokuchuo Oct 19, 2024
5684668
Correction for curl default timeout
shikokuchuo Oct 19, 2024
78ad736
global scope for callbackRegistryTable
shikokuchuo Oct 20, 2024
24cf1c6
do not use external api
shikokuchuo Oct 20, 2024
098dd2e
Add more comments
shikokuchuo Oct 20, 2024
d03060a
pre-create evaluation call for efficiency
shikokuchuo Oct 23, 2024
acf387c
do more on background thread
shikokuchuo Oct 23, 2024
ce0434b
register all events
shikokuchuo Oct 23, 2024
a967a81
upgrades select() to poll() / WSAPoll()
shikokuchuo Oct 23, 2024
6e0ae41
fix on windows
shikokuchuo Oct 23, 2024
daaaa43
expand API to take readfds, writefds, exceptfds
shikokuchuo Oct 23, 2024
25c21d8
update poll() error handling + pollfd creation optimization
shikokuchuo Oct 24, 2024
8fcd956
create pthread_attr once and cache
shikokuchuo Oct 24, 2024
3043ce4
implements cancellation
shikokuchuo Oct 24, 2024
61d3558
add cancellation test and cache call for higher efficiency
shikokuchuo Oct 24, 2024
08975e2
adds select() fallback for older R versions on Windows
shikokuchuo Oct 24, 2024
c8479c2
revert workflow file
shikokuchuo Oct 24, 2024
113e15e
update comments
shikokuchuo Oct 24, 2024
e4486fc
revert caching pthread_attr_t
shikokuchuo Oct 24, 2024
3b982bd
clearer poll() / select() results branches
shikokuchuo Oct 24, 2024
b448e25
tidy ups
shikokuchuo Oct 24, 2024
a0fa4a1
simplify by detaching from thread itself
shikokuchuo Oct 25, 2024
bf48672
use atomic flag for cancellation
shikokuchuo Oct 25, 2024
cd8c5e0
detach thread immediately after creation
shikokuchuo Oct 25, 2024
1ac44cc
modify poll return values
shikokuchuo Oct 25, 2024
d77ef9d
simplify poll logic
shikokuchuo Oct 25, 2024
1622227
update DESCRITION, NEWS, README and later_fd() docs
shikokuchuo Oct 25, 2024
3a46f35
simplify error path
shikokuchuo Oct 25, 2024
903b305
have cancellation function return invisibly for consistency with later()
shikokuchuo Oct 25, 2024
ae36f61
higher safety - do not access SEXPs at all from background thread
shikokuchuo Oct 26, 2024
d2e88e1
have wait thread periodically check for cancellation
shikokuchuo Oct 26, 2024
246317d
correct select() loop and optimize
shikokuchuo Oct 26, 2024
3501915
add more tests
shikokuchuo Oct 27, 2024
852e1d0
use rcpp function calling
shikokuchuo Oct 27, 2024
3626011
code style and clarity
shikokuchuo Oct 27, 2024
45b665f
create canceller in R
shikokuchuo Oct 27, 2024
989e33e
restrict scope of callbackRegistryTable
shikokuchuo Oct 27, 2024
d8eb8b7
add note on poll failures
shikokuchuo Oct 27, 2024
9ceaf14
split wait_thread() into two versions
shikokuchuo Oct 27, 2024
fb07bff
fix for windows
shikokuchuo Oct 27, 2024
a32b3ea
use separate results vector
shikokuchuo Oct 27, 2024
d8eb35f
use Timestamp class
shikokuchuo Oct 27, 2024
ec08515
update one test
shikokuchuo Oct 28, 2024
94f1b40
update one more test
shikokuchuo Oct 28, 2024
c46399c
allow older R versions to find WSAPoll() on Windows
shikokuchuo Oct 28, 2024
1529c5e
fix integer overflow for timer
shikokuchuo Oct 28, 2024
579134e
native later_fd concept
shikokuchuo Oct 28, 2024
4767c6f
register c callable
shikokuchuo Oct 28, 2024
cf6427a
add pollfd constructor
shikokuchuo Oct 28, 2024
c651489
completes refactor
shikokuchuo Oct 28, 2024
e7975cf
use header file to avoid repetition
shikokuchuo Oct 28, 2024
95adc95
use implicit SEXP conversion for xptr
shikokuchuo Oct 28, 2024
1bafc3b
efficiencies (thanks @jcheng5)
shikokuchuo Oct 28, 2024
1227d78
adds native later_fd() to later.h/later_api.h
shikokuchuo Oct 28, 2024
684806b
header order
shikokuchuo Oct 28, 2024
f89b745
typo in readme - array
shikokuchuo Oct 28, 2024
c76a4d3
have execLastFDNative() return void
shikokuchuo Oct 28, 2024
c601433
call detach from thread itself
shikokuchuo Oct 28, 2024
25d1ea7
move detach up
shikokuchuo Oct 28, 2024
be661aa
directly return SEXP from execLater_fd_impl()
shikokuchuo Oct 28, 2024
306c0b6
improve readme wording
shikokuchuo Oct 28, 2024
c8dadf5
Naming consistency: execLaterFDNative -> execLaterFdNative
shikokuchuo Oct 28, 2024
4945e79
Makes execLater_launch_thread() return int to execLater_fd_impl() - t…
shikokuchuo Oct 29, 2024
a7b224c
actually return bool and simplify
shikokuchuo Oct 29, 2024
1e22a0b
limit scope of internal functions
shikokuchuo Oct 29, 2024
9d4dce2
Merge pull request #1 from shikokuchuo/fd_native
shikokuchuo Oct 29, 2024
27e8bbe
Have execLaterFdNative() return int so C functions directly linking t…
shikokuchuo Oct 29, 2024
d812039
cap timeout at 3e10 seconds
shikokuchuo Oct 30, 2024
df35f4e
more efficient construction of Rcpp logical vector avoiding wrap()
shikokuchuo Oct 31, 2024
4b0f60e
simplify and push performance
shikokuchuo Oct 31, 2024
cc42e7f
encapsulate all creation logic in ThreadArgs class constructor
shikokuchuo Oct 31, 2024
ad06db4
further Rcpp-related optimizations
shikokuchuo Oct 31, 2024
7718eb6
more efficient interval polling logic
shikokuchuo Oct 31, 2024
95d83c0
use README example that actually runs
shikokuchuo Nov 2, 2024
0fa8ed3
allow callback to be run immediately when no file descriptors supplied
shikokuchuo Nov 4, 2024
a327a70
Apply suggestions from code review by @jcheng5
shikokuchuo Nov 6, 2024
1e9107c
num_fds no longer needed in ThreadArgs
shikokuchuo Nov 6, 2024
e0b21a9
rename native function
shikokuchuo Nov 6, 2024
f9dbb70
flag -> active; use atomic compare_exchange
shikokuchuo Nov 6, 2024
9b43fc9
move extern callbackRegistryTable declaration to header for maintaina…
shikokuchuo Nov 6, 2024
82cb321
make loop_empty() reflect later_fd callbacks - RAII approach
shikokuchuo Nov 7, 2024
b41203d
add some final cleanups and tests
shikokuchuo Nov 7, 2024
0d4699a
fold RefCounter into ThreadArgs itself; use more efficient doExecLate…
shikokuchuo Nov 7, 2024
2935304
revert use of doExecLater() - needs to be under mutex protection
shikokuchuo Nov 7, 2024
fd0d013
Add test skip for CRAN nosuggests
shikokuchuo Nov 8, 2024
37b6e86
move ThreadArgs registry to private
shikokuchuo Nov 8, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions .github/workflows/R-CMD-check.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,7 @@
# For most R packages it is better to use https://github.com/r-lib/actions
on:
push:
branches: [main, rc-**]
pull_request:
branches: [main]
schedule:
- cron: '0 5 * * 1' # every monday

name: Package checks

Expand Down
3 changes: 2 additions & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ Imports:
rlang
LinkingTo: Rcpp
Roxygen: list(markdown = TRUE)
RoxygenNote: 7.3.1
RoxygenNote: 7.3.2
Suggests:
knitr,
nanonext,
rmarkdown,
testthat (>= 2.1.0)
VignetteBuilder: knitr
Expand Down
1 change: 1 addition & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ export(destroy_loop)
export(exists_loop)
export(global_loop)
export(later)
export(later_fd)
export(loop_empty)
export(next_op_secs)
export(run_now)
Expand Down
4 changes: 4 additions & 0 deletions R/RcppExports.R
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ using_ubsan <- function() {
.Call(`_later_using_ubsan`)
}

execLater_fd <- function(callback, fds, timeoutSecs, loop_id) {
.Call(`_later_execLater_fd`, callback, fds, timeoutSecs, loop_id)
}

setCurrentRegistryId <- function(id) {
invisible(.Call(`_later_setCurrentRegistryId`, id))
}
Expand Down
62 changes: 62 additions & 0 deletions R/later.R
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,68 @@ later <- function(func, delay = 0, loop = current_loop()) {
invisible(create_canceller(id, loop$id))
}

#' Executes a function when a file descriptor is ready
#'
#' @param func A function that takes a single argument, a logical vector that
#' indicates which file descriptors are ready. This may be all `FALSE` if the
#' `timeout` argument is non-`Inf`, or all `NA` if one or more file
#' descriptors are invalid.
#' @param fd_set Integer vector of file descriptors to monitor. Or on Windows,
#' `SOCKET` values.
#' @param timeout Number of seconds to wait before giving up, and calling `func`
#' with all `FALSE`.
#' @param loop A handle to an event loop. Defaults to the currently-active loop.
#'
#' @return A function, which, if invoked, will cancel the callback. The
#' function will return \code{TRUE} if the callback was successfully
#' cancelled and \code{FALSE} if not (this occurs if the callback has
#' executed or has been cancelled already).
#'
#' @examplesIf requireNamespace("nanonext", quietly = TRUE)
#' # create nanonext socket
#' s1 <- nanonext::socket(listen = "inproc://nano")
#' s2 <- nanonext::socket(dial = "inproc://nano")
#' fd1 <- nanonext::opt(s1, "recv-fd")
#' fd2 <- nanonext::opt(s2, "recv-fd")
#'
#' # 1. timeout: prints FALSE, FALSE
#' later_fd(print, c(fd1, fd2), 0.1)
#' Sys.sleep(0.2)
#' run_now()
#'
#' # 2. fd1 active: prints TRUE, FALSE
#' later_fd(print, c(fd1, fd2), 1)
#' res <- nanonext::send(s2, "msg")
#' Sys.sleep(0.1)
#' run_now()
#'
#' # 3. both active: prints TRUE, TRUE
#' res <- nanonext::send(s1, "msg")
#' later_fd(print, c(fd1, fd2), 1)
#' Sys.sleep(0.1)
#' run_now()
#'
#' # 4. fd2 active: prints FALSE, TRUE
#' res <- nanonext::recv(s1)
#' later_fd(print, c(fd1, fd2), 1)
#' Sys.sleep(0.1)
#' run_now()
#'
#' # 5. fds invalid: prints NA, NA
#' close(s2)
#' close(s1)
#' later_fd(print, c(fd1, fd2), 1)
#' Sys.sleep(0.1)
#' run_now()
#'
#' @export
later_fd <- function(func, fd_set, timeout = Inf, loop = current_loop()) {
if (!is.function(func)) {
func <- rlang::as_function(func)
}
invisible(execLater_fd(func, fd_set, timeout, loop$id))
}

# Returns a function that will cancel a callback with the given ID. If the
# callback has already been executed or canceled, then the function has no
# effect.
Expand Down
70 changes: 70 additions & 0 deletions man/later_fd.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/Makevars.win
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
PKG_CPPFLAGS = -DSTRICT_R_HEADERS
PKG_LIBS = -lWs2_32

#### Debugging flags ####
# Uncomment to enable thread assertions
Expand Down
14 changes: 14 additions & 0 deletions src/RcppExports.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,20 @@ BEGIN_RCPP
return rcpp_result_gen;
END_RCPP
}
// execLater_fd
Rcpp::LogicalVector execLater_fd(Rcpp::Function callback, Rcpp::IntegerVector fds, Rcpp::NumericVector timeoutSecs, Rcpp::IntegerVector loop_id);
RcppExport SEXP _later_execLater_fd(SEXP callbackSEXP, SEXP fdsSEXP, SEXP timeoutSecsSEXP, SEXP loop_idSEXP) {
BEGIN_RCPP
Rcpp::RObject rcpp_result_gen;
Rcpp::RNGScope rcpp_rngScope_gen;
Rcpp::traits::input_parameter< Rcpp::Function >::type callback(callbackSEXP);
Rcpp::traits::input_parameter< Rcpp::IntegerVector >::type fds(fdsSEXP);
Rcpp::traits::input_parameter< Rcpp::NumericVector >::type timeoutSecs(timeoutSecsSEXP);
Rcpp::traits::input_parameter< Rcpp::IntegerVector >::type loop_id(loop_idSEXP);
rcpp_result_gen = Rcpp::wrap(execLater_fd(callback, fds, timeoutSecs, loop_id));
return rcpp_result_gen;
END_RCPP
}
// setCurrentRegistryId
void setCurrentRegistryId(int id);
RcppExport SEXP _later_setCurrentRegistryId(SEXP idSEXP) {
Expand Down
147 changes: 147 additions & 0 deletions src/fd.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
#ifdef _WIN32
#include <winsock2.h>
#endif
#include <Rcpp.h>
#include <unistd.h>
#include <cstdlib>
#include "later.h"
#include "callback_registry_table.h"

extern CallbackRegistryTable callbackRegistryTable;

typedef struct ThreadArgs_s {
SEXP callback;
R_xlen_t num_fds;
std::unique_ptr<std::vector<int>> fds;
fd_set read_fds;
struct timeval tv;
int max_fd;
int loop;
bool flag;
} ThreadArgs;

static void later_callback(void *arg) {

shikokuchuo marked this conversation as resolved.
Show resolved Hide resolved
std::unique_ptr<std::shared_ptr<ThreadArgs>> argsptr(static_cast<std::shared_ptr<ThreadArgs>*>(arg));
std::shared_ptr<ThreadArgs> args = *argsptr;

SEXP results, call;
// TODO: actually pre-allocate call on main thread, values can be safely updated from background thread
// only call Rf_eval() here to minimise runtime in later callback.
PROTECT(results = Rf_allocVector(LGLSXP, args->num_fds));
std::memcpy((void *) DATAPTR_RO(results), args->fds->data(), args->num_fds * sizeof(int));
PROTECT(call = Rf_lcons(args->callback, Rf_cons(results, R_NilValue)));
Rf_eval(call, R_GlobalEnv);
UNPROTECT(2);
R_ReleaseObject(args->callback);

}

// CONSIDER: upgrade select() to poll() / WSAPoll().
// CONSIDER: add method for HANDLES on Windows. Assuming we only accept integer
// values for both, we could use heuristics: check if it is a valid SOCKET or
// else assume HANDLE - but this is not fool-proof.
// Otherwise would require an interface for user to specify type.
static void *select_thread(void *arg) {

std::unique_ptr<std::shared_ptr<ThreadArgs>> argsptr(static_cast<std::shared_ptr<ThreadArgs>*>(arg));
jcheng5 marked this conversation as resolved.
Show resolved Hide resolved
std::shared_ptr<ThreadArgs> args = *argsptr;

// CONSIDER: if we should be checking more than read activity, i.e. write and error.
// Could check all types for all fds, which would be inefficient but
// otherwise would require user interface to specify events for each fd
int ready = select(args->max_fd + 1, &args->read_fds, NULL, NULL, args->flag ? &args->tv : NULL);
jcheng5 marked this conversation as resolved.
Show resolved Hide resolved
jcheng5 marked this conversation as resolved.
Show resolved Hide resolved

args->flag = ready < 0;

for (R_xlen_t i = 0; i < args->num_fds; i++) {
(*args->fds)[i] = args->flag ? R_NaInt : FD_ISSET((*args->fds)[i], &args->read_fds) != 0;
}

callbackRegistryTable.scheduleCallback(later_callback, static_cast<void *>(argsptr.release()), 0, args->loop);
jcheng5 marked this conversation as resolved.
Show resolved Hide resolved

return nullptr;

}

#ifdef _WIN32
static DWORD WINAPI select_thread_win(LPVOID lpParameter) {
select_thread(lpParameter);
return 1;
}
#endif

// [[Rcpp::export]]
Rcpp::LogicalVector execLater_fd(Rcpp::Function callback, Rcpp::IntegerVector fds, Rcpp::NumericVector timeoutSecs, Rcpp::IntegerVector loop_id) {

R_xlen_t num_fds = fds.size();
double timeout = timeoutSecs[0];
int loop = loop_id[0];
int max_fd = -1;

std::shared_ptr<ThreadArgs> args = std::make_shared<ThreadArgs>();
std::unique_ptr<std::vector<int>> fdvals(new std::vector<int>(num_fds));
args->num_fds = num_fds;
R_PreserveObject(callback);
args->callback = callback;
args->loop = loop;

FD_ZERO(&args->read_fds);

if (num_fds)
std::memcpy(fdvals->data(), fds.begin(), num_fds * sizeof(int));
for (R_xlen_t i = 0; i < num_fds; i++) {
FD_SET((*fdvals)[i], &args->read_fds);
max_fd = std::max(max_fd, (*fdvals)[i]);
}
args->fds = std::move(fdvals);
args->max_fd = max_fd;
args->flag = timeout != R_PosInf;

// handle -1 returned by curl_multi_timeout() with default of 1s
if (args->flag) {
args->tv.tv_sec = timeout < 0 ? 1 : (int) timeoutSecs[0];
args->tv.tv_usec = timeout < 0 ? 0 : ((int) (timeoutSecs[0] * 1000)) % 1000 * 1000;
}

std::unique_ptr<std::shared_ptr<ThreadArgs>> argsptr(new std::shared_ptr<ThreadArgs>(args));

#ifdef _WIN32

HANDLE hThread = CreateThread(NULL, 0, select_thread_win, static_cast<LPVOID>(argsptr.release()), 0, NULL);

if (hThread == NULL) {
Rcpp::stop("thread creation error: " + std::to_string(GetLastError()));
}

CloseHandle(hThread);

#else

pthread_attr_t attr;
pthread_t t;

// TODO: actually allocate global pthread_attr in .onLoad() and re-use, destroy in .onUnload()
if (pthread_attr_init(&attr))
Rcpp::stop("thread creation error: " + std::string(strerror(errno)));

if (pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) ||
pthread_create(&t, &attr, select_thread, static_cast<void *>(argsptr.release()))) {
pthread_attr_destroy(&attr);
Rcpp::stop("thread creation error: " + std::string(strerror(errno)));
}

if (pthread_attr_destroy(&attr)) {
Rcpp::stop("thread creation error: " + std::string(strerror(errno)));
}

#endif

// TODO: Add cancellation: clean way is to insert something that is also
// waited on that we can signal to return. But due to the setup overhead
// perhaps have this be optional. Otherwise can retain a reference to the
// thead to forcefully terminate it - but am wary of potential edge cases.

return true;

}
2 changes: 2 additions & 0 deletions src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ SEXP _later_ensureInitialized(void);
SEXP _later_execCallbacks(SEXP, SEXP, SEXP);
SEXP _later_idle(SEXP);
SEXP _later_execLater(SEXP, SEXP, SEXP);
SEXP _later_execLater_fd(SEXP, SEXP, SEXP, SEXP);
SEXP _later_cancel(SEXP, SEXP);
SEXP _later_nextOpSecs(SEXP);
SEXP _later_testCallbackOrdering(void);
Expand All @@ -33,6 +34,7 @@ static const R_CallMethodDef CallEntries[] = {
{"_later_execCallbacks", (DL_FUNC) &_later_execCallbacks, 3},
{"_later_idle", (DL_FUNC) &_later_idle, 1},
{"_later_execLater", (DL_FUNC) &_later_execLater, 3},
{"_later_execLater_fd", (DL_FUNC) &_later_execLater_fd, 4},
{"_later_cancel", (DL_FUNC) &_later_cancel, 2},
{"_later_nextOpSecs", (DL_FUNC) &_later_nextOpSecs, 1},
{"_later_testCallbackOrdering", (DL_FUNC) &_later_testCallbackOrdering, 0},
Expand Down
2 changes: 1 addition & 1 deletion src/later.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ using std::shared_ptr;

static size_t exec_callbacks_reentrancy_count = 0;

static CallbackRegistryTable callbackRegistryTable;
CallbackRegistryTable callbackRegistryTable;


class ProtectCallbacks {
Expand Down
Loading