From b8e22dacaa6532a970f0d717bc9ab57fa616e732 Mon Sep 17 00:00:00 2001 From: haiqi96 <14502009+haiqi96@users.noreply.github.com> Date: Wed, 27 Nov 2024 11:54:47 -0500 Subject: [PATCH] feat(clp-package): Add support for deleting archives that are exclusively within a time range. (#594) Co-authored-by: kirkrodrigues <2454684+kirkrodrigues@users.noreply.github.com> --- .../clp_package_utils/__init__.py | 13 ++ .../clp_package_utils/general.py | 2 +- .../clp_package_utils/scripts/compress.py | 10 +- .../clp_package_utils/scripts/decompress.py | 14 +- .../clp_package_utils/scripts/del_archives.py | 103 +++++++++++++ .../scripts/native/compress.py | 8 - .../scripts/native/decompress.py | 8 - .../scripts/native/del_archives.py | 139 ++++++++++++++++++ .../scripts/native/search.py | 8 - .../clp_package_utils/scripts/search.py | 10 +- .../clp_package_utils/scripts/start_clp.py | 10 +- .../clp_package_utils/scripts/stop_clp.py | 10 +- .../src/sbin/admin-tools/del-archives.sh | 9 ++ 13 files changed, 272 insertions(+), 72 deletions(-) create mode 100644 components/clp-package-utils/clp_package_utils/scripts/del_archives.py create mode 100644 components/clp-package-utils/clp_package_utils/scripts/native/del_archives.py create mode 100755 components/package-template/src/sbin/admin-tools/del-archives.sh diff --git a/components/clp-package-utils/clp_package_utils/__init__.py b/components/clp-package-utils/clp_package_utils/__init__.py index e69de29bb..5253a87e5 100644 --- a/components/clp-package-utils/clp_package_utils/__init__.py +++ b/components/clp-package-utils/clp_package_utils/__init__.py @@ -0,0 +1,13 @@ +import logging + +# Set up console logging +logging_console_handler = logging.StreamHandler() +logging_formatter = logging.Formatter( + "%(asctime)s.%(msecs)03d %(levelname)s [%(module)s] %(message)s", datefmt="%Y-%m-%dT%H:%M:%S" +) +logging_console_handler.setFormatter(logging_formatter) + +# Set up root logger +root_logger = logging.getLogger() +root_logger.setLevel(logging.INFO) +root_logger.addHandler(logging_console_handler) diff --git a/components/clp-package-utils/clp_package_utils/general.py b/components/clp-package-utils/clp_package_utils/general.py index f42542ebc..5fae8166f 100644 --- a/components/clp-package-utils/clp_package_utils/general.py +++ b/components/clp-package-utils/clp_package_utils/general.py @@ -107,7 +107,7 @@ def get_clp_home(): return clp_home.resolve() -def generate_container_name(job_type: JobType) -> str: +def generate_container_name(job_type: str) -> str: """ :param job_type: :return: A unique container name for the given job type. diff --git a/components/clp-package-utils/clp_package_utils/scripts/compress.py b/components/clp-package-utils/clp_package_utils/scripts/compress.py index d0aa30913..efd3180ae 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/compress.py +++ b/components/clp-package-utils/clp_package_utils/scripts/compress.py @@ -18,15 +18,7 @@ validate_and_load_db_credentials_file, ) -# Setup logging -# Create logger logger = logging.getLogger(__file__) -logger.setLevel(logging.INFO) -# Setup console logging -logging_console_handler = logging.StreamHandler() -logging_formatter = logging.Formatter("%(asctime)s [%(levelname)s] [%(name)s] %(message)s") -logging_console_handler.setFormatter(logging_formatter) -logger.addHandler(logging_console_handler) def main(argv): @@ -66,7 +58,7 @@ def main(argv): logger.exception("Failed to load config.") return -1 - container_name = generate_container_name(JobType.COMPRESSION) + container_name = generate_container_name(str(JobType.COMPRESSION)) container_clp_config, mounts = generate_container_config(clp_config, clp_home) generated_config_path_on_container, generated_config_path_on_host = dump_container_config( diff --git a/components/clp-package-utils/clp_package_utils/scripts/decompress.py b/components/clp-package-utils/clp_package_utils/scripts/decompress.py index 9085fb162..325f2add6 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/decompress.py +++ b/components/clp-package-utils/clp_package_utils/scripts/decompress.py @@ -25,15 +25,7 @@ validate_path_could_be_dir, ) -# Setup logging -# Create logger -logger = logging.getLogger("clp") -logger.setLevel(logging.DEBUG) -# Setup console logging -logging_console_handler = logging.StreamHandler() -logging_formatter = logging.Formatter("%(asctime)s [%(levelname)s] [%(name)s] %(message)s") -logging_console_handler.setFormatter(logging_formatter) -logger.addHandler(logging_console_handler) +logger = logging.getLogger(__file__) def validate_and_load_config( @@ -89,7 +81,7 @@ def handle_extract_file_cmd( if clp_config is None: return -1 - container_name = generate_container_name(JobType.FILE_EXTRACTION) + container_name = generate_container_name(str(JobType.FILE_EXTRACTION)) container_clp_config, mounts = generate_container_config(clp_config, clp_home) generated_config_path_on_container, generated_config_path_on_host = dump_container_config( container_clp_config, clp_config, container_name @@ -164,7 +156,7 @@ def handle_extract_stream_cmd( if clp_config is None: return -1 - container_name = generate_container_name(JobType.IR_EXTRACTION) + container_name = generate_container_name(str(JobType.IR_EXTRACTION)) container_clp_config, mounts = generate_container_config(clp_config, clp_home) generated_config_path_on_container, generated_config_path_on_host = dump_container_config( container_clp_config, clp_config, container_name diff --git a/components/clp-package-utils/clp_package_utils/scripts/del_archives.py b/components/clp-package-utils/clp_package_utils/scripts/del_archives.py new file mode 100644 index 000000000..54d959771 --- /dev/null +++ b/components/clp-package-utils/clp_package_utils/scripts/del_archives.py @@ -0,0 +1,103 @@ +import argparse +import logging +import subprocess +import sys +from pathlib import Path + +from clp_package_utils.general import ( + CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH, + dump_container_config, + generate_container_config, + generate_container_name, + generate_container_start_cmd, + get_clp_home, + load_config_file, + validate_and_load_db_credentials_file, +) + +logger = logging.getLogger(__file__) + + +def main(argv): + clp_home = get_clp_home() + default_config_file_path = clp_home / CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH + + args_parser = argparse.ArgumentParser( + description="Deletes archives that fall within the specified time range." + ) + args_parser.add_argument( + "--config", + "-c", + default=str(default_config_file_path), + help="CLP package configuration file.", + ) + args_parser.add_argument( + "--begin-ts", + type=int, + default=0, + help="Time-range lower-bound (inclusive) as milliseconds from the UNIX epoch.", + ) + args_parser.add_argument( + "--end-ts", + type=int, + required=True, + help="Time-range upper-bound (include) as milliseconds from the UNIX epoch.", + ) + parsed_args = args_parser.parse_args(argv[1:]) + + # Validate and load config file + try: + config_file_path = Path(parsed_args.config) + clp_config = load_config_file(config_file_path, default_config_file_path, clp_home) + clp_config.validate_logs_dir() + + # Validate and load necessary credentials + validate_and_load_db_credentials_file(clp_config, clp_home, False) + except: + logger.exception("Failed to load config.") + return -1 + + # Validate the input timestamp + begin_ts = parsed_args.begin_ts + end_ts = parsed_args.end_ts + if begin_ts > end_ts: + logger.error("begin-ts must be <= end-ts") + return -1 + if end_ts < 0 or begin_ts < 0: + logger.error("begin_ts and end_ts must be non-negative.") + return -1 + + container_name = generate_container_name("del-archives") + + container_clp_config, mounts = generate_container_config(clp_config, clp_home) + generated_config_path_on_container, generated_config_path_on_host = dump_container_config( + container_clp_config, clp_config, container_name + ) + + necessary_mounts = [mounts.clp_home, mounts.logs_dir, mounts.archives_output_dir] + container_start_cmd = generate_container_start_cmd( + container_name, necessary_mounts, clp_config.execution_container + ) + + # fmt: off + del_archive_cmd = [ + "python3", + "-m", "clp_package_utils.scripts.native.del_archives", + "--config", str(generated_config_path_on_container), + str(begin_ts), + str(end_ts) + + ] + # fmt: on + + cmd = container_start_cmd + del_archive_cmd + subprocess.run(cmd, check=True) + + # Remove generated files + generated_config_path_on_host.unlink() + + return 0 + + +if "__main__" == __name__: + sys.exit(main(sys.argv)) diff --git a/components/clp-package-utils/clp_package_utils/scripts/native/compress.py b/components/clp-package-utils/clp_package_utils/scripts/native/compress.py index cb495204f..b6d9bb7eb 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/native/compress.py +++ b/components/clp-package-utils/clp_package_utils/scripts/native/compress.py @@ -23,15 +23,7 @@ load_config_file, ) -# Setup logging -# Create logger logger = logging.getLogger(__file__) -logger.setLevel(logging.INFO) -# Setup console logging -logging_console_handler = logging.StreamHandler() -logging_formatter = logging.Formatter("%(asctime)s [%(levelname)s] [%(name)s] %(message)s") -logging_console_handler.setFormatter(logging_formatter) -logger.addHandler(logging_console_handler) def print_compression_job_status(job_row, current_time): diff --git a/components/clp-package-utils/clp_package_utils/scripts/native/decompress.py b/components/clp-package-utils/clp_package_utils/scripts/native/decompress.py index 7cce5d92a..d16cdcb6f 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/native/decompress.py +++ b/components/clp-package-utils/clp_package_utils/scripts/native/decompress.py @@ -32,15 +32,7 @@ wait_for_query_job, ) -# Setup logging -# Create logger logger = logging.getLogger(__file__) -logger.setLevel(logging.INFO) -# Setup console logging -logging_console_handler = logging.StreamHandler() -logging_formatter = logging.Formatter("%(asctime)s [%(levelname)s] [%(name)s] %(message)s") -logging_console_handler.setFormatter(logging_formatter) -logger.addHandler(logging_console_handler) def get_orig_file_id(db_config: Database, path: str) -> Optional[str]: diff --git a/components/clp-package-utils/clp_package_utils/scripts/native/del_archives.py b/components/clp-package-utils/clp_package_utils/scripts/native/del_archives.py new file mode 100644 index 000000000..735bf299d --- /dev/null +++ b/components/clp-package-utils/clp_package_utils/scripts/native/del_archives.py @@ -0,0 +1,139 @@ +import argparse +import logging +import shutil +import sys +from contextlib import closing +from pathlib import Path +from typing import List + +from clp_py_utils.clp_config import Database +from clp_py_utils.sql_adapter import SQL_Adapter + +from clp_package_utils.general import ( + CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH, + get_clp_home, + load_config_file, +) + +logger = logging.getLogger(__file__) + + +def main(argv): + clp_home = get_clp_home() + default_config_file_path = clp_home / CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH + + args_parser = argparse.ArgumentParser( + description="Deletes archives that fall within the specified time range." + ) + args_parser.add_argument( + "--config", + "-c", + required=True, + default=str(default_config_file_path), + help="CLP configuration file.", + ) + args_parser.add_argument( + "begin_ts", + type=int, + help="Time-range lower-bound (inclusive) as milliseconds from the UNIX epoch.", + ) + args_parser.add_argument( + "end_ts", + type=int, + help="Time-range upper-bound (include) as milliseconds from the UNIX epoch.", + ) + parsed_args = args_parser.parse_args(argv[1:]) + + # Validate and load config file + config_file_path = Path(parsed_args.config) + try: + clp_config = load_config_file(config_file_path, default_config_file_path, clp_home) + clp_config.validate_logs_dir() + except: + logger.exception("Failed to load config.") + return -1 + + database_config = clp_config.database + archives_dir = clp_config.archive_output.directory + if not archives_dir.exists(): + logger.error("`archive_output.directory` doesn't exist.") + return -1 + + return _delete_archives( + archives_dir, + database_config, + parsed_args.begin_ts, + parsed_args.end_ts, + ) + + +def _delete_archives( + archives_dir: Path, + database_config: Database, + begin_ts: int, + end_ts: int, +) -> int: + """ + Deletes all archives where `begin_ts <= archive.begin_timestamp` and + `archive.end_timestamp <= end_ts` from both the metadata database and disk. + :param archives_dir: + :param database_config: + :param begin_ts: + :param end_ts: + :return: 0 on success, -1 otherwise. + """ + + archive_ids: List[str] + logger.info("Starting to delete archives from the database.") + try: + sql_adapter = SQL_Adapter(database_config) + clp_db_connection_params = database_config.get_clp_connection_params_and_type(True) + table_prefix = clp_db_connection_params["table_prefix"] + with closing(sql_adapter.create_connection(True)) as db_conn, closing( + db_conn.cursor(dictionary=True) + ) as db_cursor: + db_cursor.execute( + f""" + DELETE FROM `{table_prefix}archives` + WHERE begin_timestamp >= %s AND end_timestamp <= %s + RETURNING id + """, + (begin_ts, end_ts), + ) + results = db_cursor.fetchall() + + if 0 == len(results): + logger.info("No archives (exclusively) within the specified time range.") + return 0 + + archive_ids = [result["id"] for result in results] + db_cursor.execute( + f""" + DELETE FROM `{table_prefix}files` + WHERE archive_id in ({', '.join(['%s'] * len(archive_ids))}) + """, + archive_ids, + ) + db_conn.commit() + except Exception: + logger.exception("Failed to delete archives from the database. Aborting deletion.") + return -1 + + logger.info(f"Finished deleting archives from the database.") + + for archive_id in archive_ids: + archive_path = archives_dir / archive_id + if not archive_path.is_dir(): + logger.warning(f"Archive {archive_id} is not a directory. Skipping deletion.") + continue + + logger.info(f"Deleting archive {archive_id} from disk.") + shutil.rmtree(archive_path) + + logger.info(f"Finished deleting archives from disk.") + + return 0 + + +if "__main__" == __name__: + sys.exit(main(sys.argv)) diff --git a/components/clp-package-utils/clp_package_utils/scripts/native/search.py b/components/clp-package-utils/clp_package_utils/scripts/native/search.py index 7dd247fa5..d166cf35f 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/native/search.py +++ b/components/clp-package-utils/clp_package_utils/scripts/native/search.py @@ -26,15 +26,7 @@ wait_for_query_job, ) -# Setup logging -# Create logger logger = logging.getLogger(__file__) -logger.setLevel(logging.INFO) -# Setup console logging -logging_console_handler = logging.StreamHandler() -logging_formatter = logging.Formatter("%(asctime)s [%(levelname)s] [%(name)s] %(message)s") -logging_console_handler.setFormatter(logging_formatter) -logger.addHandler(logging_console_handler) def create_and_monitor_job_in_db( diff --git a/components/clp-package-utils/clp_package_utils/scripts/search.py b/components/clp-package-utils/clp_package_utils/scripts/search.py index f3f02046d..beb7fb0b0 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/search.py +++ b/components/clp-package-utils/clp_package_utils/scripts/search.py @@ -20,15 +20,7 @@ validate_and_load_db_credentials_file, ) -# Setup logging -# Create logger logger = logging.getLogger(__file__) -logger.setLevel(logging.INFO) -# Setup console logging -logging_console_handler = logging.StreamHandler() -logging_formatter = logging.Formatter("%(asctime)s [%(levelname)s] [%(name)s] %(message)s") -logging_console_handler.setFormatter(logging_formatter) -logger.addHandler(logging_console_handler) def main(argv): @@ -82,7 +74,7 @@ def main(argv): logger.exception("Failed to load config.") return -1 - container_name = generate_container_name(JobType.SEARCH) + container_name = generate_container_name(str(JobType.SEARCH)) container_clp_config, mounts = generate_container_config(clp_config, clp_home) generated_config_path_on_container, generated_config_path_on_host = dump_container_config( diff --git a/components/clp-package-utils/clp_package_utils/scripts/start_clp.py b/components/clp-package-utils/clp_package_utils/scripts/start_clp.py index 6732ded0b..8097929f1 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/start_clp.py +++ b/components/clp-package-utils/clp_package_utils/scripts/start_clp.py @@ -59,15 +59,7 @@ validate_worker_config, ) -# Setup logging -# Create logger -logger = logging.getLogger("clp") -logger.setLevel(logging.INFO) -# Setup console logging -logging_console_handler = logging.StreamHandler() -logging_formatter = logging.Formatter("%(asctime)s [%(levelname)s] [%(name)s] %(message)s") -logging_console_handler.setFormatter(logging_formatter) -logger.addHandler(logging_console_handler) +logger = logging.getLogger(__file__) def container_exists(container_name): diff --git a/components/clp-package-utils/clp_package_utils/scripts/stop_clp.py b/components/clp-package-utils/clp_package_utils/scripts/stop_clp.py index f100a098a..a55d7a795 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/stop_clp.py +++ b/components/clp-package-utils/clp_package_utils/scripts/stop_clp.py @@ -31,15 +31,7 @@ validate_and_load_queue_credentials_file, ) -# Setup logging -# Create logger -logger = logging.getLogger("clp") -logger.setLevel(logging.INFO) -# Setup console logging -logging_console_handler = logging.StreamHandler() -logging_formatter = logging.Formatter("%(asctime)s [%(levelname)s] [%(name)s] %(message)s") -logging_console_handler.setFormatter(logging_formatter) -logger.addHandler(logging_console_handler) +logger = logging.getLogger(__file__) def stop_running_container(container_name: str, already_exited_containers: List[str], force: bool): diff --git a/components/package-template/src/sbin/admin-tools/del-archives.sh b/components/package-template/src/sbin/admin-tools/del-archives.sh new file mode 100755 index 000000000..4d7ebc6b7 --- /dev/null +++ b/components/package-template/src/sbin/admin-tools/del-archives.sh @@ -0,0 +1,9 @@ +#!/usr/bin/env bash + +script_dir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )" +package_root="$script_dir/../.." + +PYTHONPATH=$(readlink -f "$package_root/lib/python3/site-packages") \ + python3 \ + -m clp_package_utils.scripts.del_archives \ + "$@"