Skip to content

Commit

Permalink
split wait_thread() into two versions
Browse files Browse the repository at this point in the history
  • Loading branch information
shikokuchuo committed Oct 27, 2024
1 parent d8eb8b7 commit 9ceaf14
Showing 1 changed file with 71 additions and 57 deletions.
128 changes: 71 additions & 57 deletions src/fd.cpp
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
#ifdef _WIN32
#ifndef FD_SETSIZE
#define FD_SETSIZE 2048 // Set max fds - only for select() fallback
#define FD_SETSIZE 2048 // set max fds - only for select() fallback on R <= 4.1
#endif
#define LATER POLL_FUNC WSAPoll
#include <winsock2.h>
#else
#define LATER_POLL_FUNC poll
#include <poll.h>
#endif
#include <Rcpp.h>
Expand All @@ -16,12 +18,6 @@

#define LATER_INTERVAL 1024

#ifdef _WIN32
#define POLL_FUNC WSAPoll
#else
#define POLL_FUNC poll
#endif

typedef struct ThreadArgs_s {
std::shared_ptr<std::atomic<bool>> flag;
std::unique_ptr<Rcpp::Function> callback;
Expand All @@ -46,6 +42,9 @@ static void later_callback(void *arg) {

// CONSIDER: if necessary to add method for HANDLES on Windows. Would be different code to SOCKETs.
// TODO: implement re-usable background thread.

#ifdef POLLIN // all platforms except R <= 4.1 on Windows

static int wait_thread(void *arg) {

std::unique_ptr<std::shared_ptr<ThreadArgs>> argsptr(static_cast<std::shared_ptr<ThreadArgs>*>(arg));
Expand All @@ -57,54 +56,6 @@ static int wait_thread(void *arg) {
int timeoutms = infinite || args->timeout < 0 ? 1000 : (int) (args->timeout * 1000);
int repeats = infinite || args->timeout < 0 ? 0 : timeoutms / LATER_INTERVAL;

#ifndef POLLIN // fall back to select() for R <= 4.1 and older Windows

fd_set readfds, writefds, exceptfds;
FD_ZERO(&readfds);
FD_ZERO(&writefds);
FD_ZERO(&exceptfds);
const int max_fd = *std::max_element(args->fds->begin(), args->fds->end());
struct timeval tv;

do {
for (int i = 0; i < args->rfds; i++) {
FD_SET((*args->fds)[i], &readfds);
}
for (int i = args->rfds; i < (args->rfds + args->wfds); i++) {
FD_SET((*args->fds)[i], &writefds);
}
for (int i = args->rfds + args->wfds; i < args->num_fds; i++) {
FD_SET((*args->fds)[i], &exceptfds);
}
tv.tv_sec = (repeats ? LATER_INTERVAL : timeoutms) / 1000;
tv.tv_usec = ((repeats ? LATER_INTERVAL : timeoutms) % 1000) * 1000;

result = select(max_fd + 1, &readfds, &writefds, &exceptfds, &tv);

if (args->flag->load()) return 1;
if (result) break;
} while (infinite || (repeats-- && (timeoutms -= LATER_INTERVAL)));

if (result > 0) {
for (int i = 0; i < args->rfds; i++) {
(*args->fds)[i] = FD_ISSET((*args->fds)[i], &readfds);
}
for (int i = args->rfds; i < (args->rfds + args->wfds); i++) {
(*args->fds)[i] = FD_ISSET((*args->fds)[i], &writefds);
}
for (int i = args->rfds + args->wfds; i < args->num_fds; i++) {
(*args->fds)[i] = FD_ISSET((*args->fds)[i], &exceptfds);
}
} else if (result == 0) {
std::memset(args->fds->data(), 0, args->num_fds * sizeof(int));
} else {
for (int i = 0; i < args->num_fds; i++) {
(*args->fds)[i] = R_NaInt;
}
}

#else

std::vector<struct pollfd> pollfds;
pollfds.reserve(args->num_fds);
struct pollfd pfd;
Expand All @@ -128,7 +79,7 @@ static int wait_thread(void *arg) {
}

do {
result = POLL_FUNC(pollfds.data(), args->num_fds, repeats ? LATER_INTERVAL : timeoutms);
result = LATER_POLL_FUNC(pollfds.data(), args->num_fds, repeats ? LATER_INTERVAL : timeoutms);
if (args->flag->load()) return 1;
if (result) break;
} while (infinite || (repeats-- && (timeoutms -= LATER_INTERVAL)));
Expand All @@ -151,14 +102,77 @@ static int wait_thread(void *arg) {
}
}

#endif // POLLIN
execLaterNative2(later_callback, static_cast<void *>(argsptr.release()), 0, args->loop);

return 0;

}

#else // no POLLIN: fall back to select() for R <= 4.1 on Windows

static int wait_thread(void *arg) {

std::unique_ptr<std::shared_ptr<ThreadArgs>> argsptr(static_cast<std::shared_ptr<ThreadArgs>*>(arg));
std::shared_ptr<ThreadArgs> args = *argsptr;

int result;

const bool infinite = args->timeout == R_PosInf;
int timeoutms = infinite || args->timeout < 0 ? 1000 : (int) (args->timeout * 1000);
int repeats = infinite || args->timeout < 0 ? 0 : timeoutms / LATER_INTERVAL;

fd_set readfds, writefds, exceptfds;
FD_ZERO(&readfds);
FD_ZERO(&writefds);
FD_ZERO(&exceptfds);
const int max_fd = *std::max_element(args->fds->begin(), args->fds->end());
struct timeval tv;

do {
for (int i = 0; i < args->rfds; i++) {
FD_SET((*args->fds)[i], &readfds);
}
for (int i = args->rfds; i < (args->rfds + args->wfds); i++) {
FD_SET((*args->fds)[i], &writefds);
}
for (int i = args->rfds + args->wfds; i < args->num_fds; i++) {
FD_SET((*args->fds)[i], &exceptfds);
}
tv.tv_sec = (repeats ? LATER_INTERVAL : timeoutms) / 1000;
tv.tv_usec = ((repeats ? LATER_INTERVAL : timeoutms) % 1000) * 1000;

result = select(max_fd + 1, &readfds, &writefds, &exceptfds, &tv);

if (args->flag->load()) return 1;
if (result) break;
} while (infinite || (repeats-- && (timeoutms -= LATER_INTERVAL)));

if (result > 0) {
for (int i = 0; i < args->rfds; i++) {
(*args->fds)[i] = FD_ISSET((*args->fds)[i], &readfds);
}
for (int i = args->rfds; i < (args->rfds + args->wfds); i++) {
(*args->fds)[i] = FD_ISSET((*args->fds)[i], &writefds);
}
for (int i = args->rfds + args->wfds; i < args->num_fds; i++) {
(*args->fds)[i] = FD_ISSET((*args->fds)[i], &exceptfds);
}
} else if (result == 0) {
std::memset(args->fds->data(), 0, args->num_fds * sizeof(int));
} else {
for (int i = 0; i < args->num_fds; i++) {
(*args->fds)[i] = R_NaInt;
}
}

execLaterNative2(later_callback, static_cast<void *>(argsptr.release()), 0, args->loop);

return 0;

}

#endif // POLLIN

// [[Rcpp::export]]
Rcpp::RObject execLater_fd(Rcpp::Function callback, Rcpp::IntegerVector readfds, Rcpp::IntegerVector writefds,
Rcpp::IntegerVector exceptfds, Rcpp::NumericVector timeoutSecs, Rcpp::IntegerVector loop_id) {
Expand Down

0 comments on commit 9ceaf14

Please sign in to comment.