diff --git a/ci-storage b/ci-storage new file mode 100755 index 0000000..6ace751 --- /dev/null +++ b/ci-storage @@ -0,0 +1,506 @@ +#!/usr/bin/python3 -u +from __future__ import annotations + +import argparse +import re +import shlex +import subprocess +import sys +import textwrap +import typing +import time +import os.path + +STORAGE_MAX_AGE_SEC_DEFAULT = 3600 * 4 +STORAGE_MAX_AGE_SEC_BAK = 60 +STORAGE_DIR_DEFAULT = "ci-storage" + + +# +# Tool entry point. +# +def main(): + parser = argparse.ArgumentParser( + description="""\ + Quickly stores the content of some local directory in the storage + with the provided slot id on a remote host, or loads the content + from the storage to that directory. The tool makes some smart + differential optimizations along the way to perform it as fast as + possible (typically, 4 seconds for a 1.5G directory with 200K files + in it on Linux ext4). + + Under the hood, the tool uses rsync. When storing to the remote + storage, it uses rsync's "--link-dest" mode pointing to the most + recently created slot, to reuse as many existing files in the + storage as possible (hoping that almost all files to be stored in + the current slot are the same as the files in the recent slot, which + is often times true for e.g. node_modules directories). If a slot + with the same id already exists, it is overwritten in a + transaction-safe fashion. + + When loading the files from a remote storage slot to a local + directory, implies that the local directory already contains almost + all files equal to the remote ones, so rsync can run efficiently. + """, + formatter_class=ParagraphFormatter, + ) + parser.add_argument( + "action", + type=str, + choices=["store", "load"], + help="action to run", + ) + parser.add_argument( + "--storage-host", + type=str, + required=False, + help="storage host in the format [user@]host; it must have password-free SSH key access; if omitted, uses the local filesystem (no SSH)", + ) + parser.add_argument( + "--storage-dir", + type=str, + required=False, + help="storage directory path on the storage host; defaults to ~/ci-storage", + ) + parser.add_argument( + "--storage-max-age-sec", + type=str, + default=str(STORAGE_MAX_AGE_SEC_DEFAULT), + required=False, + help="remove slots created earlier than this many seconds ago", + ) + parser.add_argument( + "--slot-id", + type=str, + required=True, + help='id of the slot to store to or load from; use "*" to load a random most recent slot', + ) + parser.add_argument( + "--local-dir", + type=str, + required=True, + help="local directory path", + ) + parser.add_argument( + "--exclude", + type=str, + default=[], + action="append", + help="exclude pattern(s) for rsync", + ) + parser.add_argument( + "--verbose", + default=False, + action="store_true", + help="if set, prints the list of transferred files", + ) + + args = parser.parse_intermixed_args() + action: typing.Literal["store", "load"] = args.action + storage_host: str | None = args.storage_host or None + storage_dir: str = ( + (args.storage_dir and re.sub(r"/+$", "", args.storage_dir)) + or (storage_host and STORAGE_DIR_DEFAULT) + or os.path.expanduser(f"~/{STORAGE_DIR_DEFAULT}") + ) + storage_max_age_sec: int = int( + args.storage_max_age_sec or str(STORAGE_MAX_AGE_SEC_DEFAULT) + ) + slot_id: str = args.slot_id + local_dir: str = re.sub(r"/+$", "", args.local_dir) + exclude: list[str] = [ + line for line in "\n".join(args.exclude).splitlines() if line.strip() + ] + verbose: bool = args.verbose + + if action == "store": + action_store( + storage_host=storage_host, + storage_dir=storage_dir, + slot_id=slot_id, + local_dir=local_dir, + exclude=exclude, + verbose=verbose, + ) + action_maintenance( + storage_host=storage_host, + storage_dir=storage_dir, + max_age_sec=storage_max_age_sec, + ) + if action == "load": + action_load( + storage_host=storage_host, + storage_dir=storage_dir, + slot_id=slot_id, + local_dir=local_dir, + exclude=exclude, + verbose=verbose, + ) + + +# +# Stores the content of the local directory in the storage with the provided +# slot id on a remote host. +# +def action_store( + *, + storage_host: str | None, + storage_dir: str, + slot_id: str, + local_dir: str, + exclude: list[str], + verbose: bool, +): + if slot_id == "*": + raise UserException('slot_id="*" is not allowed for "store" action') + slot_id = normalize_slot_id(slot_id) + slot_ids_and_ages = list_slots(storage_host=storage_host, storage_dir=storage_dir) + slot_id_recent = slot_ids_and_ages[0][0] if len(slot_ids_and_ages) else None + slot_id_tmp = f"{slot_id}.tmp.{int(time.time())}" + check_call( + cmd=[ + "rsync", + "-a", + "--delete", + "--partial", + "--inplace", + "--stats", + *( + [f"--link-dest={storage_dir}/{slot_id_recent}/"] + if slot_id_recent + else [] + ), + *[f"--exclude={pattern}" for pattern in exclude], + *(["-v"] if verbose else []), + f"{local_dir}/", + (f"{storage_host}:" if storage_host else "") + + f"{storage_dir}/{slot_id_tmp}/", + ], + print_elapsed=True, + ) + print( + check_output_script( + host=storage_host, + script=SCRIPTS["COMMIT_SLOT"], + args=[storage_dir, slot_id_tmp, slot_id], + indent=True, + ), + end="", + ) + + +# +# Loads the content from the storage to the local directory. +# +def action_load( + *, + storage_host: str | None, + storage_dir: str, + slot_id: str, + local_dir: str, + exclude: list[str], + verbose: bool, +): + if slot_id == "*": + slot_ids_and_ages = list_slots( + storage_host=storage_host, storage_dir=storage_dir + ) + if len(slot_ids_and_ages) == 0: + raise UserException( + 'to use slot_id="*", there must be at least one slot in the storage.' + ) + slot_id = slot_ids_and_ages[0][0] + else: + slot_id = normalize_slot_id(slot_id) + check_call( + cmd=[ + "rsync", + "-a", + "--delete", + "--partial", + "--stats", + "--human-readable", + *[f"--exclude={pattern}" for pattern in exclude], + *(["-v"] if verbose else []), + (f"{storage_host}:" if storage_host else "") + f"{storage_dir}/{slot_id}/", + f"{local_dir}/", + ], + print_elapsed=True, + ) + + +# +# Runs the maintenance script for the storage. +# +def action_maintenance( + *, + storage_host: str | None, + storage_dir: str, + max_age_sec: int, +): + print( + check_output_script( + host=storage_host, + script=SCRIPTS["MAINTENANCE"], + args=[storage_dir, str(max_age_sec)], + indent=True, + ), + end="", + ) + + +# +# Returns the list of existing slot ids and their age in seconds, sorted by age. +# +def list_slots( + *, + storage_host: str | None, + storage_dir: str, +) -> list[tuple[str, int]]: + slot_ids_and_ages: list[tuple[str, int]] = [] + lines = check_output_script( + host=storage_host, script=SCRIPTS["LIST_SLOTS"], args=[storage_dir] + ) + for line in lines.splitlines(): + match = re.match(r"^([^.]+) (\d+)$", line) + if match: + slot_ids_and_ages.append((match.group(1), int(match.group(2)))) + slot_ids_and_ages.sort(key=lambda x: x[1]) + return slot_ids_and_ages + + +# +# Replaces all characters invalid in the file name with underscores. +# +def normalize_slot_id( + slot_id: str, +) -> str: + return re.sub(r"[^a-zA-Z0-9_-]", "_", slot_id) + + +# +# Runs an inline script and returns its output. +# +def check_output_script( + *, + host: str | None, + script: str, + args: list[str] = [], + indent: bool = False, +) -> str: + return check_output(host=host, cmd=["perl", "-we", script, *args], indent=indent) + + +# +# Runs a command and returns its output. +# +def check_output( + *, + host: str | None, + cmd: list[str], + indent: bool = False, +) -> str: + if host is not None: + cmd_prefix = [ + "ssh", + host, + ] + print(f"$ {cmd_to_debug_str([*cmd_prefix, *cmd])}") + cmd = [ + *cmd_prefix, + "-oStrictHostKeyChecking=no", + "-oUserKnownHostsFile=/dev/null", + shlex.join(cmd), + ] + else: + print(f"$ {cmd_to_debug_str(cmd)}") + output = subprocess.check_output(cmd, text=True, stderr=subprocess.PIPE) + return textwrap.indent(output, " ") if indent else output + + +# +# Runs a command and passes through its output. +# +def check_call( + *, + cmd: list[str], + print_elapsed: bool = False, +) -> None: + print(f"$ {cmd_to_debug_str(cmd)}") + start_time = time.time() + process = subprocess.Popen( + cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True + ) + while process.stdout: + line = process.stdout.readline() + if not line and process.poll() is not None: + break + if line.strip(): + print(f" {line}", end="") + if print_elapsed: + print(f" elapsed: {time.time() - start_time:.2f} sec") + + +# +# Converts a command to a debug string. +# +def cmd_to_debug_str( + cmd: list[str], +) -> str: + inv = dict((v, k) for k, v in SCRIPTS.items()) + return shlex.join([f"<{inv[arg]}>" if arg in inv else arg for arg in cmd]) + + +# +# A helper class for ArgumentParser. +# +class ParagraphFormatter(argparse.HelpFormatter): + def _fill_text(self, text: str, width: int, indent: str) -> str: + return "\n\n".join( + [ + textwrap.indent(textwrap.fill(paragraph, width), indent) + for paragraph in textwrap.dedent(text).split("\n\n") + ] + ) + + +# +# Custom user exceptions. +# +class UserException(Exception): + pass + + +# +# Inline scripts to run on the storage host. Reasons to use Perl: +# - It exists and is of the same version everywhere (as opposed to Python). +# - It is fast to boot and doesn't require external modules. +# - It has built-in flock() support. +# - If has native fork() support. +# +SCRIPTS = { + # The script to list existing slot ids and their age in seconds. It also + # pre-creates the storage directory if it doesn't exist. + "LIST_SLOTS": textwrap.dedent( + r""" + use strict; + my $storage_dir = $ARGV[0] or die("storage_dir argument required\n"); + length($storage_dir) >= 3 or die("storage_dir is suspiciously short\n"); + if (!-d $storage_dir) { + mkdir($storage_dir) or die("mkdir $storage_dir: $!\n"); + } + my @slot_dirs = glob("$storage_dir/*/"); + foreach my $dir (@slot_dirs) { + $dir =~ s{/+$}{}s; + my $inode_time = (stat($dir))[10] or die("stat $dir: $!\n"); + my $age_sec = time() - $inode_time; + my $slot_id = $dir; + $slot_id =~ s{^.*/}{}s; + print("$slot_id $age_sec\n"); + } + """.strip() + ), + # The script to rename the new slot directory to the destination one. + "COMMIT_SLOT": textwrap.dedent( + r""" + use strict; + my $storage_dir = $ARGV[0] or die("storage_dir argument required\n"); + my $slot_id_tmp = $ARGV[1] or die("slot_id_tmp argument required\n"); + my $slot_id_dst = $ARGV[2] or die("slot_id_dst argument required\n"); + length($storage_dir) >= 3 or die("storage_dir is suspiciously short\n"); + my $slot_dir_tmp = "$storage_dir/$slot_id_tmp"; + my $slot_dir_dst = "$storage_dir/$slot_id_dst"; + my $slot_dir_bak = "$storage_dir/$slot_id_dst.bak." . time(); + -d $slot_dir_bak and (system("rm", "-rf", $slot_dir_bak) == 0 or die("rm -rf $slot_dir_bak: $!\n")); + -d $slot_dir_dst and (system("mv", $slot_dir_dst, $slot_dir_bak) == 0 or die("mv $slot_dir_dst $slot_dir_bak: $!\n")); + system("mv", $slot_dir_tmp, $slot_dir_dst) == 0 or die("mv $slot_dir_tmp $slot_dir_dst: $!\n"); + """.strip() + ), + # This script is launched in background on the storage host to cleanup old or + # broken slots. + "MAINTENANCE": textwrap.dedent( + r""" + use strict; + use POSIX "setsid"; + use IPC::Open3; + *STDOUT->autoflush(1); + *STDERR->autoflush(1); + my $storage_dir = $ARGV[0] or die("storage_dir argument required\n"); + my $max_age_sec = $ARGV[1] or die("max_age_sec argument required\n"); + length($storage_dir) >= 3 or die("storage_dir is suspiciously short\n"); + my $lock_file = "$storage_dir/maintenance.lock"; + open(my $lock, ">>", $lock_file) or die("open $lock_file: $!\n"); + if (!flock($lock, 2 | 4)) { # LOCK_EX | LOCK_NB + print("another maintenance process is already running, so skipping\n"); + exit(0); + } + my @slot_infos = + sort { $a->[1] <=> $b->[1] } + map { + $_ =~ s{/+$}{}s; + my $inode_time = (stat($_))[10] or die("stat $_: $!\n"); + my $age_sec = time() - $inode_time; + [$_, $age_sec]; + } + glob("$storage_dir/*/"); + my $slot_dir_latest = (map { $_->[0] } grep { $_->[0] !~ /\./ } @slot_infos)[0]; + my @rm_dirs = (); + foreach my $info (@slot_infos) { + my ($dir, $age_sec) = @$info; + if (defined($slot_dir_latest) && $dir eq $slot_dir_latest) { + # Never delete the latest slot, even if it is old. + next; + } + if ($age_sec > $max_age_sec || $dir =~ /\.bak\.\d+$/s && $age_sec > %(STORAGE_MAX_AGE_SEC_BAK)d) { + push(@rm_dirs, $dir); + print("will remove $dir (age: $age_sec sec) in background\n"); + } + } + if (!@rm_dirs) { + unlink($lock_file); + exit(0); + } + # https://linux.die.net/man/1/perlipc + # We use open3, otherwise logger inherits our STDOUT/STDERR and doesn't let us close them. + # To test logger in MacOS: log stream --info --predicate 'process == "logger"' + open(my $devnull, ">", "/dev/null") or die("open /dev/null: $!\n"); + open(*STDIN, "<", "/dev/null") or die("open STDIN: $!\n"); + open3(*STDOUT, $devnull, $devnull, "logger", "-t", "ci-storage") or die("open3 STDOUT logger: $!\n"); + open(*STDERR, ">&", *STDOUT) or die("open STDERR: $!\n"); + *STDOUT->autoflush(1); + *STDERR->autoflush(1); + defined(my $pid = fork()) or die("fork: $!\n"); + $pid == 0 or exit(0); + POSIX::setsid() != -1 or die("setsid: $!\n"); + foreach my $dir (@rm_dirs) { + system("nice", "rm", "-rf", $dir) == 0 or die("rm -rf $dir: $!\n"); + print("removed $dir\n"); + } + unlink($lock_file); + """.strip() + % {"STORAGE_MAX_AGE_SEC_BAK": STORAGE_MAX_AGE_SEC_BAK} + ), +} + +# +# Script entry point. +# +if __name__ == "__main__": + try: + sys.exit(main()) + except KeyboardInterrupt: + sys.exit(1) + except UserException as e: + print(f"Error: {e}", file=sys.stderr) + sys.exit(1) + except subprocess.CalledProcessError as e: + print( + textwrap.indent( + f"Command returned status {e.returncode}." + + (f"\n{e.stdout}" if e.stdout else "") + + (f"\n{e.stderr}" if e.stderr else ""), + " ", + ).rstrip(), + file=sys.stderr, + ) + sys.exit(2)