diff --git a/src/fd.cpp b/src/fd.cpp index 74db2241..a19da62f 100644 --- a/src/fd.cpp +++ b/src/fd.cpp @@ -1,9 +1,11 @@ #ifdef _WIN32 #ifndef FD_SETSIZE -#define FD_SETSIZE 2048 // Set max fds - only for select() fallback +#define FD_SETSIZE 2048 // set max fds - only for select() fallback on R <= 4.1 #endif +#define LATER POLL_FUNC WSAPoll #include #else +#define LATER_POLL_FUNC poll #include #endif #include @@ -16,12 +18,6 @@ #define LATER_INTERVAL 1024 -#ifdef _WIN32 -#define POLL_FUNC WSAPoll -#else -#define POLL_FUNC poll -#endif - typedef struct ThreadArgs_s { std::shared_ptr> flag; std::unique_ptr callback; @@ -46,6 +42,9 @@ static void later_callback(void *arg) { // CONSIDER: if necessary to add method for HANDLES on Windows. Would be different code to SOCKETs. // TODO: implement re-usable background thread. + +#ifdef POLLIN // all platforms except R <= 4.1 on Windows + static int wait_thread(void *arg) { std::unique_ptr> argsptr(static_cast*>(arg)); @@ -57,54 +56,6 @@ static int wait_thread(void *arg) { int timeoutms = infinite || args->timeout < 0 ? 1000 : (int) (args->timeout * 1000); int repeats = infinite || args->timeout < 0 ? 0 : timeoutms / LATER_INTERVAL; -#ifndef POLLIN // fall back to select() for R <= 4.1 and older Windows - - fd_set readfds, writefds, exceptfds; - FD_ZERO(&readfds); - FD_ZERO(&writefds); - FD_ZERO(&exceptfds); - const int max_fd = *std::max_element(args->fds->begin(), args->fds->end()); - struct timeval tv; - - do { - for (int i = 0; i < args->rfds; i++) { - FD_SET((*args->fds)[i], &readfds); - } - for (int i = args->rfds; i < (args->rfds + args->wfds); i++) { - FD_SET((*args->fds)[i], &writefds); - } - for (int i = args->rfds + args->wfds; i < args->num_fds; i++) { - FD_SET((*args->fds)[i], &exceptfds); - } - tv.tv_sec = (repeats ? LATER_INTERVAL : timeoutms) / 1000; - tv.tv_usec = ((repeats ? LATER_INTERVAL : timeoutms) % 1000) * 1000; - - result = select(max_fd + 1, &readfds, &writefds, &exceptfds, &tv); - - if (args->flag->load()) return 1; - if (result) break; - } while (infinite || (repeats-- && (timeoutms -= LATER_INTERVAL))); - - if (result > 0) { - for (int i = 0; i < args->rfds; i++) { - (*args->fds)[i] = FD_ISSET((*args->fds)[i], &readfds); - } - for (int i = args->rfds; i < (args->rfds + args->wfds); i++) { - (*args->fds)[i] = FD_ISSET((*args->fds)[i], &writefds); - } - for (int i = args->rfds + args->wfds; i < args->num_fds; i++) { - (*args->fds)[i] = FD_ISSET((*args->fds)[i], &exceptfds); - } - } else if (result == 0) { - std::memset(args->fds->data(), 0, args->num_fds * sizeof(int)); - } else { - for (int i = 0; i < args->num_fds; i++) { - (*args->fds)[i] = R_NaInt; - } - } - -#else - std::vector pollfds; pollfds.reserve(args->num_fds); struct pollfd pfd; @@ -128,7 +79,7 @@ static int wait_thread(void *arg) { } do { - result = POLL_FUNC(pollfds.data(), args->num_fds, repeats ? LATER_INTERVAL : timeoutms); + result = LATER_POLL_FUNC(pollfds.data(), args->num_fds, repeats ? LATER_INTERVAL : timeoutms); if (args->flag->load()) return 1; if (result) break; } while (infinite || (repeats-- && (timeoutms -= LATER_INTERVAL))); @@ -151,7 +102,68 @@ static int wait_thread(void *arg) { } } -#endif // POLLIN + execLaterNative2(later_callback, static_cast(argsptr.release()), 0, args->loop); + + return 0; + +} + +#else // no POLLIN: fall back to select() for R <= 4.1 on Windows + +static int wait_thread(void *arg) { + + std::unique_ptr> argsptr(static_cast*>(arg)); + std::shared_ptr args = *argsptr; + + int result; + + const bool infinite = args->timeout == R_PosInf; + int timeoutms = infinite || args->timeout < 0 ? 1000 : (int) (args->timeout * 1000); + int repeats = infinite || args->timeout < 0 ? 0 : timeoutms / LATER_INTERVAL; + + fd_set readfds, writefds, exceptfds; + FD_ZERO(&readfds); + FD_ZERO(&writefds); + FD_ZERO(&exceptfds); + const int max_fd = *std::max_element(args->fds->begin(), args->fds->end()); + struct timeval tv; + + do { + for (int i = 0; i < args->rfds; i++) { + FD_SET((*args->fds)[i], &readfds); + } + for (int i = args->rfds; i < (args->rfds + args->wfds); i++) { + FD_SET((*args->fds)[i], &writefds); + } + for (int i = args->rfds + args->wfds; i < args->num_fds; i++) { + FD_SET((*args->fds)[i], &exceptfds); + } + tv.tv_sec = (repeats ? LATER_INTERVAL : timeoutms) / 1000; + tv.tv_usec = ((repeats ? LATER_INTERVAL : timeoutms) % 1000) * 1000; + + result = select(max_fd + 1, &readfds, &writefds, &exceptfds, &tv); + + if (args->flag->load()) return 1; + if (result) break; + } while (infinite || (repeats-- && (timeoutms -= LATER_INTERVAL))); + + if (result > 0) { + for (int i = 0; i < args->rfds; i++) { + (*args->fds)[i] = FD_ISSET((*args->fds)[i], &readfds); + } + for (int i = args->rfds; i < (args->rfds + args->wfds); i++) { + (*args->fds)[i] = FD_ISSET((*args->fds)[i], &writefds); + } + for (int i = args->rfds + args->wfds; i < args->num_fds; i++) { + (*args->fds)[i] = FD_ISSET((*args->fds)[i], &exceptfds); + } + } else if (result == 0) { + std::memset(args->fds->data(), 0, args->num_fds * sizeof(int)); + } else { + for (int i = 0; i < args->num_fds; i++) { + (*args->fds)[i] = R_NaInt; + } + } execLaterNative2(later_callback, static_cast(argsptr.release()), 0, args->loop); @@ -159,6 +171,8 @@ static int wait_thread(void *arg) { } +#endif // POLLIN + // [[Rcpp::export]] Rcpp::RObject execLater_fd(Rcpp::Function callback, Rcpp::IntegerVector readfds, Rcpp::IntegerVector writefds, Rcpp::IntegerVector exceptfds, Rcpp::NumericVector timeoutSecs, Rcpp::IntegerVector loop_id) {