Skip to content

Commit

Permalink
persistent wait thread concept
Browse files Browse the repository at this point in the history
  • Loading branch information
shikokuchuo committed Nov 14, 2024
1 parent e3239dc commit 58bace2
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 17 deletions.
115 changes: 100 additions & 15 deletions src/fd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
#include <cstdlib>
#include <atomic>
#include <memory>
#include "tinycthread.h"
#include "later.h"
#include "threadutils.h"
#include "callback_registry_table.h"

class ThreadArgs {
Expand Down Expand Up @@ -81,6 +81,57 @@ class ThreadArgs {

};

static Mutex mtx(tct_mtx_plain);
static ConditionVariable cv(mtx);
static int busy = 0;
static std::unique_ptr<std::shared_ptr<ThreadArgs>> threadargs = nullptr;

static int wait_thread_persistent(void *arg);

class PersistentThread {
tct_thrd_t thr = 0;

public:
PersistentThread() {
if (tct_thrd_create(&thr, &wait_thread_persistent, NULL) != tct_thrd_success)
throw std::runtime_error("Thread creation failed.");
}
~PersistentThread() {
Guard guard(&mtx);
if (threadargs != nullptr) {
(*threadargs)->active->store(false);
}
busy = -1;
cv.broadcast();
}

};

static int wait_for_signal() {
Guard guard(&mtx);
while (!busy)
cv.wait();
return busy;
}

static int submit_wait(std::shared_ptr<ThreadArgs> args) {
Guard guard(&mtx);
if (busy)
return busy;
threadargs.reset(new std::shared_ptr<ThreadArgs>(args));
busy = 1;
cv.broadcast();
return 0;
}

static int wait_done() {
Guard guard(&mtx);
threadargs.reset();
int ret = busy;
busy = 0;
return ret;
}

static void later_callback(void *arg) {

ASSERT_MAIN_THREAD()
Expand All @@ -104,15 +155,7 @@ static void later_callback(void *arg) {
}

// CONSIDER: if necessary to add method for HANDLES on Windows. Would be different code to SOCKETs.
// TODO: implement re-usable background thread.
static int wait_thread(void *arg) {

tct_thrd_detach(tct_thrd_current());

std::unique_ptr<std::shared_ptr<ThreadArgs>> argsptr(static_cast<std::shared_ptr<ThreadArgs>*>(arg));
std::shared_ptr<ThreadArgs> args = *argsptr;

// poll() whilst checking for cancellation at intervals
static int wait_on_fds(std::shared_ptr<ThreadArgs> args) {

int ready;
double waitFor = std::fmax(args->timeout.diff_secs(Timestamp()), 0);
Expand All @@ -124,8 +167,6 @@ static int wait_thread(void *arg) {
if (ready) break;
} while ((waitFor = args->timeout.diff_secs(Timestamp())) > 0);

// store pollfd revents in args->results for use by callback

if (ready > 0) {
for (std::size_t i = 0; i < args->fds.size(); i++) {
(args->results)[i] = (args->fds)[i].revents == 0 ? 0 : (args->fds)[i].revents & (POLLIN | POLLOUT) ? 1: NA_INTEGER;
Expand All @@ -134,7 +175,43 @@ static int wait_thread(void *arg) {
std::fill(args->results.begin(), args->results.end(), NA_INTEGER);
}

callbackRegistryTable.scheduleCallback(later_callback, static_cast<void *>(argsptr.release()), 0, args->loop);
return 0;

}

static int wait_thread_single(void *arg) {

tct_thrd_detach(tct_thrd_current());

std::unique_ptr<std::shared_ptr<ThreadArgs>> argsptr(static_cast<std::shared_ptr<ThreadArgs>*>(arg));
std::shared_ptr<ThreadArgs> args = *argsptr;

if (wait_on_fds(args) == 0) {
callbackRegistryTable.scheduleCallback(later_callback, static_cast<void *>(argsptr.release()), 0, args->loop);
}

return 0;

}

static int wait_thread_persistent(void *arg) {

tct_thrd_detach(tct_thrd_current());

while (1) {

if (wait_for_signal() < 0)
break;

const int loop = (*threadargs)->loop;
if (wait_on_fds(*threadargs) == 0) {
callbackRegistryTable.scheduleCallback(later_callback, static_cast<void *>(threadargs.release()), 0, loop);
}

if (wait_done() < 0)
break;

}

return 0;

Expand All @@ -144,9 +221,17 @@ static int execLater_launch_thread(std::shared_ptr<ThreadArgs> args) {

std::unique_ptr<std::shared_ptr<ThreadArgs>> argsptr(new std::shared_ptr<ThreadArgs>(args));

tct_thrd_t thr;
// static initialization ensures finalizer runs before those for the condition variable / mutex
static PersistentThread persistentthread;

int ret;
if ((ret = submit_wait(args))) {
// create single wait thread if persistent thread is busy
tct_thrd_t thr;
ret = tct_thrd_create(&thr, &wait_thread_single, static_cast<void *>(argsptr.release())) != tct_thrd_success;
}

return tct_thrd_create(&thr, &wait_thread, static_cast<void *>(argsptr.release())) != tct_thrd_success;
return ret;

}

Expand Down
12 changes: 10 additions & 2 deletions tests/testthat/test-later-fd.R
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ context("test-later-fd.R")
test_that("later_fd", {
skip_if_not_installed("nanonext")

result <- NULL
result2 <- result <- NULL
callback <- function(x) result <<- x
callback2 <- function(x) result2 <<- x
s1 <- nanonext::socket(listen = "inproc://nanonext")
on.exit(close(s1))
s2 <- nanonext::socket(dial = "inproc://nanonext")
Expand All @@ -17,9 +18,16 @@ test_that("later_fd", {
Sys.sleep(0.2)
run_now()
expect_equal(result, c(FALSE, FALSE))
later_fd(callback, c(fd1, fd2), exceptfds = c(fd1, fd2), timeout = 0)

# concurrent waits
later_fd(callback, c(fd1, fd2), exceptfds = c(fd1, fd2), timeout = 0.4)
later_fd(callback2, c(fd1, fd2), exceptfds = c(fd1, fd2), timeout = 0)
Sys.sleep(0.2)
run_now()
expect_equal(result2, c(FALSE, FALSE, FALSE, FALSE))
expect_equal(result, c(FALSE, FALSE))
Sys.sleep(0.4)
run_now()
expect_equal(result, c(FALSE, FALSE, FALSE, FALSE))

# cancellation
Expand Down

0 comments on commit 58bace2

Please sign in to comment.