diff --git a/components/clp-package-utils/clp_package_utils/general.py b/components/clp-package-utils/clp_package_utils/general.py index a2e6e344f..d1edf7d5b 100644 --- a/components/clp-package-utils/clp_package_utils/general.py +++ b/components/clp-package-utils/clp_package_utils/general.py @@ -8,7 +8,7 @@ import typing import uuid from enum import auto -from typing import List, Tuple +from typing import List, Optional, Tuple import yaml from clp_py_utils.clp_config import ( @@ -31,6 +31,9 @@ from strenum import KebabCaseStrEnum # CONSTANTS +EXTRACT_FILE_CMD = "x" +EXTRACT_IR_CMD = "i" + # Paths CONTAINER_CLP_HOME = pathlib.Path("/") / "opt" / "clp" CONTAINER_INPUT_LOGS_ROOT_DIR = pathlib.Path("/") / "mnt" / "logs" @@ -45,7 +48,8 @@ class DockerMountType(enum.IntEnum): class JobType(KebabCaseStrEnum): COMPRESSION = auto() - DECOMPRESSION = auto() + FILE_EXTRACTION = auto() + IR_EXTRACTION = auto() SEARCH = auto() @@ -283,7 +287,7 @@ def dump_container_config( def generate_container_start_cmd( - container_name: str, container_mounts: List[CLPDockerMounts], container_image: str + container_name: str, container_mounts: List[Optional[DockerMount]], container_image: str ) -> List[str]: """ Generates the command to start a container with the given mounts and name. diff --git a/components/clp-package-utils/clp_package_utils/scripts/decompress.py b/components/clp-package-utils/clp_package_utils/scripts/decompress.py index f700d9721..1a2973fec 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/decompress.py +++ b/components/clp-package-utils/clp_package_utils/scripts/decompress.py @@ -3,6 +3,7 @@ import pathlib import subprocess import sys +from typing import Optional from clp_py_utils.clp_config import CLPConfig @@ -11,6 +12,8 @@ DockerMount, DockerMountType, dump_container_config, + EXTRACT_FILE_CMD, + EXTRACT_IR_CMD, generate_container_config, generate_container_name, generate_container_start_cmd, @@ -32,40 +35,43 @@ logger.addHandler(logging_console_handler) -def main(argv): - clp_home = get_clp_home() - default_config_file_path = clp_home / CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH - - args_parser = argparse.ArgumentParser(description="Decompresses logs") - args_parser.add_argument( - "--config", - "-c", - type=str, - default=str(default_config_file_path), - help="CLP package configuration file.", - ) - args_parser.add_argument("paths", metavar="PATH", nargs="*", help="Files to decompress.") - args_parser.add_argument("-f", "--files-from", help="A file listing all files to decompress.") - args_parser.add_argument( - "-d", "--extraction-dir", metavar="DIR", default=".", help="Decompress files into DIR" - ) - parsed_args = args_parser.parse_args(argv[1:]) - - # Validate and load config file +def validate_and_load_config( + clp_home: pathlib.Path, + config_file_path: pathlib.Path, + default_config_file_path: pathlib.Path, +) -> Optional[CLPConfig]: + """ + Validates and loads the config file. + :param clp_home: + :param config_file_path: + :param default_config_file_path: + :return: The config object on success, None otherwise. + """ try: - config_file_path = pathlib.Path(parsed_args.config) clp_config = load_config_file(config_file_path, default_config_file_path, clp_home) clp_config.validate_logs_dir() # Validate and load necessary credentials validate_and_load_db_credentials_file(clp_config, clp_home, False) + return clp_config except: logger.exception("Failed to load config.") - return -1 + return None + - paths_to_decompress_file_path = None +def handle_extract_file_cmd( + parsed_args, clp_home: pathlib.Path, default_config_file_path: pathlib.Path +) -> int: + """ + Handles the file extraction command. + :param parsed_args: + :param clp_home: + :param default_config_file_path: + :return: 0 on success, -1 otherwise. + """ + paths_to_extract_file_path = None if parsed_args.files_from: - paths_to_decompress_file_path = pathlib.Path(parsed_args.files_from) + paths_to_extract_file_path = pathlib.Path(parsed_args.files_from) # Validate extraction directory extraction_dir = pathlib.Path(parsed_args.extraction_dir).resolve() @@ -74,15 +80,22 @@ def main(argv): except ValueError as ex: logger.error(f"extraction-dir is invalid: {ex}") return -1 - extraction_dir.mkdir(exist_ok=True) - container_name = generate_container_name(JobType.DECOMPRESSION) + # Validate and load config file + clp_config = validate_and_load_config( + clp_home, pathlib.Path(parsed_args.config), default_config_file_path + ) + if clp_config is None: + return -1 + + container_name = generate_container_name(JobType.FILE_EXTRACTION) container_clp_config, mounts = generate_container_config(clp_config, clp_home) generated_config_path_on_container, generated_config_path_on_host = dump_container_config( container_clp_config, clp_config, container_name ) # Set up mounts + extraction_dir.mkdir(exist_ok=True) container_extraction_dir = pathlib.Path("/") / "mnt" / "extraction-dir" necessary_mounts = [ mounts.clp_home, @@ -91,16 +104,14 @@ def main(argv): mounts.archives_output_dir, DockerMount(DockerMountType.BIND, extraction_dir, container_extraction_dir), ] - container_paths_to_decompress_file_path = None - if paths_to_decompress_file_path: - container_paths_to_decompress_file_path = ( - pathlib.Path("/") / "mnt" / "paths-to-decompress.txt" - ) + container_paths_to_extract_file_path = None + if paths_to_extract_file_path: + container_paths_to_extract_file_path = pathlib.Path("/") / "mnt" / "paths-to-extract.txt" necessary_mounts.append( DockerMount( DockerMountType.BIND, - paths_to_decompress_file_path, - container_paths_to_decompress_file_path, + paths_to_extract_file_path, + container_paths_to_extract_file_path, ) ) container_start_cmd = generate_container_start_cmd( @@ -108,21 +119,26 @@ def main(argv): ) # fmt: off - decompress_cmd = [ + extract_cmd = [ "python3", "-m", "clp_package_utils.scripts.native.decompress", "--config", str(generated_config_path_on_container), + EXTRACT_FILE_CMD, "-d", str(container_extraction_dir), ] # fmt: on for path in parsed_args.paths: - decompress_cmd.append(path) - if container_paths_to_decompress_file_path: - decompress_cmd.append("--input-list") - decompress_cmd.append(container_paths_to_decompress_file_path) + extract_cmd.append(path) + if container_paths_to_extract_file_path: + extract_cmd.append("--input-list") + extract_cmd.append(container_paths_to_extract_file_path) - cmd = container_start_cmd + decompress_cmd - subprocess.run(cmd, check=True) + cmd = container_start_cmd + extract_cmd + try: + subprocess.run(cmd, check=True) + except subprocess.CalledProcessError: + logger.exception("Docker or file extraction command failed.") + return -1 # Remove generated files generated_config_path_on_host.unlink() @@ -130,5 +146,112 @@ def main(argv): return 0 +def handle_extract_ir_cmd( + parsed_args, clp_home: pathlib.Path, default_config_file_path: pathlib.Path +) -> int: + """ + Handles the IR extraction command. + :param parsed_args: + :param clp_home: + :param default_config_file_path: + :return: 0 on success, -1 otherwise. + """ + # Validate and load config file + clp_config = validate_and_load_config( + clp_home, pathlib.Path(parsed_args.config), default_config_file_path + ) + if clp_config is None: + return -1 + + container_name = generate_container_name(JobType.IR_EXTRACTION) + container_clp_config, mounts = generate_container_config(clp_config, clp_home) + generated_config_path_on_container, generated_config_path_on_host = dump_container_config( + container_clp_config, clp_config, container_name + ) + necessary_mounts = [mounts.clp_home, mounts.logs_dir] + container_start_cmd = generate_container_start_cmd( + container_name, necessary_mounts, clp_config.execution_container + ) + + # fmt: off + extract_cmd = [ + "python3", + "-m", "clp_package_utils.scripts.native.decompress", + "--config", str(generated_config_path_on_container), + EXTRACT_IR_CMD, + str(parsed_args.msg_ix), + ] + # fmt: on + if parsed_args.orig_file_id: + extract_cmd.append("--orig-file-id") + extract_cmd.append(str(parsed_args.orig_file_id)) + else: + extract_cmd.append("--orig-file-path") + extract_cmd.append(str(parsed_args.orig_file_path)) + if parsed_args.target_uncompressed_size: + extract_cmd.append("--target-uncompressed-size") + extract_cmd.append(str(parsed_args.target_uncompressed_size)) + cmd = container_start_cmd + extract_cmd + + try: + subprocess.run(cmd, check=True) + except subprocess.CalledProcessError: + logger.exception("Docker or IR extraction command failed.") + return -1 + + # Remove generated files + generated_config_path_on_host.unlink() + + return 0 + + +def main(argv): + clp_home = get_clp_home() + default_config_file_path = clp_home / CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH + + args_parser = argparse.ArgumentParser(description="Decompresses logs") + args_parser.add_argument( + "--config", + "-c", + default=str(default_config_file_path), + help="CLP configuration file.", + ) + command_args_parser = args_parser.add_subparsers(dest="command", required=True) + + # File extraction command parser + file_extraction_parser = command_args_parser.add_parser(EXTRACT_FILE_CMD) + file_extraction_parser.add_argument( + "paths", metavar="PATH", nargs="*", help="Files to extract." + ) + file_extraction_parser.add_argument( + "-f", "--files-from", help="A file listing all files to extract." + ) + file_extraction_parser.add_argument( + "-d", "--extraction-dir", metavar="DIR", default=".", help="Extract files into DIR." + ) + + # IR extraction command parser + ir_extraction_parser = command_args_parser.add_parser(EXTRACT_IR_CMD) + ir_extraction_parser.add_argument("msg_ix", type=int, help="Message index.") + ir_extraction_parser.add_argument( + "--target-uncompressed-size", type=int, help="Target uncompressed IR size." + ) + + group = ir_extraction_parser.add_mutually_exclusive_group(required=True) + group.add_argument("--orig-file-id", type=str, help="Original file's ID.") + group.add_argument("--orig-file-path", type=str, help="Original file's path.") + + parsed_args = args_parser.parse_args(argv[1:]) + + command = parsed_args.command + if EXTRACT_FILE_CMD == command: + return handle_extract_file_cmd(parsed_args, clp_home, default_config_file_path) + elif EXTRACT_IR_CMD == command: + return handle_extract_ir_cmd(parsed_args, clp_home, default_config_file_path) + else: + logger.exception(f"Unexpected command: {command}") + return -1 + + if "__main__" == __name__: sys.exit(main(sys.argv)) diff --git a/components/clp-package-utils/clp_package_utils/scripts/native/decompress.py b/components/clp-package-utils/clp_package_utils/scripts/native/decompress.py index 9331492b4..b6585b192 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/native/decompress.py +++ b/components/clp-package-utils/clp_package_utils/scripts/native/decompress.py @@ -1,19 +1,31 @@ import argparse +import asyncio import logging import pathlib import subprocess import sys import uuid +from contextlib import closing from typing import Optional import yaml -from clp_py_utils.clp_config import CLPConfig +from clp_py_utils.clp_config import CLP_METADATA_TABLE_PREFIX, CLPConfig, Database +from clp_py_utils.sql_adapter import SQL_Adapter +from job_orchestration.scheduler.constants import QueryJobStatus, QueryJobType +from job_orchestration.scheduler.job_config import ExtractIrJobConfig from clp_package_utils.general import ( CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH, + EXTRACT_FILE_CMD, + EXTRACT_IR_CMD, get_clp_home, load_config_file, ) +from clp_package_utils.scripts.native.utils import ( + run_function_in_process, + submit_query_job, + wait_for_query_job, +) # Setup logging # Create logger @@ -26,6 +38,112 @@ logger.addHandler(logging_console_handler) +def get_orig_file_id(db_config: Database, path: str) -> Optional[str]: + """ + :param db_config: + :param path: Path of the original file. + :return: The ID of an original file which has the given path, or None if no such file exists. + NOTE: Multiple original files may have the same path in which case this method returns the ID of + only one of them. + """ + sql_adapter = SQL_Adapter(db_config) + with closing(sql_adapter.create_connection(True)) as db_conn, closing( + db_conn.cursor(dictionary=True) + ) as db_cursor: + db_cursor.execute( + f"SELECT orig_file_id FROM `{CLP_METADATA_TABLE_PREFIX}files` WHERE path = (%s)", + (path,), + ) + results = db_cursor.fetchall() + db_conn.commit() + + if len(results) == 0: + logger.error("No file found for the given path.") + return None + + if len(results) > 1: + logger.warning( + "Multiple files found for the given path." + " Returning the orig_file_id of one of them." + ) + + return results[0]["orig_file_id"] + + +def submit_and_monitor_ir_extraction_job_in_db( + db_config: Database, + orig_file_id: str, + msg_ix: int, + target_uncompressed_size: Optional[int], +) -> int: + """ + Submits an IR extraction job to the scheduler and waits until the job finishes. + :param db_config: + :param orig_file_id: + :param msg_ix: + :param target_uncompressed_size: + :return: 0 on success, -1 otherwise. + """ + extract_ir_config = ExtractIrJobConfig( + orig_file_id=orig_file_id, + msg_ix=msg_ix, + target_uncompressed_size=target_uncompressed_size, + ) + + sql_adapter = SQL_Adapter(db_config) + job_id = submit_query_job(sql_adapter, extract_ir_config, QueryJobType.EXTRACT_IR) + job_status = wait_for_query_job(sql_adapter, job_id) + + if QueryJobStatus.SUCCEEDED == job_status: + logger.info(f"Finished IR extraction job {job_id}.") + return 0 + + logger.error( + f"IR extraction job {job_id} finished with unexpected status: {job_status.to_str()}." + ) + return -1 + + +def handle_extract_ir_cmd( + parsed_args: argparse.Namespace, clp_home: pathlib.Path, default_config_file_path: pathlib.Path +) -> int: + """ + Handles the IR extraction command. + :param parsed_args: + :param clp_home: + :param default_config_file_path: + :return: 0 on success, -1 otherwise. + """ + # Validate and load config file + clp_config = validate_and_load_config_file( + clp_home, pathlib.Path(parsed_args.config), default_config_file_path + ) + if clp_config is None: + return -1 + + orig_file_id: str + if parsed_args.orig_file_id: + orig_file_id = parsed_args.orig_file_id + else: + orig_file_id = get_orig_file_id(clp_config.database, parsed_args.orig_file_path) + if orig_file_id is None: + return -1 + + try: + return asyncio.run( + run_function_in_process( + submit_and_monitor_ir_extraction_job_in_db, + clp_config.database, + orig_file_id, + parsed_args.msg_ix, + parsed_args.target_uncompressed_size, + ) + ) + except asyncio.CancelledError: + logger.error("IR extraction cancelled.") + return -1 + + def validate_and_load_config_file( clp_home: pathlib.Path, config_file_path: pathlib.Path, @@ -48,11 +166,11 @@ def validate_and_load_config_file( return None -def handle_decompression_command( +def handle_extract_file_cmd( parsed_args: argparse.Namespace, clp_home: pathlib.Path, default_config_file_path: pathlib.Path -): +) -> int: """ - Handles the decompression command. + Handles the file extraction command. :param parsed_args: :param clp_home: :param default_config_file_path: @@ -73,7 +191,7 @@ def handle_decompression_command( clp_config = validate_and_load_config_file( clp_home, pathlib.Path(parsed_args.config), default_config_file_path ) - if not clp_config: + if clp_config is None: return -1 paths = parsed_args.paths @@ -88,36 +206,36 @@ def handle_decompression_command( yaml.safe_dump(clp_config.database.get_clp_connection_params_and_type(True), f) # fmt: off - decompression_cmd = [ + extract_cmd = [ str(clp_home / "bin" / "clp"), "x", str(archives_dir), str(extraction_dir), "--db-config-file", str(db_config_file_path), ] # fmt: on - files_to_decompress_list_path = None + files_to_extract_list_path = None if list_path is not None: - decompression_cmd.append("-f") - decompression_cmd.append(str(list_path)) + extract_cmd.append("-f") + extract_cmd.append(str(list_path)) elif len(paths) > 0: # Write paths to file - files_to_decompress_list_path = logs_dir / f"paths-to-decompress-{uuid.uuid4()}.txt" - with open(files_to_decompress_list_path, "w") as stream: + files_to_extract_list_path = logs_dir / f"paths-to-extract-{uuid.uuid4()}.txt" + with open(files_to_extract_list_path, "w") as stream: for path in paths: stream.write(path + "\n") - decompression_cmd.append("-f") - decompression_cmd.append(str(files_to_decompress_list_path)) + extract_cmd.append("-f") + extract_cmd.append(str(files_to_extract_list_path)) - proc = subprocess.Popen(decompression_cmd) + proc = subprocess.Popen(extract_cmd) return_code = proc.wait() if 0 != return_code: - logger.error(f"Decompression failed, return_code={return_code}") + logger.error(f"File extraction failed, return_code={return_code}") return return_code # Remove generated files - if files_to_decompress_list_path is not None: - files_to_decompress_list_path.unlink() + if files_to_extract_list_path is not None: + files_to_extract_list_path.unlink() db_config_file_path.unlink() return 0 @@ -135,14 +253,41 @@ def main(argv): default=str(default_config_file_path), help="CLP configuration file.", ) - args_parser.add_argument("paths", metavar="PATH", nargs="*", help="Paths to decompress.") - args_parser.add_argument("-f", "--files-from", help="Decompress all paths in the given list.") - args_parser.add_argument( - "-d", "--extraction-dir", metavar="DIR", help="Decompress files into DIR", default="." + command_args_parser = args_parser.add_subparsers(dest="command", required=True) + + # File extraction command parser + file_extraction_parser = command_args_parser.add_parser(EXTRACT_FILE_CMD) + file_extraction_parser.add_argument( + "paths", metavar="PATH", nargs="*", help="Files to extract." + ) + file_extraction_parser.add_argument( + "-f", "--files-from", help="A file listing all files to extract." + ) + file_extraction_parser.add_argument( + "-d", "--extraction-dir", metavar="DIR", default=".", help="Extract files into DIR." ) + + # IR extraction command parser + ir_extraction_parser = command_args_parser.add_parser(EXTRACT_IR_CMD) + ir_extraction_parser.add_argument("msg_ix", type=int, help="Message index.") + ir_extraction_parser.add_argument( + "--target-uncompressed-size", type=int, help="Target uncompressed IR size." + ) + + group = ir_extraction_parser.add_mutually_exclusive_group(required=True) + group.add_argument("--orig-file-id", type=str, help="Original file's ID.") + group.add_argument("--orig-file-path", type=str, help="Original file's path.") + parsed_args = args_parser.parse_args(argv[1:]) - return handle_decompression_command(parsed_args, clp_home, default_config_file_path) + command = parsed_args.command + if EXTRACT_FILE_CMD == command: + return handle_extract_file_cmd(parsed_args, clp_home, default_config_file_path) + elif EXTRACT_IR_CMD == command: + return handle_extract_ir_cmd(parsed_args, clp_home, default_config_file_path) + else: + logger.exception(f"Unexpected command: {command}") + return -1 if "__main__" == __name__: diff --git a/components/clp-package-utils/clp_package_utils/scripts/native/utils.py b/components/clp-package-utils/clp_package_utils/scripts/native/utils.py index 6b94e4676..2f5c51a11 100644 --- a/components/clp-package-utils/clp_package_utils/scripts/native/utils.py +++ b/components/clp-package-utils/clp_package_utils/scripts/native/utils.py @@ -86,7 +86,7 @@ def wait_for_query_job(sql_adapter: SQL_Adapter, job_id: int) -> QueryJobStatus: ) # There will only ever be one row since it's impossible to have more than one job with # the same ID - new_status = db_cursor.fetchall()[0]["status"] + new_status = QueryJobStatus(db_cursor.fetchall()[0]["status"]) db_conn.commit() if new_status in ( QueryJobStatus.SUCCEEDED,