Skip to content

Commit

Permalink
Adds an API for checking the callback queue length for a loop.
Browse files Browse the repository at this point in the history
This introduces loop_queue_length(), which is similar to loop_empty()
but provides a more granular measure of what remains to be executed.
Unlike list_queue(), it is an exported function meant to be used
externally.

Includes documentation and tests.
  • Loading branch information
atheriel committed Apr 21, 2020
1 parent 0fb877a commit 0a4125f
Show file tree
Hide file tree
Showing 11 changed files with 82 additions and 0 deletions.
1 change: 1 addition & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ export(exists_loop)
export(global_loop)
export(later)
export(loop_empty)
export(loop_queue_length)
export(next_op_secs)
export(run_now)
export(with_loop)
Expand Down
4 changes: 4 additions & 0 deletions R/RcppExports.R
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ idle <- function(loop) {
.Call('_later_idle', PACKAGE = 'later', loop)
}

queueLength <- function(loop) {
.Call('_later_queueLength', PACKAGE = 'later', loop)
}

execLater <- function(callback, delaySecs, loop) {
.Call('_later_execLater', PACKAGE = 'later', callback, delaySecs, loop)
}
Expand Down
11 changes: 11 additions & 0 deletions R/later.R
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,17 @@ loop_empty <- function(loop = current_loop()) {
idle(loop$id)
}

#' Check how many callbacks are queued in a later loop
#'
#' Returns the number of callbacks that are scheduled to execute in the present
#' or future.
#'
#' @inheritParams create_loop
#' @export
loop_queue_length <- function(loop = current_loop()) {
queueLength(loop$id)
}

#' Relative time to next scheduled operation
#'
#' Returns the duration between now and the earliest operation that is currently
Expand Down
15 changes: 15 additions & 0 deletions man/loop_queue_length.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions src/RcppExports.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,17 @@ BEGIN_RCPP
return rcpp_result_gen;
END_RCPP
}
// queueLength
size_t queueLength(int loop);
RcppExport SEXP _later_queueLength(SEXP loopSEXP) {
BEGIN_RCPP
Rcpp::RObject rcpp_result_gen;
Rcpp::RNGScope rcpp_rngScope_gen;
Rcpp::traits::input_parameter< int >::type loop(loopSEXP);
rcpp_result_gen = Rcpp::wrap(queueLength(loop));
return rcpp_result_gen;
END_RCPP
}
// execLater
std::string execLater(Rcpp::Function callback, double delaySecs, int loop);
RcppExport SEXP _later_execLater(SEXP callbackSEXP, SEXP delaySecsSEXP, SEXP loopSEXP) {
Expand Down
5 changes: 5 additions & 0 deletions src/callback_registry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,11 @@ bool CallbackRegistry::empty() const {
return this->queue.empty();
}

size_t CallbackRegistry::queueLength() const {
Guard guard(mutex);
return this->queue.size();
}

// Returns true if the smallest timestamp exists and is not in the future.
bool CallbackRegistry::due(const Timestamp& time) const {
Guard guard(mutex);
Expand Down
3 changes: 3 additions & 0 deletions src/callback_registry.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,9 @@ class CallbackRegistry {
// Is the registry completely empty?
bool empty() const;

// How many callbacks are currently queued?
size_t queueLength() const;

// Is anything ready to execute?
bool due(const Timestamp& time = Timestamp()) const;

Expand Down
2 changes: 2 additions & 0 deletions src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ Check these declarations against the C/Fortran source code.
extern SEXP _later_ensureInitialized();
extern SEXP _later_execCallbacks(SEXP, SEXP, SEXP);
extern SEXP _later_idle(SEXP);
extern SEXP _later_queueLength(SEXP);
extern SEXP _later_execLater(SEXP, SEXP, SEXP);
extern SEXP _later_cancel(SEXP, SEXP);
extern SEXP _later_nextOpSecs(SEXP);
Expand All @@ -26,6 +27,7 @@ static const R_CallMethodDef CallEntries[] = {
{"_later_ensureInitialized", (DL_FUNC) &_later_ensureInitialized, 0},
{"_later_execCallbacks", (DL_FUNC) &_later_execCallbacks, 3},
{"_later_idle", (DL_FUNC) &_later_idle, 1},
{"_later_queueLength", (DL_FUNC) &_later_queueLength, 1},
{"_later_execLater", (DL_FUNC) &_later_execLater, 3},
{"_later_cancel", (DL_FUNC) &_later_cancel, 2},
{"_later_nextOpSecs", (DL_FUNC) &_later_nextOpSecs, 1},
Expand Down
7 changes: 7 additions & 0 deletions src/later.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,13 @@ bool idle(int loop) {
return getCallbackRegistry(loop)->empty();
}

// [[Rcpp::export]]
size_t queueLength(int loop) {
ASSERT_MAIN_THREAD()
Guard guard(callbackRegistriesMutex);
return getCallbackRegistry(loop)->queueLength();
}

// [[Rcpp::export]]
std::string execLater(Rcpp::Function callback, double delaySecs, int loop) {
ASSERT_MAIN_THREAD()
Expand Down
1 change: 1 addition & 0 deletions src/later.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ bool at_top_level();

bool execCallbacks(double timeoutSecs = 0, bool runAll = true, int loop = GLOBAL_LOOP);
bool idle(int loop);
size_t queueLength(int loop);

extern "C" uint64_t execLaterNative(void (*func)(void*), void* data, double secs);
extern "C" uint64_t execLaterNative2(void (*func)(void*), void* data, double secs, int loop);
Expand Down
22 changes: 22 additions & 0 deletions tests/testthat/test-private-loops.R
Original file line number Diff line number Diff line change
Expand Up @@ -207,3 +207,25 @@ test_that("list_queue", {
q <- list_queue(l)
expect_equal(length(q), 0)
})


describe("Queue length with nested loops", {
x <- 0
later(~{x <<- 1}, 0)
expect_equal(loop_queue_length(), 1)
with_temp_loop({
expect_equal(loop_queue_length(), 0)

later(~{x <<- 2})
expect_equal(loop_queue_length(), 1)

run_now()
expect_identical(x, 2)
expect_equal(loop_queue_length(), 0)
expect_equal(loop_queue_length(loop = global_loop()), 1)

run_now(loop = global_loop())
expect_equal(loop_queue_length(loop = global_loop()), 0)
expect_identical(x, 1)
})
})

0 comments on commit 0a4125f

Please sign in to comment.