From 5e2f347b071e11937e945e94a0cbf8dc9636c976 Mon Sep 17 00:00:00 2001 From: Satya Ortiz-Gagne Date: Tue, 27 Feb 2024 09:57:30 -0500 Subject: [PATCH] Add covalent aws ec2 launcher and push_reports covalent is not compatible with milabench as it requires sqlalchemy<2.0.0 --- benchmarks/_template/requirements.cpu.txt | 46 +++++ config/test-system.yaml | 16 ++ config/test.yaml | 24 +++ milabench/__init__.py | 5 + milabench/cli/__init__.py | 5 + milabench/cli/badges/__main__.py | 51 ++++++ milabench/cli/badges/requirements.txt | 1 + milabench/cli/covalent/__main__.py | 202 ++++++++++++++++++++++ milabench/cli/covalent/requirements.txt | 2 + milabench/cli/purge_cloud.py | 51 ++++++ milabench/cli/run.py | 21 ++- milabench/commands/__init__.py | 28 ++- milabench/common.py | 87 +++++++++- milabench/config.py | 14 ++ milabench/multi.py | 156 ++++++++++++++++- milabench/remote.py | 30 +++- 16 files changed, 714 insertions(+), 25 deletions(-) create mode 100644 benchmarks/_template/requirements.cpu.txt create mode 100644 config/test-system.yaml create mode 100644 config/test.yaml create mode 100644 milabench/cli/badges/__main__.py create mode 100644 milabench/cli/badges/requirements.txt create mode 100644 milabench/cli/covalent/__main__.py create mode 100644 milabench/cli/covalent/requirements.txt create mode 100644 milabench/cli/purge_cloud.py diff --git a/benchmarks/_template/requirements.cpu.txt b/benchmarks/_template/requirements.cpu.txt new file mode 100644 index 000000000..e0058b822 --- /dev/null +++ b/benchmarks/_template/requirements.cpu.txt @@ -0,0 +1,46 @@ +# +# This file is autogenerated by pip-compile with Python 3.10 +# by the following command: +# +# pip-compile --output-file=benchmarks/_template/requirements.cpu.txt benchmarks/_template/requirements.in +# +antlr4-python3-runtime==4.9.3 + # via omegaconf +asttokens==2.4.1 + # via giving +codefind==0.1.3 + # via ptera +executing==1.2.0 + # via varname +giving==0.4.2 + # via + # ptera + # voir +markdown-it-py==3.0.0 + # via rich +mdurl==0.1.2 + # via markdown-it-py +omegaconf==2.3.0 + # via voir +ovld==0.3.2 + # via voir +ptera==1.4.1 + # via voir +pygments==2.17.2 + # via rich +pynvml==11.5.0 + # via voir +pyyaml==6.0.1 + # via omegaconf +reactivex==4.0.4 + # via giving +rich==13.7.0 + # via voir +six==1.16.0 + # via asttokens +typing-extensions==4.10.0 + # via reactivex +varname==0.10.0 + # via giving +voir==0.2.12 + # via -r benchmarks/_template/requirements.in diff --git a/config/test-system.yaml b/config/test-system.yaml new file mode 100644 index 000000000..d2da3bff7 --- /dev/null +++ b/config/test-system.yaml @@ -0,0 +1,16 @@ +system: + # Nodes list + nodes: + - name: manager + ip: 1.1.1.1 + main: true + user: user + + # Cloud instances profiles + cloud_profiles: + ec2: + profile: mb_test_sog_2 + username: ubuntu + instance_type: t2.micro + volume_size: 8 + region: us-east-2 diff --git a/config/test.yaml b/config/test.yaml new file mode 100644 index 000000000..f4a6f7bf4 --- /dev/null +++ b/config/test.yaml @@ -0,0 +1,24 @@ +_defaults: + max_duration: 600 + voir: + options: + stop: 60 + interval: "1s" + +test: + inherits: _defaults + group: test_remote + install_group: test_remote + definition: ../benchmarks/_template + plan: + method: per_gpu + run_on: ec2 + +testing: + inherits: _defaults + definition: ../benchmarks/_template + group: test_remote_2 + install_group: test_remote_2 + plan: + method: per_gpu + run_on: ec2 diff --git a/milabench/__init__.py b/milabench/__init__.py index e69de29bb..ac33e6bb3 100644 --- a/milabench/__init__.py +++ b/milabench/__init__.py @@ -0,0 +1,5 @@ +import pathlib + +ROOT_FOLDER = pathlib.Path(__file__).resolve().parent.parent +CONFIG_FOLDER = ROOT_FOLDER / "config" +BENCHMARK_FOLDER = ROOT_FOLDER / "benchmarks" diff --git a/milabench/cli/__init__.py b/milabench/cli/__init__.py index f0eea8d1e..908664470 100644 --- a/milabench/cli/__init__.py +++ b/milabench/cli/__init__.py @@ -12,6 +12,7 @@ from .pr import cli_write_report_to_pr from .prepare import cli_prepare from .publish import cli_publish +from .purge_cloud import cli_purge_cloud from .report import cli_report from .run import cli_run from .schedule import cli_schedule @@ -37,6 +38,10 @@ def pin(): """Pin the benchmarks' dependencies.""" cli_pin() + def purge_cloud(): + """Purge running cloud instannces the benchmarks' dependencies.""" + cli_purge_cloud() + def dev(): """Create a shell in a benchmark's environment for development.""" cli_dev() diff --git a/milabench/cli/badges/__main__.py b/milabench/cli/badges/__main__.py new file mode 100644 index 000000000..43b19b16c --- /dev/null +++ b/milabench/cli/badges/__main__.py @@ -0,0 +1,51 @@ +import pathlib +import subprocess +import sys + + +def main(argv=None): + if argv is None: + argv = sys.argv[1:] + + try: + import pybadges + except ImportError: + cache_dir = pathlib.Path(f"/tmp/milabench/{pathlib.Path(__file__).name}_venv") + python3 = str(cache_dir / "bin/python3") + try: + subprocess.run([ + python3, + "-c", + "import pybadges" + ], check=True) + except (FileNotFoundError, subprocess.CalledProcessError): + cache_dir.mkdir(parents=True, exist_ok=True) + subprocess.run([sys.executable, "-m", "virtualenv", str(cache_dir)], check=True) + subprocess.run([python3, "-m", "pip", "install", "-U", "pip"], check=True) + subprocess.run([ + python3, + "-m", + "pip", + "install", + "-r", + str(pathlib.Path(__file__).resolve().parent / "requirements.txt") + ], check=True) + subprocess.run([ + python3, + "-c", + "import pybadges" + ], check=True) + return subprocess.call( + [python3, __file__, *argv], + ) + + return subprocess.run([ + sys.executable, + "-m", + "pybadges", + *argv + ], check=True).returncode + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/milabench/cli/badges/requirements.txt b/milabench/cli/badges/requirements.txt new file mode 100644 index 000000000..26620981a --- /dev/null +++ b/milabench/cli/badges/requirements.txt @@ -0,0 +1 @@ +pybadges \ No newline at end of file diff --git a/milabench/cli/covalent/__main__.py b/milabench/cli/covalent/__main__.py new file mode 100644 index 000000000..6a058aeed --- /dev/null +++ b/milabench/cli/covalent/__main__.py @@ -0,0 +1,202 @@ +import argparse +import asyncio +import os +import pathlib +import subprocess +import sys +import tempfile + + +def main(argv=None): + if argv is None: + argv = sys.argv[1:] + + try: + import covalent as ct + from covalent._shared_files.util_classes import RESULT_STATUS + from covalent_ec2_plugin import ec2 + except ImportError: + cache_dir = pathlib.Path(f"/tmp/milabench/{pathlib.Path(__file__).name}_venv") + python3 = str(cache_dir / "bin/python3") + try: + subprocess.run([ + python3, + "-c", + "import covalent ; from covalent.executor import EC2Executor" + ], check=True) + except (FileNotFoundError, subprocess.CalledProcessError): + cache_dir.mkdir(parents=True, exist_ok=True) + subprocess.run([sys.executable, "-m", "virtualenv", str(cache_dir)], check=True) + subprocess.run([python3, "-m", "pip", "install", "-U", "pip"], check=True) + subprocess.run([ + python3, + "-m", + "pip", + "install", + "-r", + str(pathlib.Path(__file__).resolve().parent / "requirements.txt") + ], check=True) + subprocess.run([ + python3, + "-c", + "import covalent ; from covalent.executor import EC2Executor" + ], check=True) + return subprocess.call( + [python3, __file__, *argv], + ) + + parser = argparse.ArgumentParser() + subparsers = parser.add_subparsers() + for p in ("ec2",): + try: + if p == "ec2": + from covalent_ec2_plugin import ec2 + subparser = subparsers.add_parser(p) + subparser.add_argument(f"--setup", action="store_true") + subparser.add_argument(f"--teardown", action="store_true") + for param, default in ec2._EXECUTOR_PLUGIN_DEFAULTS.items(): + subparser.add_argument(f"--{param.replace('_', '-')}", default=default) + except ImportError: + continue + + try: + cv_argv, argv = argv[:argv.index("--")], argv[argv.index("--")+1:] + except ValueError: + cv_argv, argv = argv, [] + + args = parser.parse_args(cv_argv) + + def _popen(cmd, *args, _env=None, **kwargs): + _debug_f = open("/home/ubuntu/debug", "wt") + print(cmd, *args, _env, kwargs, sep="\n", file=_debug_f, flush=True) + _env = _env if _env is not None else {} + + for envvar in _env.keys(): + envvar_val = _env[envvar] + + if not envvar_val: + continue + + envvar_val = pathlib.Path(envvar_val).expanduser() + if str(envvar_val) != _env[envvar]: + _env[envvar] = str(envvar_val) + + if "MILABENCH_CONFIG_CONTENT" in _env: + _config_dir = pathlib.Path(_env["MILABENCH_CONFIG"]).parent + with tempfile.NamedTemporaryFile("wt", dir=str(_config_dir), suffix=".yaml", delete=False) as _f: + _f.write(_env["MILABENCH_CONFIG_CONTENT"]) + _env["MILABENCH_CONFIG"] = _f.name + + try: + cmd = (str(pathlib.Path(cmd[0]).expanduser()), *cmd[1:]) + except IndexError: + pass + + cwd = kwargs.pop("cwd", None) + if cwd is not None: + cwd = str(pathlib.Path(cwd).expanduser()) + + _env = {**os.environ.copy(), **kwargs.pop("env", {}), **_env} + + kwargs = { + **kwargs, + "cwd": cwd, + "env": _env, + "stdout": subprocess.PIPE, + "stderr": subprocess.PIPE, + } + print(cmd, *args, _env, kwargs, sep="\n", file=_debug_f, flush=True) + p = subprocess.Popen(cmd, *args, **kwargs) + + stdout_chunks = [] + while True: + line = p.stdout.readline() + if not line: + break + line_str = line.decode("utf-8").strip() + stdout_chunks.append(line_str) + print(line_str) + + _, stderr = p.communicate() + stderr = stderr.decode("utf-8").strip() + stdout = os.linesep.join(stdout_chunks) + + if p.returncode != 0: + raise subprocess.CalledProcessError( + p.returncode, + (cmd, args, kwargs), + stdout, + stderr + ) + return p.returncode, os.linesep.join([stdout, stderr]) + + executor = None + if cv_argv[0] == "ec2": + executor:ct.executor.BaseExecutor = ct.executor.EC2Executor( + **{k:v for k,v in vars(args).items() if k not in ("setup", "teardown")} + ) + + @ct.lattice + def lattice(argv=()): + venv = pathlib.Path("~/venv") + code = pathlib.Path("~/milabench") + _env = { + "MILABENCH_BASE": "~/benches", + } + env_config = os.environ.get("MILABENCH_CONFIG", None) + + if env_config is not None: + env_config = pathlib.Path(env_config) + _env["MILABENCH_CONFIG"] = f"{code / (env_config.relative_to(env_config.parent.parent))}" + _env["MILABENCH_CONFIG_CONTENT"] = env_config.read_text() + + return ct.electron( + _popen, + executor=executor, + deps_bash=ct.DepsBash([ + f"[ -d {code} ] || git clone https://github.com/mila-iqia/milabench.git {code}", + f"git -C {code} checkout -B stable origin/stable", + f"python3 -m virtualenv {venv}", + f"{venv}/bin/python3 -m pip install -U pip", + f"{venv}/bin/python3 -m pip install -U -e {code}", + ]), + )( + [f"{venv}/bin/python3", "-m", "milabench", *argv], + cwd=str(code), + _env=_env, + ) + + return_code = 0 + try: + dispatch_id = None + result = None + if argv: + dispatch_id = ct.dispatch(lattice, disable_run=False)(argv) + result = ct.get_result(dispatch_id=dispatch_id, wait=True) + return_code = result.result[0] if result.result is not None else 1 + elif args.setup: + asyncio.run(executor.setup({})) + print(f"hostname::>{executor.hostname}") + print(f"username::>{executor.username}") + print(f"ssh_key_file::>{executor.ssh_key_file}") + print(f"env::>~/.condaenvrc") + finally: + result = ct.get_result(dispatch_id=dispatch_id, wait=False) if dispatch_id else None + status = result.get_node_result(0)["status"] if result else RESULT_STATUS.COMPLETED + results_dir = result.results_dir if result else None + if args.teardown or status not in ( + RESULT_STATUS.CANCELLED, + RESULT_STATUS.COMPLETED, + RESULT_STATUS.FAILED, + ): + asyncio.run( + executor.teardown( + {"dispatch_id": dispatch_id, "node_id": 0, "results_dir": results_dir} + ) + ) + + return return_code + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/milabench/cli/covalent/requirements.txt b/milabench/cli/covalent/requirements.txt new file mode 100644 index 000000000..f810e6eaf --- /dev/null +++ b/milabench/cli/covalent/requirements.txt @@ -0,0 +1,2 @@ +covalent +covalent-ec2-plugin @ git+https://github.com/satyaog/covalent-ec2-plugin.git@feature/milabench \ No newline at end of file diff --git a/milabench/cli/purge_cloud.py b/milabench/cli/purge_cloud.py new file mode 100644 index 000000000..d6a380d7b --- /dev/null +++ b/milabench/cli/purge_cloud.py @@ -0,0 +1,51 @@ +from dataclasses import dataclass + +from coleo import Option, tooled + +from milabench.utils import validation_layers + +from .install import arguments +from ..common import get_multipack, run_with_loggers +from ..log import DataReporter, TerminalFormatter, TextReporter + + +# fmt: off +@dataclass +class Arguments: + shorttrace: bool = False + variant: str = None +# fmt: on + + +@tooled +def arguments(): + # On error show full stacktrace + shorttrace: Option & bool = False + + # Install variant + variant: Option & str = None + + return Arguments(shorttrace, variant) + + +@tooled +def cli_purge_cloud(args=None): + """Purge running cloud instannces the benchmarks' dependencies.""" + if args is None: + args = arguments() + + overrides = {"*": {"install_variant": args.variant}} if args.variant else {} + + mp = get_multipack(overrides=overrides) + + return run_with_loggers( + mp.do_purge_cloud(), + loggers=[ + TerminalFormatter(), + TextReporter("stdout"), + TextReporter("stderr"), + DataReporter(), + *validation_layers("error", short=args.shorttrace), + ], + mp=mp, + ) diff --git a/milabench/cli/run.py b/milabench/cli/run.py index 24ac92cb3..df4b22779 100644 --- a/milabench/cli/run.py +++ b/milabench/cli/run.py @@ -8,6 +8,7 @@ from ..common import ( Option, _error_report, + _push_reports, _read_reports, get_multipack, init_arch, @@ -102,18 +103,20 @@ def cli_run(args=None): mp=mp, ) - if args.report: - runs = {pack.logdir for pack in mp.packs.values()} - compare = None - compare_gpus = False - html = None - price = None + runs = {pack.logdir for pack in mp.packs.values()} + if runs: + reports = _read_reports(*runs) + if len(reports): + _push_reports(mp.packs) - reports = None - if runs: - reports = _read_reports(*runs) + if args.report: assert len(reports) != 0, "No reports found" + compare = None + compare_gpus = False + html = None + price = None + summary = make_summary(reports.values()) assert len(summary) != 0, "No summaries" diff --git a/milabench/commands/__init__.py b/milabench/commands/__init__.py index 30ce3ffa8..9ba4637f5 100644 --- a/milabench/commands/__init__.py +++ b/milabench/commands/__init__.py @@ -399,6 +399,11 @@ def is_local(self): == localnode["hostname"] # The hostname is the local node ) + def _load_env(self, node): + if node.get("env", None): + return [".", node["env"], ";"] + return [] + def _argv(self, **kwargs) -> List: # No-op when executing on a local node if self.is_local(): @@ -410,13 +415,14 @@ def _argv(self, **kwargs) -> List: host = f"{user}@{self.host}" if user else self.host argv = super()._argv(**kwargs) - argv.extend(["-oPasswordAuthentication=no"]) - argv.extend(["-p", str(self.port)]) - if key: - argv.append(f"-i{key}") + # scp apparently needs `-i` to be first + argv.insert(1, f"-i{key}") + argv.append(f"-p{self.port}") argv.append(host) + argv.extend(self._load_env(node)) + return argv @@ -427,21 +433,27 @@ def __init__( self, pack: pack.BasePackage, host: str, - directory: str, + src: str, *scp_argv, + dest: str = None, user: str = None, key: str = None, **kwargs, ) -> None: super().__init__(pack, host, "-r", *scp_argv, user=user, key=key, **kwargs) - self.dir = directory + self.src = src + self.dest = dest if dest is not None else self.src + + def _load_env(self, node): + del node + return [] def _argv(self, **kwargs) -> List: argv = super()._argv(**kwargs) host = argv.pop() - argv.append(self.dir) - argv.append(f"{host}:{self.dir}") + argv.append(self.src) + argv.append(f"{host}:{self.dest}") return argv diff --git a/milabench/common.py b/milabench/common.py index 35f9cf125..b82f149b0 100644 --- a/milabench/common.py +++ b/milabench/common.py @@ -1,18 +1,23 @@ +from copy import deepcopy import io import json import os import re import runpy +import subprocess import sys import traceback from dataclasses import dataclass, field from datetime import datetime from coleo import Option, default, tooled +import git from omegaconf import OmegaConf from voir.instruments.gpu import deduce_backend, select_backend +from milabench import ROOT_FOLDER from milabench.alt_async import proceed +from milabench.metadata import machine_metadata from milabench.utils import available_layers, blabla, multilogger from .config import build_config, build_system_config @@ -194,6 +199,13 @@ def _get_multipack( if args.config is None: sys.exit("Error: CONFIG argument not provided and no $MILABENCH_CONFIG") + if args.system is None: + args.system = os.environ.get("MILABENCH_CONFIG_SYSTEM", None) + + if args.system is None: + if XPath(f"{args.config}.system").exists(): + args.system = f"{args.config}.system" + if args.select: args.select = set(args.select.split(",")) @@ -255,7 +267,7 @@ def is_selected(defn): return selected_config else: return MultiPackage( - {name: get_pack(defn) for name, defn in selected_config.items()} + {name: get_pack(deepcopy(defn)) for name, defn in selected_config.items()} ) @@ -296,6 +308,79 @@ def _read_reports(*runs): return all_data +def _push_reports(packs:dict): + _SVG_COLORS = { + "pass": "blue", + "partial": "yellow", + "failure": "red", + } + import milabench.cli.badges as badges + + reports_repo = XPath(next(iter(packs.values())).config["dirs"]["base"]) / "reports" + _repo = git.repo.base.Repo(ROOT_FOLDER) + try: + reports_repo = git.repo.base.Repo(str(reports_repo)) + except (git.exc.InvalidGitRepositoryError, git.exc.NoSuchPathError): + repo_url = next(iter(_r.url for _r in _repo.remotes if _r.name == "origin"), None) + reports_repo = git.repo.base.Repo.clone_from(repo_url, str(reports_repo), branch="reports") + reports_url = XPath(":".join(reports_repo.remote("origin").url.split(":")[1:])) + reports_url = XPath("https://github.com") / f"{reports_url.with_suffix('')}/tree/{reports_repo.active_branch.name}" + tags = {t.commit.hexsha:t for t in _repo.tags} + latest_tag = next(iter( + tags[c.hexsha] + for c in _repo.iter_commits() + if c.hexsha in tags + and tags[c.hexsha].name.startswith("v") + )) + + reports_dir = XPath(reports_repo.working_tree_dir) / latest_tag.name + gpu_reports = {} + for pack in packs.values(): + meta = machine_metadata(pack) + gpu = next(iter(meta["accelerators"]["gpus"].values()))["product"].replace(" ", "_") + pack.logdir.copy(reports_dir / gpu / pack.logdir.name) + + gpu_reports[gpu] = set((reports_dir / gpu).glob("*/")) + + for gpu, _reports in gpu_reports.items(): + return_codes = [ + _r[-1]["data"]["return_code"] + for _r in _read_reports(*_reports).values() + ] + if all(return_codes): + text = "failure" + elif any(return_codes): + text = "partial" + elif sum(return_codes) == 0: + text = "pass" + + result = subprocess.run( + [ + sys.executable, + "-m", badges.__name__, + "--left-text", gpu, + "--right-text", text, + "--right-color", _SVG_COLORS[text], + "--whole-link", str(reports_url / latest_tag.name / gpu) + ], + capture_output=True + ) + if result.returncode == 0: + (reports_dir / gpu / "badge.svg").write_text(result.stdout.decode("utf8")) + + for cmd, _kwargs in ( + (["git", "pull"], {"check": True}), + (["git", "add", latest_tag.name], {"check": True}), + (["git", "commit", "-m", latest_tag.name], {"check": False}), + (["git", "push"], {"check": True}) + ): + subprocess.run( + cmd, + cwd=reports_repo.working_tree_dir, + **_kwargs + ) + + def _error_report(reports): out = {} for r, data in reports.items(): diff --git a/milabench/config.py b/milabench/config.py index bfee806e7..e276cb17c 100644 --- a/milabench/config.py +++ b/milabench/config.py @@ -1,6 +1,8 @@ import contextvars +import hashlib import os import socket +from copy import deepcopy import psutil import yaml @@ -57,6 +59,16 @@ def resolve_inheritance(bench_config, all_configs): return bench_config +def compute_config_hash(config): + config = deepcopy(config) + for entry in config: + config[entry]["dirs"] = {} + config[entry]["config_base"] = "" + config[entry]["config_file"] = "" + config[entry]["run_name"] = "" + return hashlib.md5(str(config).encode("utf8")).hexdigest() + + def finalize_config(name, bench_config): bench_config["name"] = name if "definition" in bench_config: @@ -76,6 +88,8 @@ def build_config(*config_files): for layer in _config_layers(config_files): all_configs = merge(all_configs, layer) + all_configs["*"]["hash"] = compute_config_hash(all_configs) + for name, bench_config in all_configs.items(): all_configs[name] = resolve_inheritance(bench_config, all_configs) diff --git a/milabench/multi.py b/milabench/multi.py index 9946a3642..a2e936b53 100644 --- a/milabench/multi.py +++ b/milabench/multi.py @@ -1,12 +1,18 @@ import asyncio +import os +import subprocess +import sys import traceback from collections import defaultdict from copy import deepcopy from voir.instruments.gpu import get_gpu_info +import yaml + +import milabench as mb from .capability import is_system_capable -from .commands import NJobs, PerGPU +from .commands import CmdCommand, NJobs, PerGPU, SCPCommand, SSHCommand, SequenceCommand from .fs import XPath from .pack import Package from .remote import ( @@ -84,10 +90,138 @@ def setup_pack(self) -> Package: "dirs": pack.config["dirs"], "config_base": pack.config["config_base"], "config_file": pack.config["config_file"], + "plan": pack.config["plan"], "system": pack.config["system"], + "hash": pack.config["hash"], + "install_variant": pack.config["install_variant"], } ) + def manage_cloud(self, pack, action="setup"): + if pack.config.get("plan", {}).get("run_on", None) is not None: + run_on = pack.config["plan"]["run_on"] + key_map = { + "hostname":"ip", + "username":"user", + "ssh_key_file":"key", + "env":"env", + } + plan_params = deepcopy(pack.config["system"]["cloud_profiles"][run_on]) + + nodes = iter(enumerate(pack.config["system"]["nodes"])) + + state_prefix = [] + for p in self.packs.values(): + state_prefix.append(p.config["name"]) + state_prefix.append(p.config["install_variant"]) + + while True: + try: + i, n = next(nodes) + except StopIteration: + break + + plan_params["state_prefix"] = "-".join([str(i), *state_prefix]) + plan_params["state_id"] = pack.config["hash"] + + import milabench.cli.covalent as cv + cmd = [ + sys.executable, + "-m", cv.__name__, + run_on, + f"--{action}", + *[ + f"--{k.replace('_', '-')}={v}" + for k, v in plan_params.items() + ], + ] + p = subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + + stdout_chunks = [] + while True: + line = p.stdout.readline() + if not line: + break + line_str = line.decode("utf-8").strip() + stdout_chunks.append(line_str) + print(line_str) + + if not line_str: + continue + try: + k, v = line_str.split("::>") + if k == "hostname" and n[key_map[k]] != "1.1.1.1": + i, n = next(nodes) + n[key_map[k]] = v + except ValueError: + pass + + _, stderr = p.communicate() + stderr = stderr.decode("utf-8").strip() + print(stderr, file=sys.stderr) + + if p.returncode != 0: + stdout = os.linesep.join(stdout_chunks) + raise subprocess.CalledProcessError( + p.returncode, + cmd, + stdout, + stderr + ) + + if action != "setup": + return + + # Resolve ip addresses + pack.config["system"] = { + **pack.config["system"], + **mb.config.build_system_config( + None, + defaults=pack.config + )["system"] + } + + config = {} + config_hash = pack.config["hash"] + config_file = XPath(pack.config["config_file"]) + config_file = config_file.with_name(f"{config_file.name}.{config_hash}") + pack.config["config_file"] = str(config_file) + for p in self.packs.values(): + config[p.config["name"]] = p.config + p.config["config_file"] = str(config_file) + config_file.write_text(yaml.dump(config)) + + for n in pack.config["system"]["nodes"]: + _cmds = [ + SSHCommand( + CmdCommand( + pack, + "(", "mkdir", "-p", str(mb.ROOT_FOLDER.parent), pack.config["dirs"]["base"], ")", + "||", "(", "sudo", "mkdir", "-p", str(mb.ROOT_FOLDER.parent), pack.config["dirs"]["base"], + "&&", "sudo", "chmod", "-R", "a+rwX", str(mb.ROOT_FOLDER.parent), pack.config["dirs"]["base"], ")", + ), + n["ip"], + ), + SSHCommand( + CmdCommand( + pack, + "mkdir", "-p", str(config_file.parent), + ), + n["ip"], + ), + SCPCommand( + pack, + n["ip"], + str(config_file), + ), + ] + + yield SequenceCommand(*_cmds) + async def do_phase(self, phase_name, remote_task, method): """Run a phase on all the nodes""" @@ -121,6 +255,10 @@ async def do_install(self): remote_task = None if is_remote(setup): + await asyncio.wait( + [asyncio.create_task(t.execute()) for t in self.manage_cloud(setup, action="setup")] + ) + # We are outside system, setup the main node first remote_plan = milabench_remote_install(setup, setup_for="main") remote_task = asyncio.create_task(remote_plan.execute()) @@ -142,6 +280,10 @@ async def do_prepare(self): remote_task = None if is_remote(setup): + await asyncio.wait( + [asyncio.create_task(t.execute()) for t in self.manage_cloud(setup, action="setup")] + ) + remote_plan = milabench_remote_prepare(setup, run_for="main") remote_task = asyncio.create_task(remote_plan.execute()) await asyncio.wait([remote_task]) @@ -158,6 +300,10 @@ async def do_run(self, repeat=1): setup = self.setup_pack() if is_remote(setup): + await asyncio.wait( + [asyncio.create_task(t.execute()) for t in self.manage_cloud(setup, action="setup")] + ) + # if we are not on the main node right now # ssh to the main node and launch milabench remote_plan = milabench_remote_run(setup) @@ -248,3 +394,11 @@ async def do_pin( pip_compile_args=pip_compile_args, constraints=new_constraints, ) + + async def do_purge_cloud(self): + setup = self.setup_pack() + + if is_remote(setup): + await asyncio.wait( + [asyncio.create_task(t.execute()) for t in self.manage_cloud(setup, action="teardown")] + ) diff --git a/milabench/remote.py b/milabench/remote.py index bf5963183..5abc8efa5 100644 --- a/milabench/remote.py +++ b/milabench/remote.py @@ -1,6 +1,7 @@ import os import sys +from . import ROOT_FOLDER from .commands import ( CmdCommand, Command, @@ -10,7 +11,7 @@ VoidCommand, ) -INSTALL_FOLDER = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) +INSTALL_FOLDER = str(ROOT_FOLDER) def scp(node, folder, dest=None) -> list: @@ -34,15 +35,21 @@ def rsync(node, folder, dest=None) -> list: """Copy a folder from local node to remote node""" host = node["ip"] user = node["user"] + key = node.get("key", None) + key = f"-i{key}" if key else "" if dest is None: dest = os.path.abspath(os.path.join(folder, "..")) return [ "rsync", + "--force", "-av", "-e", - "ssh -oCheckHostIP=no -oStrictHostKeyChecking=no", + f"ssh {key} -oCheckHostIP=no -oStrictHostKeyChecking=no", + "--include=*/.git/*", + *[f"--exclude=*/{_dir}/*" + for _dir in (".*", "venv", "env", "tmp")], folder, f"{user}@{host}:{dest}", ] @@ -84,18 +91,23 @@ def milabench_remote_setup_plan(pack, setup_for="worker") -> SequenceCommand: """ nodes = pack.config["system"]["nodes"] - copy = [] node_packs = [] + clone = [] for node in nodes: node_pack = None if should_run_for(node, setup_for): node_pack = worker_pack(pack, node) - copy.append(CmdCommand(node_pack, *rsync(node, INSTALL_FOLDER))) + clone.append(CmdCommand(node_pack, *rsync(node, INSTALL_FOLDER))) node_packs.append(node_pack) + copy = [] + for node in nodes: + if should_run_for(node, setup_for): + copy.append(CmdCommand(node_pack, *rsync(node, INSTALL_FOLDER))) + install = [] for i, node in enumerate(nodes): if should_run_for(node, setup_for): @@ -131,7 +143,13 @@ def milabench_remote_command(pack, *command, run_for="worker") -> ListCommand: cmds.append( SSHCommand( - CmdCommand(worker_pack(pack, worker), "milabench", *command), + CmdCommand( + worker_pack(pack, worker), + "cd", f"{INSTALL_FOLDER}", "&&", + f"MILABENCH_CONFIG={pack.config['config_file']}", + f"MILABENCH_BASE={os.environ.get('MILABENCH_BASE', '')}", + "milabench", *command + ), host=host, user=user, key=key, @@ -183,9 +201,9 @@ def milabench_remote_install(pack, setup_for="worker") -> SequenceCommand: return VoidCommand(pack) argv = sys.argv[2:] - return SequenceCommand( milabench_remote_setup_plan(pack, setup_for), + milabench_remote_command(pack, "pin", *argv, run_for=setup_for), milabench_remote_command(pack, "install", *argv, run_for=setup_for), )