diff --git a/components/clp-package-utils/clp_package_utils/general.py b/components/clp-package-utils/clp_package_utils/general.py index ce0f10309..b84bf298e 100644 --- a/components/clp-package-utils/clp_package_utils/general.py +++ b/components/clp-package-utils/clp_package_utils/general.py @@ -6,6 +6,9 @@ import socket import subprocess import typing +import uuid +from enum import auto +from typing import List, Tuple import yaml from clp_py_utils.clp_config import ( @@ -24,6 +27,7 @@ read_yaml_config_file, validate_path_could_be_dir, ) +from strenum import KebabCaseStrEnum # CONSTANTS # Paths @@ -38,6 +42,12 @@ class DockerMountType(enum.IntEnum): BIND = 0 +class JobType(KebabCaseStrEnum): + COMPRESSION = auto() + DECOMPRESSION = auto() + SEARCH = auto() + + class DockerMount: def __init__( self, @@ -91,6 +101,14 @@ def get_clp_home(): return clp_home.resolve() +def generate_container_name(job_type: JobType) -> str: + """ + :param job_type: + :return: A unique container name for the given job type. + """ + return f"clp-{job_type}-{str(uuid.uuid4())[-4:]}" + + def check_dependencies(): try: subprocess.run( @@ -177,12 +195,15 @@ def is_path_already_mounted( return host_path_relative_to_mounted_root == container_path_relative_to_mounted_root -def generate_container_config(clp_config: CLPConfig, clp_home: pathlib.Path): +def generate_container_config( + clp_config: CLPConfig, clp_home: pathlib.Path +) -> Tuple[CLPConfig, CLPDockerMounts]: """ Copies the given config and sets up mounts mapping the relevant host paths into the container :param clp_config: :param clp_home: + :return: The container config and the mounts. """ container_clp_config = clp_config.copy(deep=True) @@ -241,6 +262,57 @@ def generate_container_config(clp_config: CLPConfig, clp_home: pathlib.Path): return container_clp_config, docker_mounts +def dump_container_config( + container_clp_config: CLPConfig, clp_config: CLPConfig, container_name: str +) -> Tuple[pathlib.Path, pathlib.Path]: + """ + Writes the given config to the logs directory so that it's accessible in the container. + :param container_clp_config: The config to write. + :param clp_config: The corresponding config on the host (used to determine the logs directory). + :param container_name: + :return: The path to the config file in the container and on the host. + """ + container_config_filename = f".{container_name}-config.yml" + config_file_path_on_host = clp_config.logs_directory / container_config_filename + config_file_path_on_container = container_clp_config.logs_directory / container_config_filename + with open(config_file_path_on_host, "w") as f: + yaml.safe_dump(container_clp_config.dump_to_primitive_dict(), f) + + return config_file_path_on_container, config_file_path_on_host + + +def generate_container_start_cmd( + container_name: str, container_mounts: List[CLPDockerMounts], container_image: str +) -> List[str]: + """ + Generates the command to start a container with the given mounts and name. + :param container_name: + :param container_mounts: + :param container_image: + :return: The command. + """ + clp_site_packages_dir = CONTAINER_CLP_HOME / "lib" / "python3" / "site-packages" + # fmt: off + container_start_cmd = [ + "docker", "run", + "-i", + "--rm", + "--network", "host", + "-w", str(CONTAINER_CLP_HOME), + "-e", f"PYTHONPATH={clp_site_packages_dir}", + "-u", f"{os.getuid()}:{os.getgid()}", + "--name", container_name, + "--log-driver", "local" + ] + for mount in container_mounts: + if mount: + container_start_cmd.append("--mount") + container_start_cmd.append(str(mount)) + container_start_cmd.append(container_image) + + return container_start_cmd + + def validate_config_key_existence(config, key): try: value = get_config_value(config, key) @@ -249,7 +321,7 @@ def validate_config_key_existence(config, key): return value -def validate_and_load_config_file( +def load_config_file( config_file_path: pathlib.Path, default_config_file_path: pathlib.Path, clp_home: pathlib.Path ): if config_file_path.exists(): diff --git a/components/clp-package-utils/clp_package_utils/scripts/compress.py b/components/clp-package-utils/clp_package_utils/scripts/compress.py index 61495a4cd..d0aa30913 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/compress.py +++ b/components/clp-package-utils/clp_package_utils/scripts/compress.py @@ -1,20 +1,20 @@ import argparse import logging -import os import pathlib import subprocess import sys import uuid -import yaml - from clp_package_utils.general import ( CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH, - CONTAINER_CLP_HOME, CONTAINER_INPUT_LOGS_ROOT_DIR, + dump_container_config, generate_container_config, + generate_container_name, + generate_container_start_cmd, get_clp_home, - validate_and_load_config_file, + JobType, + load_config_file, validate_and_load_db_credentials_file, ) @@ -57,51 +57,32 @@ def main(argv): # Validate and load config file try: config_file_path = pathlib.Path(parsed_args.config) - clp_config = validate_and_load_config_file( - config_file_path, default_config_file_path, clp_home - ) + clp_config = load_config_file(config_file_path, default_config_file_path, clp_home) clp_config.validate_logs_dir() + # Validate and load necessary credentials validate_and_load_db_credentials_file(clp_config, clp_home, False) except: logger.exception("Failed to load config.") return -1 - container_name = f"clp-compressor-{str(uuid.uuid4())[-4:]}" + container_name = generate_container_name(JobType.COMPRESSION) container_clp_config, mounts = generate_container_config(clp_config, clp_home) - container_config_filename = f".{container_name}-config.yml" - container_config_file_path_on_host = clp_config.logs_directory / container_config_filename - with open(container_config_file_path_on_host, "w") as f: - yaml.safe_dump(container_clp_config.dump_to_primitive_dict(), f) + generated_config_path_on_container, generated_config_path_on_host = dump_container_config( + container_clp_config, clp_config, container_name + ) - clp_site_packages_dir = CONTAINER_CLP_HOME / "lib" / "python3" / "site-packages" - # fmt: off - container_start_cmd = [ - "docker", "run", - "-i", - "--rm", - "--network", "host", - "-w", str(CONTAINER_CLP_HOME), - "-e", f"PYTHONPATH={clp_site_packages_dir}", - "-u", f"{os.getuid()}:{os.getgid()}", - "--name", container_name, - "--log-driver", "local", - "--mount", str(mounts.clp_home), - ] - # fmt: on - necessary_mounts = [mounts.input_logs_dir, mounts.data_dir, mounts.logs_dir] - for mount in necessary_mounts: - if mount: - container_start_cmd.append("--mount") - container_start_cmd.append(str(mount)) - container_start_cmd.append(clp_config.execution_container) + necessary_mounts = [mounts.clp_home, mounts.input_logs_dir, mounts.data_dir, mounts.logs_dir] + container_start_cmd = generate_container_start_cmd( + container_name, necessary_mounts, clp_config.execution_container + ) # fmt: off compress_cmd = [ "python3", "-m", "clp_package_utils.scripts.native.compress", - "--config", str(container_clp_config.logs_directory / container_config_filename), + "--config", str(generated_config_path_on_container), "--remove-path-prefix", str(CONTAINER_INPUT_LOGS_ROOT_DIR), ] # fmt: on @@ -140,7 +121,7 @@ def main(argv): subprocess.run(cmd, check=True) # Remove generated files - container_config_file_path_on_host.unlink() + generated_config_path_on_host.unlink() return 0 diff --git a/components/clp-package-utils/clp_package_utils/scripts/decompress.py b/components/clp-package-utils/clp_package_utils/scripts/decompress.py index f20291b50..f700d9721 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/decompress.py +++ b/components/clp-package-utils/clp_package_utils/scripts/decompress.py @@ -1,21 +1,22 @@ import argparse import logging -import os import pathlib import subprocess import sys -import uuid -import yaml +from clp_py_utils.clp_config import CLPConfig from clp_package_utils.general import ( CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH, - CONTAINER_CLP_HOME, DockerMount, DockerMountType, + dump_container_config, generate_container_config, + generate_container_name, + generate_container_start_cmd, get_clp_home, - validate_and_load_config_file, + JobType, + load_config_file, validate_and_load_db_credentials_file, validate_path_could_be_dir, ) @@ -53,11 +54,10 @@ def main(argv): # Validate and load config file try: config_file_path = pathlib.Path(parsed_args.config) - clp_config = validate_and_load_config_file( - config_file_path, default_config_file_path, clp_home - ) + clp_config = load_config_file(config_file_path, default_config_file_path, clp_home) clp_config.validate_logs_dir() + # Validate and load necessary credentials validate_and_load_db_credentials_file(clp_config, clp_home, False) except: logger.exception("Failed to load config.") @@ -76,33 +76,16 @@ def main(argv): return -1 extraction_dir.mkdir(exist_ok=True) - container_name = f"clp-decompressor-{str(uuid.uuid4())[-4:]}" - + container_name = generate_container_name(JobType.DECOMPRESSION) container_clp_config, mounts = generate_container_config(clp_config, clp_home) - container_config_filename = f".{container_name}-config.yml" - container_config_file_path_on_host = clp_config.logs_directory / container_config_filename - with open(container_config_file_path_on_host, "w") as f: - yaml.safe_dump(container_clp_config.dump_to_primitive_dict(), f) - - clp_site_packages_dir = CONTAINER_CLP_HOME / "lib" / "python3" / "site-packages" - # fmt: off - container_start_cmd = [ - "docker", "run", - "-i", - "--rm", - "--network", "host", - "-w", str(CONTAINER_CLP_HOME), - "-e", f"PYTHONPATH={clp_site_packages_dir}", - "-u", f"{os.getuid()}:{os.getgid()}", - "--name", container_name, - "--log-driver", "local", - "--mount", str(mounts.clp_home), - ] - # fmt: on + generated_config_path_on_container, generated_config_path_on_host = dump_container_config( + container_clp_config, clp_config, container_name + ) # Set up mounts container_extraction_dir = pathlib.Path("/") / "mnt" / "extraction-dir" necessary_mounts = [ + mounts.clp_home, mounts.data_dir, mounts.logs_dir, mounts.archives_output_dir, @@ -120,18 +103,15 @@ def main(argv): container_paths_to_decompress_file_path, ) ) - for mount in necessary_mounts: - if mount: - container_start_cmd.append("--mount") - container_start_cmd.append(str(mount)) - - container_start_cmd.append(clp_config.execution_container) + container_start_cmd = generate_container_start_cmd( + container_name, necessary_mounts, clp_config.execution_container + ) # fmt: off decompress_cmd = [ "python3", "-m", "clp_package_utils.scripts.native.decompress", - "--config", str(container_clp_config.logs_directory / container_config_filename), + "--config", str(generated_config_path_on_container), "-d", str(container_extraction_dir), ] # fmt: on @@ -145,7 +125,7 @@ def main(argv): subprocess.run(cmd, check=True) # Remove generated files - container_config_file_path_on_host.unlink() + generated_config_path_on_host.unlink() return 0 diff --git a/components/clp-package-utils/clp_package_utils/scripts/native/compress.py b/components/clp-package-utils/clp_package_utils/scripts/native/compress.py index a08602007..cb495204f 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/native/compress.py +++ b/components/clp-package-utils/clp_package_utils/scripts/native/compress.py @@ -20,7 +20,7 @@ from clp_package_utils.general import ( CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH, get_clp_home, - validate_and_load_config_file, + load_config_file, ) # Setup logging @@ -170,9 +170,7 @@ def main(argv): # Validate and load config file try: config_file_path = pathlib.Path(parsed_args.config) - clp_config = validate_and_load_config_file( - config_file_path, default_config_file_path, clp_home - ) + clp_config = load_config_file(config_file_path, default_config_file_path, clp_home) clp_config.validate_input_logs_dir() clp_config.validate_logs_dir() except: diff --git a/components/clp-package-utils/clp_package_utils/scripts/native/decompress.py b/components/clp-package-utils/clp_package_utils/scripts/native/decompress.py index 9ca3ab7b6..9331492b4 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/native/decompress.py +++ b/components/clp-package-utils/clp_package_utils/scripts/native/decompress.py @@ -4,6 +4,7 @@ import subprocess import sys import uuid +from typing import Optional import yaml from clp_py_utils.clp_config import CLPConfig @@ -11,7 +12,7 @@ from clp_package_utils.general import ( CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH, get_clp_home, - validate_and_load_config_file, + load_config_file, ) # Setup logging @@ -25,15 +26,62 @@ logger.addHandler(logging_console_handler) -def decompress_paths( +def validate_and_load_config_file( clp_home: pathlib.Path, - paths, - list_path: pathlib.Path, - clp_config: CLPConfig, - archives_dir: pathlib.Path, - logs_dir: pathlib.Path, - extraction_dir: pathlib.Path, + config_file_path: pathlib.Path, + default_config_file_path: pathlib.Path, +) -> Optional[CLPConfig]: + """ + Validates and loads the config file. + :param clp_home: + :param config_file_path: + :param default_config_file_path: + :return: clp_config on success, None otherwise. + """ + try: + clp_config = load_config_file(config_file_path, default_config_file_path, clp_home) + clp_config.validate_archive_output_dir() + clp_config.validate_logs_dir() + return clp_config + except Exception: + logger.exception("Failed to load config.") + return None + + +def handle_decompression_command( + parsed_args: argparse.Namespace, clp_home: pathlib.Path, default_config_file_path: pathlib.Path ): + """ + Handles the decompression command. + :param parsed_args: + :param clp_home: + :param default_config_file_path: + :return: 0 on success, -1 otherwise. + """ + # Validate paths were specified using only one method + if len(parsed_args.paths) > 0 and parsed_args.files_from is not None: + logger.error("Paths cannot be specified both on the command line and through a file.") + return -1 + + # Validate extraction directory + extraction_dir = pathlib.Path(parsed_args.extraction_dir) + if not extraction_dir.is_dir(): + logger.error(f"extraction-dir ({extraction_dir}) is not a valid directory.") + return -1 + + # Validate and load config file + clp_config = validate_and_load_config_file( + clp_home, pathlib.Path(parsed_args.config), default_config_file_path + ) + if not clp_config: + return -1 + + paths = parsed_args.paths + list_path = parsed_args.files_from + + logs_dir = clp_config.logs_directory + archives_dir = clp_config.archive_output.directory + # Generate database config file for clp db_config_file_path = logs_dir / f".decompress-db-config-{uuid.uuid4()}.yml" with open(db_config_file_path, "w") as f: @@ -46,6 +94,7 @@ def decompress_paths( "--db-config-file", str(db_config_file_path), ] # fmt: on + files_to_decompress_list_path = None if list_path is not None: decompression_cmd.append("-f") @@ -93,37 +142,7 @@ def main(argv): ) parsed_args = args_parser.parse_args(argv[1:]) - # Validate paths were specified using only one method - if len(parsed_args.paths) > 0 and parsed_args.files_from is not None: - args_parser.error("Paths cannot be specified both on the command line and through a file.") - - # Validate extraction directory - extraction_dir = pathlib.Path(parsed_args.extraction_dir) - if not extraction_dir.is_dir(): - logger.error(f"extraction-dir ({extraction_dir}) is not a valid directory.") - return -1 - - # Validate and load config file - try: - config_file_path = pathlib.Path(parsed_args.config) - clp_config = validate_and_load_config_file( - config_file_path, default_config_file_path, clp_home - ) - clp_config.validate_archive_output_dir() - clp_config.validate_logs_dir() - except: - logger.exception("Failed to load config.") - return -1 - - return decompress_paths( - clp_home, - parsed_args.paths, - parsed_args.files_from, - clp_config, - clp_config.archive_output.directory, - clp_config.logs_directory, - extraction_dir, - ) + return handle_decompression_command(parsed_args, clp_home, default_config_file_path) if "__main__" == __name__: diff --git a/components/clp-package-utils/clp_package_utils/scripts/native/search.py b/components/clp-package-utils/clp_package_utils/scripts/native/search.py index 9041b0006..7dd247fa5 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/native/search.py +++ b/components/clp-package-utils/clp_package_utils/scripts/native/search.py @@ -4,16 +4,13 @@ import asyncio import ipaddress import logging -import multiprocessing import pathlib import socket import sys -import time -from contextlib import closing import msgpack import pymongo -from clp_py_utils.clp_config import Database, QUERY_JOBS_TABLE_NAME, ResultsCache +from clp_py_utils.clp_config import Database, ResultsCache from clp_py_utils.sql_adapter import SQL_Adapter from job_orchestration.scheduler.constants import QueryJobStatus, QueryJobType from job_orchestration.scheduler.job_config import AggregationConfig, SearchJobConfig @@ -21,7 +18,12 @@ from clp_package_utils.general import ( CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH, get_clp_home, - validate_and_load_config_file, + load_config_file, +) +from clp_package_utils.scripts.native.utils import ( + run_function_in_process, + submit_query_job, + wait_for_query_job, ) # Setup logging @@ -35,41 +37,6 @@ logger.addHandler(logging_console_handler) -async def run_function_in_process(function, *args, initializer=None, init_args=None): - """ - Runs the given function in a separate process wrapped in a *cancellable* - asyncio task. This is necessary because asyncio's multiprocessing process - cannot be cancelled once it's started. - :param function: Method to run - :param args: Arguments for the method - :param initializer: Initializer for each process in the pool - :param init_args: Arguments for the initializer - :return: Return value of the method - """ - pool = multiprocessing.Pool(1, initializer, init_args) - - loop = asyncio.get_event_loop() - fut = loop.create_future() - - def process_done_callback(obj): - loop.call_soon_threadsafe(fut.set_result, obj) - - def process_error_callback(err): - loop.call_soon_threadsafe(fut.set_exception, err) - - pool.apply_async( - function, args, callback=process_done_callback, error_callback=process_error_callback - ) - - try: - return await fut - except asyncio.CancelledError: - pass - finally: - pool.terminate() - pool.close() - - def create_and_monitor_job_in_db( db_config: Database, results_cache: ResultsCache, @@ -106,47 +73,22 @@ def create_and_monitor_job_in_db( search_config.tags = tag_list sql_adapter = SQL_Adapter(db_config) - with closing(sql_adapter.create_connection(True)) as db_conn, closing( - db_conn.cursor(dictionary=True) - ) as db_cursor: - # Create job - db_cursor.execute( - f"INSERT INTO `{QUERY_JOBS_TABLE_NAME}` (`job_config`, `type`) VALUES (%s, %s)", - (msgpack.packb(search_config.dict()), QueryJobType.SEARCH_OR_AGGREGATION), - ) - db_conn.commit() - job_id = db_cursor.lastrowid + job_id = submit_query_job(sql_adapter, search_config, QueryJobType.SEARCH_OR_AGGREGATION) + job_status = wait_for_query_job(sql_adapter, job_id) - # Wait for the job to be marked complete - while True: - db_cursor.execute( - f"SELECT `status` FROM `{QUERY_JOBS_TABLE_NAME}` WHERE `id` = {job_id}" - ) - # There will only ever be one row since it's impossible to have more than one job with - # the same ID - new_status = db_cursor.fetchall()[0]["status"] - db_conn.commit() - if new_status in ( - QueryJobStatus.SUCCEEDED, - QueryJobStatus.FAILED, - QueryJobStatus.CANCELLED, - ): - break - - time.sleep(0.5) + if do_count_aggregation is None and count_by_time_bucket_size is None: + return + with pymongo.MongoClient(results_cache.get_uri()) as client: + search_results_collection = client[results_cache.db_name][str(job_id)] + if do_count_aggregation is not None: + for document in search_results_collection.find(): + print(f"tags: {document['group_tags']} count: {document['records'][0]['count']}") + elif count_by_time_bucket_size is not None: + for document in search_results_collection.find(): + print(f"timestamp: {document['timestamp']} count: {document['count']}") - if do_count_aggregation is None and count_by_time_bucket_size is None: - return - with pymongo.MongoClient(results_cache.get_uri()) as client: - search_results_collection = client[results_cache.db_name][str(job_id)] - if do_count_aggregation is not None: - for document in search_results_collection.find(): - print( - f"tags: {document['group_tags']} count: {document['records'][0]['count']}" - ) - elif count_by_time_bucket_size is not None: - for document in search_results_collection.find(): - print(f"timestamp: {document['timestamp']} count: {document['count']}") + if job_status != QueryJobStatus.SUCCEEDED: + logger.error(f"job {job_id} finished with unexpected status: {job_status}") async def worker_connection_handler(reader: asyncio.StreamReader, writer: asyncio.StreamWriter): @@ -328,9 +270,7 @@ def main(argv): # Validate and load config file try: config_file_path = pathlib.Path(parsed_args.config) - clp_config = validate_and_load_config_file( - config_file_path, default_config_file_path, clp_home - ) + clp_config = load_config_file(config_file_path, default_config_file_path, clp_home) clp_config.validate_logs_dir() except: logger.exception("Failed to load config.") diff --git a/components/clp-package-utils/clp_package_utils/scripts/native/utils.py b/components/clp-package-utils/clp_package_utils/scripts/native/utils.py new file mode 100644 index 000000000..6b94e4676 --- /dev/null +++ b/components/clp-package-utils/clp_package_utils/scripts/native/utils.py @@ -0,0 +1,98 @@ +import asyncio +import multiprocessing +import time +from contextlib import closing + +import msgpack +from clp_py_utils.clp_config import ( + QUERY_JOBS_TABLE_NAME, +) +from clp_py_utils.sql_adapter import SQL_Adapter +from job_orchestration.scheduler.constants import QueryJobStatus, QueryJobType +from job_orchestration.scheduler.scheduler_data import QueryJobConfig + + +async def run_function_in_process(function, *args, initializer=None, init_args=None): + """ + Runs the given function in a separate process wrapped in a *cancellable* + asyncio task. This is necessary because asyncio's multiprocessing process + cannot be cancelled once it's started. + :param function: Method to run + :param args: Arguments for the method + :param initializer: Initializer for each process in the pool + :param init_args: Arguments for the initializer + :return: Return value of the method + """ + pool = multiprocessing.Pool(1, initializer, init_args) + + loop = asyncio.get_event_loop() + fut = loop.create_future() + + def process_done_callback(obj): + loop.call_soon_threadsafe(fut.set_result, obj) + + def process_error_callback(err): + loop.call_soon_threadsafe(fut.set_exception, err) + + pool.apply_async( + function, args, callback=process_done_callback, error_callback=process_error_callback + ) + + try: + return await fut + except asyncio.CancelledError: + pass + finally: + pool.terminate() + pool.close() + + +def submit_query_job( + sql_adapter: SQL_Adapter, job_config: QueryJobConfig, job_type: QueryJobType +) -> int: + """ + Submits a query job. + :param sql_adapter: + :param job_config: + :param job_type: + :return: The job's ID. + """ + with closing(sql_adapter.create_connection(True)) as db_conn, closing( + db_conn.cursor(dictionary=True) + ) as db_cursor: + # Create job + db_cursor.execute( + f"INSERT INTO `{QUERY_JOBS_TABLE_NAME}` (`job_config`, `type`) VALUES (%s, %s)", + (msgpack.packb(job_config.dict()), job_type), + ) + db_conn.commit() + return db_cursor.lastrowid + + +def wait_for_query_job(sql_adapter: SQL_Adapter, job_id: int) -> QueryJobStatus: + """ + Waits for the query job with the given ID to complete. + :param sql_adapter: + :param job_id: + :return: The job's status on completion. + """ + with closing(sql_adapter.create_connection(True)) as db_conn, closing( + db_conn.cursor(dictionary=True) + ) as db_cursor: + # Wait for the job to be marked complete + while True: + db_cursor.execute( + f"SELECT `status` FROM `{QUERY_JOBS_TABLE_NAME}` WHERE `id` = {job_id}" + ) + # There will only ever be one row since it's impossible to have more than one job with + # the same ID + new_status = db_cursor.fetchall()[0]["status"] + db_conn.commit() + if new_status in ( + QueryJobStatus.SUCCEEDED, + QueryJobStatus.FAILED, + QueryJobStatus.CANCELLED, + ): + return new_status + + time.sleep(0.5) diff --git a/components/clp-package-utils/clp_package_utils/scripts/search.py b/components/clp-package-utils/clp_package_utils/scripts/search.py index 2f2450430..f3f02046d 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/search.py +++ b/components/clp-package-utils/clp_package_utils/scripts/search.py @@ -10,10 +10,13 @@ from clp_package_utils.general import ( CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH, - CONTAINER_CLP_HOME, + dump_container_config, generate_container_config, + generate_container_name, + generate_container_start_cmd, get_clp_home, - validate_and_load_config_file, + JobType, + load_config_file, validate_and_load_db_credentials_file, ) @@ -70,52 +73,32 @@ def main(argv): # Validate and load config file try: config_file_path = pathlib.Path(parsed_args.config) - clp_config = validate_and_load_config_file( - config_file_path, default_config_file_path, clp_home - ) + clp_config = load_config_file(config_file_path, default_config_file_path, clp_home) clp_config.validate_logs_dir() # Validate and load necessary credentials - validate_and_load_db_credentials_file(clp_config, clp_home, True) + validate_and_load_db_credentials_file(clp_config, clp_home, False) except: logger.exception("Failed to load config.") return -1 - container_name = f"clp-search-{str(uuid.uuid4())[-4:]}" + container_name = generate_container_name(JobType.SEARCH) container_clp_config, mounts = generate_container_config(clp_config, clp_home) - container_config_filename = f".{container_name}-config.yml" - container_config_file_path_on_host = clp_config.logs_directory / container_config_filename - with open(container_config_file_path_on_host, "w") as f: - yaml.safe_dump(container_clp_config.dump_to_primitive_dict(), f) + generated_config_path_on_container, generated_config_path_on_host = dump_container_config( + container_clp_config, clp_config, container_name + ) - clp_site_packages_dir = CONTAINER_CLP_HOME / "lib" / "python3" / "site-packages" - # fmt: off - container_start_cmd = [ - "docker", "run", - "-i", - "--rm", - "--network", "host", - "-w", str(CONTAINER_CLP_HOME), - "-e", f"PYTHONPATH={clp_site_packages_dir}", - "-u", f"{os.getuid()}:{os.getgid()}", - "--name", container_name, - "--log-driver", "local", - "--mount", str(mounts.clp_home), - ] - # fmt: on - necessary_mounts = [mounts.logs_dir] - for mount in necessary_mounts: - if mount: - container_start_cmd.append("--mount") - container_start_cmd.append(str(mount)) - container_start_cmd.append(clp_config.execution_container) + necessary_mounts = [mounts.clp_home, mounts.logs_dir] + container_start_cmd = generate_container_start_cmd( + container_name, necessary_mounts, clp_config.execution_container + ) # fmt: off search_cmd = [ "python3", "-m", "clp_package_utils.scripts.native.search", - "--config", str(container_clp_config.logs_directory / container_config_filename), + "--config", str(generated_config_path_on_container), parsed_args.wildcard_query, ] # fmt: on @@ -142,7 +125,7 @@ def main(argv): subprocess.run(cmd, check=True) # Remove generated files - container_config_file_path_on_host.unlink() + generated_config_path_on_host.unlink() return 0 diff --git a/components/clp-package-utils/clp_package_utils/scripts/start_clp.py b/components/clp-package-utils/clp_package_utils/scripts/start_clp.py index 5ba5bbe15..9dba79886 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/start_clp.py +++ b/components/clp-package-utils/clp_package_utils/scripts/start_clp.py @@ -44,7 +44,7 @@ get_clp_home, is_container_exited, is_container_running, - validate_and_load_config_file, + load_config_file, validate_and_load_db_credentials_file, validate_and_load_queue_credentials_file, validate_and_load_redis_credentials_file, @@ -887,9 +887,7 @@ def main(argv): # Validate and load config file try: config_file_path = pathlib.Path(parsed_args.config) - clp_config = validate_and_load_config_file( - config_file_path, default_config_file_path, clp_home - ) + clp_config = load_config_file(config_file_path, default_config_file_path, clp_home) # Validate and load necessary credentials if target in ( diff --git a/components/clp-package-utils/clp_package_utils/scripts/stop_clp.py b/components/clp-package-utils/clp_package_utils/scripts/stop_clp.py index 00e3f7b6b..7971dd7d8 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/stop_clp.py +++ b/components/clp-package-utils/clp_package_utils/scripts/stop_clp.py @@ -25,7 +25,7 @@ get_clp_home, is_container_exited, is_container_running, - validate_and_load_config_file, + load_config_file, validate_and_load_db_credentials_file, validate_and_load_queue_credentials_file, ) @@ -103,9 +103,7 @@ def main(argv): # Validate and load config file try: config_file_path = pathlib.Path(parsed_args.config) - clp_config = validate_and_load_config_file( - config_file_path, default_config_file_path, clp_home - ) + clp_config = load_config_file(config_file_path, default_config_file_path, clp_home) # Validate and load necessary credentials if target in (ALL_TARGET_NAME, CONTROLLER_TARGET_NAME, DB_COMPONENT_NAME):