builds in SSH reverse tunnelling for make_cluster()
shikokuchuo committed Sep 28, 2023
1 parent 7f83b74 commit 5c37083
Showing 5 changed files with 62 additions and 31 deletions.
2 changes: 1 addition & 1 deletion DESCRIPTION
@@ -1,7 +1,7 @@
 Package: mirai
 Type: Package
 Title: Minimalist Async Evaluation Framework for R
-Version: 0.10.0.9015
+Version: 0.10.0.9016
 Description: Lightweight parallel code execution and distributed computing.
     Designed for simplicity, a 'mirai' evaluates an R expression asynchronously,
     on local or network resources, resolving automatically upon completion.
5 changes: 3 additions & 2 deletions NEWS.md
@@ -1,8 +1,9 @@
-# mirai 0.10.0.9015 (development)
+# mirai 0.10.0.9016 (development)

 * Implements an alternative communications backend for R, adding methods for the 'parallel' base package.
-  + `make_cluster()` creates a 'miraiCluster', compatible with all existing functions taking a 'cluster' object.
+  + `make_cluster()` creates a 'miraiCluster', compatible with all existing functions taking a 'cluster' object, for example in the 'parallel' and 'doParallel'/'foreach' packages.
   + Fulfils a request by R Core at R Project Sprint 2023, and requires R >= 4.4 (currently R-devel).
+  + Ability to launch nodes on the local machine or remotely via SSH (optionally using reverse tunnelling).
 * `daemons()` adds argument 'resilience' to control the behaviour, when not using dispatcher, of whether to retry failed tasks on other daemons.
 * `mirai()` adds logical argument '.signal' for whether to signal the condition variable within the compute profile upon resolution of the 'mirai'.
 * `daemon()` argument 'exitlinger' retired as daemons now synchronise with the host/dispatcher and exit as soon as possible.
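
As a rough illustration of the NEWS entry for `make_cluster()`, the sketch below starts two local nodes and hands the resulting 'miraiCluster' to an existing 'parallel' function. It assumes the development version of mirai from this commit is installed under R >= 4.4; the squaring function and the choice of two nodes are arbitrary and only for illustration.

```r
# Sketch only: assumes the development version of mirai from this commit
# and R >= 4.4, as stated in the NEWS entry.
library(mirai)

# Launch 2 nodes on the local machine.
cl <- make_cluster(n = 2)

# A 'miraiCluster' is passed to existing 'parallel' functions unchanged.
res <- parallel::parLapply(cl, 1:4, function(x) x^2)

# Shut the nodes down when finished.
parallel::stopCluster(cl)
```

The same `cl` object should, per the NEWS entry, also be usable wherever a 'cluster' object is accepted, for example when registering a backend for 'foreach' via 'doParallel'.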
1 change: 1 addition & 0 deletions R/mirai-package.R
@@ -115,6 +115,7 @@ NULL
     requires_n = "specifying 'url' without 'ssh' requires 'n'",
     sync_dispatch = "initial sync with dispatcher timed out after 5s",
     sync_timeout = "sync between host and dispatcher/daemon timed out after 5s",
+    tunnel_requires = "SSH tunnelling requires 'url' hostname to be 'localhost' or '127.0.0.1'",
     url_spec = "numeric value for 'url' is out of bounds",
     wrong_dots = "'...' arguments should only be of integer, numeric or logical type"
   ),
51 changes: 33 additions & 18 deletions R/parallel.R
@@ -32,12 +32,18 @@
 #' remote nodes to dial into, including a port accepting incoming connections,
 #' e.g. 'tcp://10.75.37.40:5555'. Specify a URL starting 'tls+tcp://' to use
 #' secure TLS connections.
-#' @param ssh (if 'url' is specified, for launching remote nodes via SSH) a named
-#' list with 'nodes' being a character vector of hostnames or IP addresses
-#' of the remote machines on which to launch nodes, e.g.
-#' \code{c('10.75.37.90', 'nodename')}, and optionally 'port' as the numeric
-#' port number on which to connect [default 22] and 'timeout' as the maximum
-#' time allowed for connection setup in seconds [default 5].
+#' @param ssh_nodes (if 'url' is specified, for launching remote nodes via SSH)
+#' a character vector of hostnames or IP addresses of the remote machines on
+#' which to launch nodes, e.g. \code{c('10.75.37.90', 'nodename')}.
+#' @param ssh_port [default 22] numeric port number on which to connect.
+#' @param ssh_timeout [default 5] maximum time allowed for connection setup in
+#' seconds.
+#' @param ssh_tunnel [default FALSE] logical value whether to use SSH reverse
+#' tunnelling. If TRUE, a tunnel is created between the same local and
+#' remote port specified as part of 'url'. 'url' in this case must be either
+#' 'localhost' or '127.0.0.1'. This is as the host listens at 'url' on the
+#' local machine, whilst the nodes dial into the same 'url' on the remote
+#' machine, and the SSH tunnel connects both ends.
 #' @param ... additional arguments passed onto \code{\link{daemons}}.
 #'
 #' @return For \strong{make_cluster}: An object of class 'miraiCluster' and
@@ -51,12 +57,14 @@
 #' However, '...' arguments are passed onto \code{\link{daemons}} for
 #' additional customisation if desired.
 #'
-#' For remote nodes, the 'ssh' argument is an optional convenience
+#' For remote nodes, the 'ssh_' arguments are an optional convenience
 #' feature. If used, the number of nodes is inferred from the length of the
-#' character vector 'nodes' and 'n' is disregarded if supplied.
+#' character vector 'ssh_nodes' and 'n' is disregarded if supplied. For ease
+#' of use, SSH tunnelling assumes the same ports are available for forwarding
+#' on both host and nodes, whilst this is not strictly necessary.
 #'
 #' Alternatively, by specifying 'url' and 'n', nodes may also be launched by
-#' other means, for example using \code{\link{launch_remote}}.
+#' other more customisable means, for example using \code{\link{launch_remote}}.
 #'
 #' @note Requires R >= 4.4 (currently R-devel). Clusters created with this
 #' function will not work with prior R versions.
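
To make the documented 'ssh_' arguments concrete, here is a minimal sketch of how a tunnelled cluster might be requested with the signature introduced in this commit. The wrapper name `tunnelled_cluster`, the port 5555 and the node names in the commented call are hypothetical; the call is wrapped in a function so that nothing is launched until it is invoked with machines that are actually reachable over SSH.

```r
# Sketch only: assumes the make_cluster() signature introduced in this commit.
# 'tunnelled_cluster', 'nodes' and 'port' are hypothetical names.
tunnelled_cluster <- function(nodes, port = 5555L) {
  mirai::make_cluster(
    # With tunnelling, 'url' must use 'localhost' or '127.0.0.1': the host
    # listens here and each node dials the same address on its own machine.
    url = sprintf("tcp://127.0.0.1:%d", port),
    ssh_nodes = nodes,   # one node is launched per entry, so 'n' is not needed
    ssh_port = 22,       # SSH port on the remote machines
    ssh_timeout = 5,     # seconds allowed for connection setup
    ssh_tunnel = TRUE    # forward 'port' on each node back to the host
  )
}

# Example call (not run here; requires SSH access to the named machines):
# cl <- tunnelled_cluster(c("10.75.37.90", "nodename"))
```

Because the tunnel reuses the port given in 'url' on both ends, that port has to be free on the host and on every node.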
@@ -84,26 +92,33 @@
 #'
 #' @export
 #'
-make_cluster <- function(n, url = NULL, ssh = list(nodes = character(), port = 22, timeout = 5), ...) {
+make_cluster <- function(n, url = NULL, ssh_nodes = character(), ssh_port = 22,
+                         ssh_timeout = 5, ssh_tunnel = FALSE, ...) {

   id <- sprintf("`%d`", length(..))

   if (is.character(url)) {

-    nodes <- ssh[["nodes"]]
-
-    if (length(nodes)) {
-      port <- if (length(ssh[["port"]])) as.character(ssh[["port"]]) else "22"
-      timeout <- if (length(ssh[["timeout"]])) as.character(ssh[["timeout"]]) else "5"
+    if (length(ssh_nodes)) {
       daemons(url = url, dispatcher = FALSE, resilience = FALSE, cleanup = 0L, ..., .compute = id)
-      for (node in nodes)
+      for (node in ssh_nodes)
         launch_remote(
           url = 1L,
           command = "ssh",
-          args = c(sprintf("-o ConnectTimeout=%s -fTp", timeout), port, node, .),
+          args = c(
+            if (ssh_tunnel) {
+              purl <- parse_url(url)
+              purl[["hostname"]] %in% c("localhost", "127.0.0.1") || stop(.messages[["tunnel_requires"]])
+              sprintf("-R %s:%s", purl[["port"]], purl[["host"]])
+            },
+            sprintf("-o ConnectTimeout=%s -fTp", as.character(ssh_timeout)),
+            as.character(ssh_port),
+            node,
+            .
+          ),
           .compute = id
         )
-      n <- length(nodes)
+      n <- length(ssh_nodes)

     } else {
       missing(n) && stop(.messages[["requires_n"]])
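
The argument assembly in this hunk can be approximated in base R to show what is passed to `ssh` when tunnelling is enabled. This is a sketch, not mirai's code: the helper name `ssh_tunnel_args` and its regular expression are illustrative stand-ins for `parse_url()`, it assumes the parsed 'host' field in the commit holds 'hostname:port', and it omits the trailing `.` placeholder that stands for the daemon launch command.

```r
# Sketch only: re-creates the SSH argument assembly with base R.
# 'ssh_tunnel_args' is a hypothetical helper, not part of mirai.
ssh_tunnel_args <- function(url, node, ssh_port = 22, ssh_timeout = 5,
                            ssh_tunnel = FALSE) {
  tunnel <- NULL
  if (ssh_tunnel) {
    # Split 'scheme://hostname:port' into hostname and port.
    m <- regmatches(url, regexec("^[a-z+]+://([^:/]+):([0-9]+)", url))[[1L]]
    length(m) == 3L || stop("'url' must be of the form scheme://hostname:port")
    # Same check as the 'tunnel_requires' message added in this commit.
    m[2L] %in% c("localhost", "127.0.0.1") ||
      stop("SSH tunnelling requires 'url' hostname to be 'localhost' or '127.0.0.1'")
    # ssh -R <remote port>:<local hostname>:<local port>
    tunnel <- sprintf("-R %s:%s:%s", m[3L], m[2L], m[3L])
  }
  c(
    tunnel,  # NULL is dropped by c() when tunnelling is off
    sprintf("-o ConnectTimeout=%s -fTp", as.character(ssh_timeout)),
    as.character(ssh_port),
    node
  )
}

ssh_tunnel_args("tcp://127.0.0.1:5555", "nodename", ssh_tunnel = TRUE)
# returns c("-R 5555:127.0.0.1:5555", "-o ConnectTimeout=5 -fTp", "22", "nodename")
```

The `-R` option asks the SSH server on the node to listen on the given port and forward connections back to the same port on the host, which is why both ends can use an identical 'localhost' URL.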
34 changes: 24 additions & 10 deletions man/make_cluster.Rd

Some generated files are not rendered by default.