New hybrid dispatcher concept (enables mirai cancellation) #170

Merged on Dec 30, 2024 (75 commits)

shikokuchuo (Owner) commented Nov 27, 2024

New default dispatcher

Inviting @jcheng5 @hadley for review and comment.

This is a complete re-design of dispatcher using a more optimal and future-proof architecture.

The key difference is that whereas previously dispatcher used one socket per daemon connection, it now uses one socket for all daemon connections.

The previous configuration was necessary because the NNG req/rep (RPC) protocol distributes requests using a round-robin algorithm. This is typically optimal behaviour for web servers and similar applications, but not for arbitrary execution where evaluation times may vary greatly.

The novel solution is to preserve the req/rep abstraction for the user, with mirai() working substantially the same way, but to re-wire the dispatcher / daemon back-end to use an alternative protocol (NNG's pair 1 poly). This allows one-to-many mapping with directed sends, which is exactly what's needed to implement an optimal task queue dispatcher.
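
As a rough illustration of why round-robin is a poor fit when evaluation times vary, the sketch below compares the two scheduling strategies on a toy workload. It is a hypothetical simulation in base R, not mirai's implementation: the function names `makespan_round_robin()` and `makespan_task_queue()` and the task times are invented for the example.

```r
# Hypothetical simulation, not mirai's code.
# `times` holds the run time of each task; each daemon runs one task at a time.

# Round-robin: task i is bound to daemon ((i - 1) %% n) + 1 at submission,
# regardless of how busy that daemon is.
makespan_round_robin <- function(times, n) {
  daemon <- (seq_along(times) - 1L) %% n + 1L
  busy <- vapply(seq_len(n), function(d) sum(times[daemon == d]), numeric(1))
  max(busy)
}

# Task queue with directed sends: the next queued task goes to whichever
# daemon becomes free first.
makespan_task_queue <- function(times, n) {
  free_at <- numeric(n)
  for (t in times) {
    d <- which.min(free_at)
    free_at[d] <- free_at[d] + t
  }
  max(free_at)
}

times <- c(10, 1, 1, 1, 1, 1)   # one long task followed by short ones
makespan_round_robin(times, 2)
#> [1] 12
makespan_task_queue(times, 2)
#> [1] 10
```

With round-robin, short tasks queue up behind the long one on the first daemon while the second daemon sits idle; with a task queue, the second daemon absorbs all of the short tasks.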

Mirai Cancellation

Allows actual cancellation of mirai when using stop_mirai(), as per the below example:

  • stop_mirai() returns TRUE if the task is awaiting execution - it is discarded from the queue. Cancellation is certain.
  • stop_mirai() returns NA if the task was in-execution. An interrupt is sent to the daemon process, but the task may have been partially executed by this point.
  • stop_mirai() returns FALSE if the task has already completed or been cancelled.
```r
library(mirai)
daemons(2, output = TRUE)
#> [1] 2
m1 <- mirai({Sys.sleep(1); print("mirai 1 done")})
m2 <- mirai({Sys.sleep(1); print("mirai 2 done")})
m3 <- mirai({Sys.sleep(1); print("mirai 3 done")})
Sys.sleep(0.1)
(stop_mirai(m3))
#> [1] TRUE
(stop_mirai(m2))
#> [1] NA
(stop_mirai(m2))
#> [1] FALSE
#> [1] "mirai 1 done"
```

Created on 2024-12-01 with reprex v2.1.1

Other notable improvements

  • Only one (IPC) socket connection from host to dispatcher (previously there was an additional control socket for sending information requests).
  • For distributed computing, plain TCP connections can be used without the added overhead of the websocket layer, as only one URL is required no matter how many daemons. We also don't need to manage many URLs and ensure a one-to-one mapping of each.
  • Connection changes are tracked efficiently and accurately (thanks to upgrades in nanonext), which retires a lot of custom code that was designed to deal with this.
  • Connected daemons can be scaled up and down freely without limit (we had to set a maximum before).
  • More succinct and intuitive status summary, for example (borrowing the above example):
```r
library(mirai)
daemons(2, output = TRUE)
#> [1] 2
m1 <- mirai({Sys.sleep(1); print("mirai 1 done")})
m2 <- mirai({Sys.sleep(1); print("mirai 2 done")})
m3 <- mirai({Sys.sleep(1); print("mirai 3 done")})
Sys.sleep(0.1)
status()
#> $connections
#> [1] 2
#> 
#> $daemons
#> [1] "abstract://595b781203f9cad1ea0f39f8"
#> 
#> $mirai
#>  awaiting executing completed 
#>         1         2         0
```

Created on 2024-12-01 with reprex v2.1.1

@shikokuchuo shikokuchuo marked this pull request as draft November 27, 2024 15:26
codecov-commenter commented Nov 28, 2024

Codecov Report

Attention: Patch coverage is 99.27007% with 2 lines in your changes missing coverage. Please review.

Project coverage is 97.80%. Comparing base (4e2d996) to head (3e842f1).
Report is 2 commits behind head on main.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| R/daemons.R | 98.03% | 1 Missing ⚠️ |
| R/dispatcher.R | 99.18% | 1 Missing ⚠️ |


Additional details and impacted files
```diff
@@            Coverage Diff             @@
##             main     #170      +/-   ##
==========================================
- Coverage   99.71%   97.80%   -1.92%     
==========================================
  Files           9        9              
  Lines         692      864     +172     
==========================================
+ Hits          690      845     +155     
- Misses          2       19      +17     
```


shikokuchuo (Owner, Author) commented Dec 3, 2024

I should probably add a little more on the cancellation implementation itself.

R is single-threaded, so the only way of stopping evaluation is through an interrupt signal. This must be delivered concurrently with the evaluation, i.e. from another thread. Fortunately NNG, as a concurrency framework, provides such threads, and we just re-purpose an async message receive callback to raise the interrupt (in much the same way a callback can make a call into later and resolve a promise). This way we can send interrupts to any connected peer (not just on the local machine).

Then at the daemon (worker) we just have an async receive ready while we're evaluating, which will raise an interrupt if a cancellation message is received.

The actual cancellation instruction is sent from stop_mirai(), where the payload is a (non-serialized) integer zero followed by the message ID. The zero sentinel (as the first byte) triggers this branch of dispatcher to run, which finds the msgid and either discards the task if it is still in the queue or sends a message to the relevant daemon if it is already being executed.
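
The branch described above can be sketched with a small in-memory model. This is only an illustration under stated assumptions: the real dispatcher works on raw NNG messages, whereas here the payload is modelled as an integer vector `c(0L, msgid)`, and `new_dispatcher()`, `cancel()` and `handle()` are hypothetical names that do not exist in mirai.

```r
# Hypothetical model of dispatcher state; all names are illustrative only.
new_dispatcher <- function() {
  d <- new.env()
  d$queue <- integer()         # message IDs awaiting execution
  d$executing <- integer()     # named vector: daemon name -> message ID
  d$interrupts <- character()  # daemons that were sent an interrupt message
  d
}

# Cancellation branch: discard a queued task, or notify the executing daemon.
cancel <- function(d, msgid) {
  if (msgid %in% d$queue) {
    d$queue <- d$queue[d$queue != msgid]
    return("discarded")
  }
  daemon <- names(d$executing)[d$executing == msgid]
  if (length(daemon)) {
    d$interrupts <- c(d$interrupts, daemon)
    return("interrupt sent")
  }
  "not found"
}

# Routing: a leading zero marks a cancellation request (assumed encoding).
handle <- function(d, payload) {
  if (payload[1L] == 0L) cancel(d, payload[2L]) else "task"
}

d <- new_dispatcher()
d$queue <- 3L
d$executing <- c(daemon_a = 1L, daemon_b = 2L)
handle(d, c(0L, 3L))
#> [1] "discarded"
handle(d, c(0L, 2L))
#> [1] "interrupt sent"
handle(d, c(0L, 9L))
#> [1] "not found"
```

The three outcomes correspond to a task that was still queued, a task already running on a daemon, and a task that has completed or was cancelled earlier.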

As to why a whole new architecture was needed: previously we didn't eagerly realise the task queue at dispatcher, but kept tasks buffered at the system sockets, only materializing them when a daemon was ready to accept. This meant that it would have been cumbersome to implement cancellation, as we had no access to queued tasks whatsoever. And if we were going to change this behaviour, then it made sense to think about the design holistically and update everything at the same time.

The result is a deliberate and much cleaner design, whereas previous development had been largely organic and demand/feature-driven.

shikokuchuo (Owner, Author) commented

I've implemented my suggestion above in dd30542, so stop_mirai() now returns either TRUE or FALSE, as I think the if (stop_mirai(...)) idiom is probably useful to have.
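
A minimal sketch of that idiom, assuming the mirai package from this PR is installed and that TRUE means a cancellation request was acted upon (the task was queued or executing) while FALSE means there was nothing left to cancel. The messages are illustrative only.

```r
library(mirai)
daemons(1)

# A long-running task that we decide we no longer need
m <- mirai({Sys.sleep(10); "finished"})

# Branch directly on the logical return value of stop_mirai()
if (stop_mirai(m)) {
  message("task cancelled")
} else {
  message("task had already completed or been cancelled")
}

daemons(0)
```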

shikokuchuo (Owner, Author) commented

@jcheng5 I'll keep this open, but you can assume that I'll merge it as is (barring further comments from yourself/Hadley).

I'll defer to you what you think is the best way of making stop_mirai() available in Shiny. If it helps to do something on my end just let me know. For example, I can easily keep a reference to the mirai in the promise constructor here by adding something like:

```r
promise[["mirai"]] <- x
```

If you're making changes to the promises package, please also take a look at rstudio/promises#111 where I've provided suggestions you can adopt directly in your PR there.

@shikokuchuo shikokuchuo merged commit 424fabf into main Dec 30, 2024
10 checks passed
@shikokuchuo shikokuchuo deleted the v2 branch December 30, 2024 11:51