diff --git a/components/clp-package-utils/clp_package_utils/general.py b/components/clp-package-utils/clp_package_utils/general.py index 29c421109..ce0f10309 100644 --- a/components/clp-package-utils/clp_package_utils/general.py +++ b/components/clp-package-utils/clp_package_utils/general.py @@ -69,6 +69,7 @@ def __init__(self, clp_home: pathlib.Path, docker_clp_home: pathlib.Path): self.data_dir: typing.Optional[DockerMount] = None self.logs_dir: typing.Optional[DockerMount] = None self.archives_output_dir: typing.Optional[DockerMount] = None + self.ir_output_dir: typing.Optional[DockerMount] = None def get_clp_home(): @@ -224,6 +225,19 @@ def generate_container_config(clp_config: CLPConfig, clp_home: pathlib.Path): container_clp_config.archive_output.directory, ) + container_clp_config.ir_output.directory = pathlib.Path("/") / "mnt" / "ir-output" + if not is_path_already_mounted( + clp_home, + CONTAINER_CLP_HOME, + clp_config.ir_output.directory, + container_clp_config.ir_output.directory, + ): + docker_mounts.ir_output_dir = DockerMount( + DockerMountType.BIND, + clp_config.ir_output.directory, + container_clp_config.ir_output.directory, + ) + return container_clp_config, docker_mounts @@ -391,6 +405,7 @@ def validate_results_cache_config( def validate_worker_config(clp_config: CLPConfig): clp_config.validate_input_logs_dir() clp_config.validate_archive_output_dir() + clp_config.validate_ir_output_dir() def validate_webui_config( diff --git a/components/clp-package-utils/clp_package_utils/scripts/start_clp.py b/components/clp-package-utils/clp_package_utils/scripts/start_clp.py index a798d2112..5ba5bbe15 100755 --- a/components/clp-package-utils/clp_package_utils/scripts/start_clp.py +++ b/components/clp-package-utils/clp_package_utils/scripts/start_clp.py @@ -8,8 +8,8 @@ import subprocess import sys import time -import typing import uuid +from typing import Any, Dict, List, Optional import yaml from clp_py_utils.clp_config import ( @@ -526,6 +526,8 @@ def start_compression_worker( clp_config.redis.compression_backend_database, num_cpus, mounts, + None, + None, ) @@ -538,6 +540,13 @@ def start_query_worker( ): celery_method = "job_orchestration.executor.query" celery_route = f"{QueueName.QUERY}" + + query_worker_mount = [mounts.ir_output_dir] + query_worker_env = { + "CLP_IR_OUTPUT_DIR": container_clp_config.ir_output.directory, + "CLP_IR_COLLECTION": clp_config.results_cache.ir_collection_name, + } + generic_start_worker( QUERY_WORKER_COMPONENT_NAME, instance_id, @@ -549,6 +558,8 @@ def start_query_worker( clp_config.redis.query_backend_database, num_cpus, mounts, + query_worker_env, + query_worker_mount, ) @@ -563,6 +574,8 @@ def generic_start_worker( redis_database: int, num_cpus: int, mounts: CLPDockerMounts, + worker_specific_env: Dict[str, Any], + worker_specific_mount: List[Optional[DockerMount]], ): logger.info(f"Starting {component_name}...") @@ -578,6 +591,7 @@ def generic_start_worker( # Create necessary directories clp_config.archive_output.directory.mkdir(parents=True, exist_ok=True) + clp_config.ir_output.directory.mkdir(parents=True, exist_ok=True) clp_site_packages_dir = CONTAINER_CLP_HOME / "lib" / "python3" / "site-packages" # fmt: off @@ -605,19 +619,28 @@ def generic_start_worker( "-e", f"CLP_LOGGING_LEVEL={worker_config.logging_level}", "-e", f"CLP_STORAGE_ENGINE={clp_config.package.storage_engine}", "-u", f"{os.getuid()}:{os.getgid()}", - "--mount", str(mounts.clp_home), ] + if worker_specific_env: + for env_name, env_value in worker_specific_env.items(): + container_start_cmd.append("-e") + container_start_cmd.append(f"{env_name}={env_value}") + # fmt: on necessary_mounts = [ + mounts.clp_home, mounts.data_dir, mounts.logs_dir, mounts.archives_output_dir, mounts.input_logs_dir, ] + if worker_specific_mount: + necessary_mounts.extend(worker_specific_mount) + for mount in necessary_mounts: - if mount: - container_start_cmd.append("--mount") - container_start_cmd.append(str(mount)) + if not mount: + raise ValueError(f"Required mount configuration is empty: {necessary_mounts}") + container_start_cmd.append("--mount") + container_start_cmd.append(str(mount)) container_start_cmd.append(clp_config.execution_container) worker_cmd = [ @@ -645,8 +668,8 @@ def generic_start_worker( def update_meteor_settings( parent_key_prefix: str, - settings: typing.Dict[str, typing.Any], - updates: typing.Dict[str, typing.Any], + settings: Dict[str, Any], + updates: Dict[str, Any], ): """ Recursively updates the given Meteor settings object with the values from `updates`. diff --git a/components/clp-py-utils/clp_py_utils/clp_config.py b/components/clp-py-utils/clp_py_utils/clp_config.py index b3410925e..0c0ce6893 100644 --- a/components/clp-py-utils/clp_py_utils/clp_config.py +++ b/components/clp-py-utils/clp_py_utils/clp_config.py @@ -254,7 +254,8 @@ def validate_upsert_interval(cls, field): class ResultsCache(BaseModel): host: str = "localhost" port: int = 27017 - db_name: str = "clp-search" + db_name: str = "clp-query-results" + ir_collection_name: str = "ir-files" @validator("host") def validate_host(cls, field): @@ -268,6 +269,12 @@ def validate_db_name(cls, field): raise ValueError(f"{RESULTS_CACHE_COMPONENT_NAME}.db_name cannot be empty.") return field + @validator("ir_collection_name") + def validate_ir_collection_name(cls, field): + if "" == field: + raise ValueError(f"{RESULTS_CACHE_COMPONENT_NAME}.ir_collection_name cannot be empty.") + return field + def get_uri(self): return f"mongodb://{self.host}:{self.port}/{self.db_name}" @@ -321,6 +328,32 @@ def dump_to_primitive_dict(self): return d +class IrOutput(BaseModel): + directory: pathlib.Path = pathlib.Path("var") / "data" / "ir" + target_uncompressed_size: int = 128 * 1024 * 1024 + + @validator("directory") + def validate_directory(cls, field): + if "" == field: + raise ValueError("directory can not be empty") + return field + + @validator("target_uncompressed_size") + def validate_target_uncompressed_size(cls, field): + if field <= 0: + raise ValueError("target_uncompressed_size must be greater than 0") + return field + + def make_config_paths_absolute(self, clp_home: pathlib.Path): + self.directory = make_config_path_absolute(clp_home, self.directory) + + def dump_to_primitive_dict(self): + d = self.dict() + # Turn directory (pathlib.Path) into a primitive string + d["directory"] = str(d["directory"]) + return d + + class WebUi(BaseModel): host: str = "localhost" port: int = 4000 @@ -368,6 +401,7 @@ class CLPConfig(BaseModel): credentials_file_path: pathlib.Path = CLP_DEFAULT_CREDENTIALS_FILE_PATH archive_output: ArchiveOutput = ArchiveOutput() + ir_output: IrOutput = IrOutput() data_directory: pathlib.Path = pathlib.Path("var") / "data" logs_directory: pathlib.Path = pathlib.Path("var") / "log" @@ -377,6 +411,7 @@ def make_config_paths_absolute(self, clp_home: pathlib.Path): self.input_logs_directory = make_config_path_absolute(clp_home, self.input_logs_directory) self.credentials_file_path = make_config_path_absolute(clp_home, self.credentials_file_path) self.archive_output.make_config_paths_absolute(clp_home) + self.ir_output.make_config_paths_absolute(clp_home) self.data_directory = make_config_path_absolute(clp_home, self.data_directory) self.logs_directory = make_config_path_absolute(clp_home, self.logs_directory) self._os_release_file_path = make_config_path_absolute(clp_home, self._os_release_file_path) @@ -396,6 +431,12 @@ def validate_archive_output_dir(self): except ValueError as ex: raise ValueError(f"archive_output.directory is invalid: {ex}") + def validate_ir_output_dir(self): + try: + validate_path_could_be_dir(self.ir_output.directory) + except ValueError as ex: + raise ValueError(f"ir_output.directory is invalid: {ex}") + def validate_data_dir(self): try: validate_path_could_be_dir(self.data_directory) @@ -463,6 +504,7 @@ def load_redis_credentials_from_file(self): def dump_to_primitive_dict(self): d = self.dict() d["archive_output"] = self.archive_output.dump_to_primitive_dict() + d["ir_output"] = self.ir_output.dump_to_primitive_dict() # Turn paths into primitive strings d["input_logs_directory"] = str(self.input_logs_directory) d["credentials_file_path"] = str(self.credentials_file_path) diff --git a/components/job-orchestration/job_orchestration/executor/query/celeryconfig.py b/components/job-orchestration/job_orchestration/executor/query/celeryconfig.py index 4b9949091..994c0bbcf 100644 --- a/components/job-orchestration/job_orchestration/executor/query/celeryconfig.py +++ b/components/job-orchestration/job_orchestration/executor/query/celeryconfig.py @@ -2,10 +2,14 @@ from job_orchestration.scheduler.constants import QueueName -imports = "job_orchestration.executor.query.fs_search_task" +imports = ( + "job_orchestration.executor.query.fs_search_task", + "job_orchestration.executor.query.extract_ir_task", +) task_routes = { "job_orchestration.executor.query.fs_search_task.search": QueueName.QUERY, + "job_orchestration.executor.query.extract_ir_task.extract_ir": QueueName.QUERY, } task_create_missing_queues = True diff --git a/components/job-orchestration/job_orchestration/executor/query/extract_ir_task.py b/components/job-orchestration/job_orchestration/executor/query/extract_ir_task.py new file mode 100644 index 000000000..b04b809f3 --- /dev/null +++ b/components/job-orchestration/job_orchestration/executor/query/extract_ir_task.py @@ -0,0 +1,115 @@ +import datetime +import os +from pathlib import Path +from typing import Any, Dict, List, Optional + +from celery.app.task import Task +from celery.utils.log import get_task_logger +from clp_py_utils.clp_config import Database, StorageEngine +from clp_py_utils.clp_logging import set_logging_level +from clp_py_utils.sql_adapter import SQL_Adapter +from job_orchestration.executor.query.celery import app +from job_orchestration.executor.query.utils import ( + report_command_creation_failure, + run_query_task, +) +from job_orchestration.scheduler.job_config import ExtractIrJobConfig +from job_orchestration.scheduler.scheduler_data import QueryTaskStatus + +# Setup logging +logger = get_task_logger(__name__) + + +def make_command( + storage_engine: str, + clp_home: Path, + archives_dir: Path, + archive_id: str, + ir_output_dir: Path, + extract_ir_config: ExtractIrJobConfig, + results_cache_uri: str, + ir_collection: str, +) -> Optional[List[str]]: + if StorageEngine.CLP == storage_engine: + if not extract_ir_config.file_split_id: + logger.error("file_split_id not supplied") + return None + command = [ + str(clp_home / "bin" / "clo"), + "i", + str(archives_dir / archive_id), + extract_ir_config.file_split_id, + str(ir_output_dir), + results_cache_uri, + ir_collection, + ] + if extract_ir_config.target_uncompressed_size is not None: + command.append("--target-size") + command.append(extract_ir_config.target_uncompressed_size) + else: + logger.error(f"Unsupported storage engine {storage_engine}") + return None + + return command + + +@app.task(bind=True) +def extract_ir( + self: Task, + job_id: str, + task_id: int, + job_config_obj: dict, + archive_id: str, + clp_metadata_db_conn_params: dict, + results_cache_uri: str, +) -> Dict[str, Any]: + task_name = "IR extraction" + + # Setup logging to file + clp_logs_dir = Path(os.getenv("CLP_LOGS_DIR")) + clp_logging_level = os.getenv("CLP_LOGGING_LEVEL") + set_logging_level(logger, clp_logging_level) + + logger.info(f"Started {task_name} task for job {job_id}") + + start_time = datetime.datetime.now() + task_status: QueryTaskStatus + sql_adapter = SQL_Adapter(Database.parse_obj(clp_metadata_db_conn_params)) + + # Make task_command + clp_home = Path(os.getenv("CLP_HOME")) + archive_directory = Path(os.getenv("CLP_ARCHIVE_OUTPUT_DIR")) + clp_storage_engine = os.getenv("CLP_STORAGE_ENGINE") + ir_output_dir = Path(os.getenv("CLP_IR_OUTPUT_DIR")) + ir_collection = os.getenv("CLP_IR_COLLECTION") + extract_ir_config = ExtractIrJobConfig.parse_obj(job_config_obj) + + task_command = make_command( + storage_engine=clp_storage_engine, + clp_home=clp_home, + archives_dir=archive_directory, + archive_id=archive_id, + ir_output_dir=ir_output_dir, + extract_ir_config=extract_ir_config, + results_cache_uri=results_cache_uri, + ir_collection=ir_collection, + ) + if not task_command: + return report_command_creation_failure( + sql_adapter=sql_adapter, + logger=logger, + task_name=task_name, + task_id=task_id, + start_time=start_time, + ) + + return run_query_task( + sql_adapter=sql_adapter, + logger=logger, + clp_logs_dir=clp_logs_dir, + task_command=task_command, + task_name=task_name, + job_id=job_id, + task_id=task_id, + start_time=start_time, + ) diff --git a/components/job-orchestration/job_orchestration/executor/query/fs_search_task.py b/components/job-orchestration/job_orchestration/executor/query/fs_search_task.py index 81ff757a2..baafca3e2 100644 --- a/components/job-orchestration/job_orchestration/executor/query/fs_search_task.py +++ b/components/job-orchestration/job_orchestration/executor/query/fs_search_task.py @@ -1,41 +1,25 @@ import datetime import os -import signal -import subprocess -import sys -from contextlib import closing from pathlib import Path -from typing import Any, Dict +from typing import Any, Dict, List, Optional from celery.app.task import Task from celery.utils.log import get_task_logger -from clp_py_utils.clp_config import Database, QUERY_TASKS_TABLE_NAME, StorageEngine +from clp_py_utils.clp_config import Database, StorageEngine from clp_py_utils.clp_logging import set_logging_level from clp_py_utils.sql_adapter import SQL_Adapter from job_orchestration.executor.query.celery import app +from job_orchestration.executor.query.utils import ( + report_command_creation_failure, + run_query_task, +) from job_orchestration.scheduler.job_config import SearchJobConfig -from job_orchestration.scheduler.scheduler_data import QueryTaskResult, QueryTaskStatus +from job_orchestration.scheduler.scheduler_data import QueryTaskStatus # Setup logging logger = get_task_logger(__name__) -def update_search_task_metadata( - db_cursor, - task_id: int, - kv_pairs: Dict[str, Any], -): - if not kv_pairs or len(kv_pairs) == 0: - raise ValueError("No key-value pairs provided to update search task metadata") - - query = f""" - UPDATE {QUERY_TASKS_TABLE_NAME} - SET {', '.join([f'{k}="{v}"' for k, v in kv_pairs.items()])} - WHERE id = {task_id} - """ - db_cursor.execute(query) - - def make_command( storage_engine: str, clp_home: Path, @@ -44,7 +28,7 @@ def make_command( search_config: SearchJobConfig, results_cache_uri: str, results_collection: str, -): +) -> Optional[List[str]]: if StorageEngine.CLP == storage_engine: command = [str(clp_home / "bin" / "clo"), "s", str(archives_dir / archive_id)] if search_config.path_filter is not None: @@ -59,7 +43,8 @@ def make_command( archive_id, ] else: - raise ValueError(f"Unsupported storage engine {storage_engine}") + logger.error(f"Unsupported storage engine {storage_engine}") + return None command.append(search_config.query_string) if search_config.begin_timestamp is not None: @@ -118,119 +103,50 @@ def search( clp_metadata_db_conn_params: dict, results_cache_uri: str, ) -> Dict[str, Any]: - clp_home = Path(os.getenv("CLP_HOME")) - archive_directory = Path(os.getenv("CLP_ARCHIVE_OUTPUT_DIR")) - clp_logs_dir = Path(os.getenv("CLP_LOGS_DIR")) - clp_logging_level = str(os.getenv("CLP_LOGGING_LEVEL")) - clp_storage_engine = str(os.getenv("CLP_STORAGE_ENGINE")) + task_name = "search" # Setup logging to file - worker_logs_dir = clp_logs_dir / job_id - worker_logs_dir.mkdir(exist_ok=True, parents=True) + clp_logs_dir = Path(os.getenv("CLP_LOGS_DIR")) + clp_logging_level = os.getenv("CLP_LOGGING_LEVEL") set_logging_level(logger, clp_logging_level) - clo_log_path = worker_logs_dir / f"{task_id}-clo.log" - clo_log_file = open(clo_log_path, "w") - logger.info(f"Started task for job {job_id}") + logger.info(f"Started {task_name} task for job {job_id}") - search_config = SearchJobConfig.parse_obj(job_config_obj) + start_time = datetime.datetime.now() + task_status: QueryTaskStatus sql_adapter = SQL_Adapter(Database.parse_obj(clp_metadata_db_conn_params)) - start_time = datetime.datetime.now() - search_status = QueryTaskStatus.RUNNING - with closing(sql_adapter.create_connection(True)) as db_conn, closing( - db_conn.cursor(dictionary=True) - ) as db_cursor: - try: - search_command = make_command( - storage_engine=clp_storage_engine, - clp_home=clp_home, - archives_dir=archive_directory, - archive_id=archive_id, - search_config=search_config, - results_cache_uri=results_cache_uri, - results_collection=job_id, - ) - except ValueError as e: - error_message = f"Error creating search command: {e}" - logger.error(error_message) - - update_search_task_metadata( - db_cursor, - task_id, - dict(status=QueryTaskStatus.FAILED, duration=0, start_time=start_time), - ) - db_conn.commit() - clo_log_file.write(error_message) - clo_log_file.close() - - return QueryTaskResult( - task_id=task_id, - status=QueryTaskStatus.FAILED, - duration=0, - error_log_path=str(clo_log_path), - ).dict() - - update_search_task_metadata( - db_cursor, task_id, dict(status=search_status, start_time=start_time) - ) - db_conn.commit() - - logger.info(f'Running: {" ".join(search_command)}') - search_proc = subprocess.Popen( - search_command, - preexec_fn=os.setpgrp, - close_fds=True, - stdout=clo_log_file, - stderr=clo_log_file, - ) + # Make task_command + clp_home = Path(os.getenv("CLP_HOME")) + archive_directory = Path(os.getenv("CLP_ARCHIVE_OUTPUT_DIR")) + clp_storage_engine = os.getenv("CLP_STORAGE_ENGINE") + search_config = SearchJobConfig.parse_obj(job_config_obj) - def sigterm_handler(_signo, _stack_frame): - logger.debug("Entered sigterm handler") - if search_proc.poll() is None: - logger.debug("Trying to kill search process") - # Kill the process group in case the search process also forked - os.killpg(os.getpgid(search_proc.pid), signal.SIGTERM) - os.waitpid(search_proc.pid, 0) - logger.info(f"Cancelling search task.") - # Add 128 to follow convention for exit codes from signals - # https://tldp.org/LDP/abs/html/exitcodes.html#AEN23549 - sys.exit(_signo + 128) - - # Register the function to kill the child process at exit - signal.signal(signal.SIGTERM, sigterm_handler) - - logger.info("Waiting for search to finish") - # communicate is equivalent to wait in this case, but avoids deadlocks if we switch to piping - # stdout/stderr in the future. - search_proc.communicate() - return_code = search_proc.returncode - if 0 != return_code: - search_status = QueryTaskStatus.FAILED - logger.error(f"Failed search task for job {job_id} - return_code={return_code}") - else: - search_status = QueryTaskStatus.SUCCEEDED - logger.info(f"Search task completed for job {job_id}") - - # Close log files - clo_log_file.close() - duration = (datetime.datetime.now() - start_time).total_seconds() - - with closing(sql_adapter.create_connection(True)) as db_conn, closing( - db_conn.cursor(dictionary=True) - ) as db_cursor: - update_search_task_metadata( - db_cursor, task_id, dict(status=search_status, start_time=start_time, duration=duration) + task_command = make_command( + storage_engine=clp_storage_engine, + clp_home=clp_home, + archives_dir=archive_directory, + archive_id=archive_id, + search_config=search_config, + results_cache_uri=results_cache_uri, + results_collection=str(task_id), + ) + if not task_command: + return report_command_creation_failure( + sql_adapter=sql_adapter, + logger=logger, + task_name=task_name, + task_id=task_id, + start_time=start_time, ) - db_conn.commit() - search_task_result = QueryTaskResult( - status=search_status, + return run_query_task( + sql_adapter=sql_adapter, + logger=logger, + clp_logs_dir=clp_logs_dir, + task_command=task_command, + task_name=task_name, + job_id=job_id, task_id=task_id, - duration=duration, + start_time=start_time, ) - - if QueryTaskStatus.FAILED == search_status: - search_task_result.error_log_path = str(clo_log_path) - - return search_task_result.dict() diff --git a/components/job-orchestration/job_orchestration/executor/query/utils.py b/components/job-orchestration/job_orchestration/executor/query/utils.py new file mode 100644 index 000000000..69d22398e --- /dev/null +++ b/components/job-orchestration/job_orchestration/executor/query/utils.py @@ -0,0 +1,135 @@ +import datetime +import os +import signal +import subprocess +import sys +from contextlib import closing +from logging import Logger +from pathlib import Path +from typing import Any, Dict, List + +from clp_py_utils.clp_config import QUERY_TASKS_TABLE_NAME +from clp_py_utils.sql_adapter import SQL_Adapter +from job_orchestration.scheduler.scheduler_data import QueryTaskResult, QueryTaskStatus + + +def get_task_log_file_path(clp_logs_dir: Path, job_id: str, task_id: int) -> Path: + worker_logs_dir = clp_logs_dir / job_id + worker_logs_dir.mkdir(exist_ok=True, parents=True) + return worker_logs_dir / f"{task_id}-clo.log" + + +def report_command_creation_failure( + sql_adapter: SQL_Adapter, + logger: Logger, + task_name: str, + task_id: int, + start_time: datetime.datetime, +): + logger.error(f"Error creating {task_name} command") + task_status = QueryTaskStatus.FAILED + update_query_task_metadata( + sql_adapter, + task_id, + dict(status=task_status, duration=0, start_time=start_time), + ) + + return QueryTaskResult( + task_id=task_id, + status=task_status, + duration=0, + ).dict() + + +def run_query_task( + sql_adapter: SQL_Adapter, + logger: Logger, + clp_logs_dir: Path, + task_command: List[str], + task_name: str, + job_id: str, + task_id: int, + start_time: datetime.datetime, +): + clo_log_path = get_task_log_file_path(clp_logs_dir, job_id, task_id) + clo_log_file = open(clo_log_path, "w") + + task_status = QueryTaskStatus.RUNNING + update_query_task_metadata( + sql_adapter, task_id, dict(status=task_status, start_time=start_time) + ) + + logger.info(f'Running: {" ".join(task_command)}') + task_proc = subprocess.Popen( + task_command, + preexec_fn=os.setpgrp, + close_fds=True, + stdout=clo_log_file, + stderr=clo_log_file, + ) + + def sigterm_handler(_signo, _stack_frame): + logger.debug("Entered sigterm handler") + if task_proc.poll() is None: + logger.debug(f"Trying to kill {task_name} process") + # Kill the process group in case the task process also forked + os.killpg(os.getpgid(task_proc.pid), signal.SIGTERM) + os.waitpid(task_proc.pid, 0) + logger.info(f"Cancelling {task_name} task.") + # Add 128 to follow convention for exit codes from signals + # https://tldp.org/LDP/abs/html/exitcodes.html#AEN23549 + sys.exit(_signo + 128) + + # Register the function to kill the child process at exit + signal.signal(signal.SIGTERM, sigterm_handler) + + logger.info(f"Waiting for {task_name} to finish") + # `communicate` is equivalent to `wait` in this case, but avoids deadlocks if we switch to + # piping stdout/stderr in the future. + task_proc.communicate() + return_code = task_proc.returncode + if 0 != return_code: + task_status = QueryTaskStatus.FAILED + logger.error( + f"{task_name} task {task_id} failed for job {job_id} - return_code={return_code}" + ) + else: + task_status = QueryTaskStatus.SUCCEEDED + logger.info(f"{task_name} task {task_id} completed for job {job_id}") + + clo_log_file.close() + duration = (datetime.datetime.now() - start_time).total_seconds() + + update_query_task_metadata( + sql_adapter, task_id, dict(status=task_status, start_time=start_time, duration=duration) + ) + + task_result = QueryTaskResult( + status=task_status, + task_id=task_id, + duration=duration, + ) + + if QueryTaskStatus.FAILED == task_status: + task_result.error_log_path = str(clo_log_path) + + return task_result.dict() + + +def update_query_task_metadata( + sql_adapter: SQL_Adapter, + task_id: int, + kv_pairs: Dict[str, Any], +): + with closing(sql_adapter.create_connection(True)) as db_conn, closing( + db_conn.cursor(dictionary=True) + ) as db_cursor: + if not kv_pairs or len(kv_pairs) == 0: + raise ValueError("No key-value pairs provided to update query task metadata") + + query = f""" + UPDATE {QUERY_TASKS_TABLE_NAME} + SET {', '.join([f'{k}="{v}"' for k, v in kv_pairs.items()])} + WHERE id = {task_id} + """ + db_cursor.execute(query) diff --git a/components/job-orchestration/job_orchestration/scheduler/constants.py b/components/job-orchestration/job_orchestration/scheduler/constants.py index b640524d9..131719148 100644 --- a/components/job-orchestration/job_orchestration/scheduler/constants.py +++ b/components/job-orchestration/job_orchestration/scheduler/constants.py @@ -71,6 +71,7 @@ def to_str(self) -> str: class QueryJobType(IntEnum): SEARCH_OR_AGGREGATION = 0 + EXTRACT_IR = auto() def __str__(self) -> str: return str(self.value) diff --git a/components/job-orchestration/job_orchestration/scheduler/job_config.py b/components/job-orchestration/job_orchestration/scheduler/job_config.py index 528dce21a..e90e2ee7f 100644 --- a/components/job-orchestration/job_orchestration/scheduler/job_config.py +++ b/components/job-orchestration/job_orchestration/scheduler/job_config.py @@ -42,6 +42,13 @@ class AggregationConfig(BaseModel): class QueryJobConfig(BaseModel): ... +class ExtractIrJobConfig(QueryJobConfig): + orig_file_id: str + msg_ix: int + file_split_id: typing.Optional[str] = None + target_uncompressed_size: typing.Optional[int] = None + + class SearchJobConfig(QueryJobConfig): query_string: str max_num_results: int diff --git a/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py b/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py index 5331051ae..015480662 100644 --- a/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py +++ b/components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py @@ -24,7 +24,7 @@ import pathlib import sys from pathlib import Path -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Tuple import celery import msgpack @@ -39,9 +39,10 @@ from clp_py_utils.core import read_yaml_config_file from clp_py_utils.decorators import exception_default_value from clp_py_utils.sql_adapter import SQL_Adapter +from job_orchestration.executor.query.extract_ir_task import extract_ir from job_orchestration.executor.query.fs_search_task import search from job_orchestration.scheduler.constants import QueryJobStatus, QueryJobType, QueryTaskStatus -from job_orchestration.scheduler.job_config import SearchJobConfig +from job_orchestration.scheduler.job_config import ExtractIrJobConfig, SearchJobConfig from job_orchestration.scheduler.query.reducer_handler import ( handle_reducer_connection, ReducerHandlerMessage, @@ -49,6 +50,7 @@ ReducerHandlerMessageType, ) from job_orchestration.scheduler.scheduler_data import ( + ExtractIrJob, InternalJobState, QueryJob, QueryTaskResult, @@ -277,6 +279,57 @@ def get_archives_for_search( return archives_for_search +def get_archive_and_file_split_ids_for_extraction( + db_conn, + extract_ir_config: ExtractIrJobConfig, +) -> Tuple[Optional[str], Optional[str]]: + orig_file_id = extract_ir_config.orig_file_id + msg_ix = extract_ir_config.msg_ix + + results = get_archive_and_file_split_ids(db_conn, orig_file_id, msg_ix) + if len(results) == 0: + logger.error(f"No matching file splits for orig_file_id={orig_file_id}, msg_ix={msg_ix}") + return None, None + elif len(results) > 1: + logger.error(f"Multiple file splits found for orig_file_id={orig_file_id}, msg_ix={msg_ix}") + for result in results: + logger.error(f"{result['archive_id']}:{result['id']}") + return None, None + + return results[0]["archive_id"], results[0]["file_split_id"] + + +@exception_default_value(default=[]) +def get_archive_and_file_split_ids( + db_conn, + orig_file_id: str, + msg_ix: int, +): + """ + Fetches the IDs of the file split and the archive containing the file split based on the + following criteria: + 1. The file split's original file id = `orig_file_id` + 2. The file split includes the message with index = `msg_ix` + :param db_conn: + :param orig_file_id: Original file id of the split + :param msg_ix: Index of the message that the file split must include + :return: A list of (archive id, file split id) on success. An empty list if + an exception occurs while interacting with the database. + """ + + query = f"""SELECT archive_id, id as file_split_id + FROM {CLP_METADATA_TABLE_PREFIX}files WHERE + orig_file_id = '{orig_file_id}' AND + begin_message_ix <= {msg_ix} AND + (begin_message_ix + num_messages) > {msg_ix} + """ + + with contextlib.closing(db_conn.cursor(dictionary=True)) as cursor: + cursor.execute(query) + results = list(cursor.fetchall()) + return results + + def get_task_group_for_job( archive_ids: List[str], task_ids: List[int], @@ -285,17 +338,35 @@ def get_task_group_for_job( results_cache_uri: str, ): job_config_obj = job.get_config().dict() - return celery.group( - search.s( - job_id=job.id, - archive_id=archive_ids[i], - task_id=task_ids[i], - job_config_obj=job_config_obj, - clp_metadata_db_conn_params=clp_metadata_db_conn_params, - results_cache_uri=results_cache_uri, + job_type = job.get_type() + if QueryJobType.SEARCH_OR_AGGREGATION == job_type: + return celery.group( + search.s( + job_id=job.id, + archive_id=archive_ids[i], + task_id=task_ids[i], + job_config_obj=job_config_obj, + clp_metadata_db_conn_params=clp_metadata_db_conn_params, + results_cache_uri=results_cache_uri, + ) + for i in range(len(archive_ids)) ) - for i in range(len(archive_ids)) - ) + elif QueryJobType.EXTRACT_IR == job_type: + return celery.group( + extract_ir.s( + job_id=job.id, + archive_id=archive_ids[i], + task_id=task_ids[i], + job_config_obj=job_config_obj, + clp_metadata_db_conn_params=clp_metadata_db_conn_params, + results_cache_uri=results_cache_uri, + ) + for i in range(len(archive_ids)) + ) + else: + error_msg = f"Unexpected job type: {job_type}" + logger.error(error_msg) + raise NotImplementedError(error_msg) def dispatch_query_job( @@ -364,6 +435,30 @@ async def acquire_reducer_for_job(job: SearchJob): logger.info(f"Got reducer for job {job.id} at {reducer_host}:{reducer_port}") +def dispatch_job_and_update_db( + db_conn, + new_job: QueryJob, + target_archives: List[str], + clp_metadata_db_conn_params: Dict[str, any], + results_cache_uri: str, + num_tasks: int, +) -> None: + dispatch_query_job( + db_conn, new_job, target_archives, clp_metadata_db_conn_params, results_cache_uri + ) + start_time = datetime.datetime.now() + new_job.start_time = start_time + set_job_or_task_status( + db_conn, + QUERY_JOBS_TABLE_NAME, + new_job.id, + QueryJobStatus.RUNNING, + QueryJobStatus.PENDING, + start_time=start_time, + num_tasks=num_tasks, + ) + + def handle_pending_query_jobs( db_conn_pool, clp_metadata_db_conn_params: Dict[str, any], @@ -427,6 +522,46 @@ def handle_pending_query_jobs( else: pending_search_jobs.append(new_search_job) active_jobs[job_id] = new_search_job + + elif QueryJobType.EXTRACT_IR == job_type: + extract_ir_config = ExtractIrJobConfig.parse_obj(msgpack.unpackb(job_config)) + archive_id, file_split_id = get_archive_and_file_split_ids_for_extraction( + db_conn, extract_ir_config + ) + if not archive_id or not file_split_id: + if not set_job_or_task_status( + db_conn, + QUERY_JOBS_TABLE_NAME, + job_id, + QueryJobStatus.FAILED, + QueryJobStatus.PENDING, + start_time=datetime.datetime.now(), + num_tasks=0, + duration=0, + ): + logger.error(f"Failed to set job {job_id} as failed") + continue + + extract_ir_config.file_split_id = file_split_id + new_extract_ir_job = ExtractIrJob( + id=job_id, + archive_id=archive_id, + extract_ir_config=extract_ir_config, + state=InternalJobState.WAITING_FOR_DISPATCH, + ) + target_archive = [new_extract_ir_job.archive_id] + + dispatch_job_and_update_db( + db_conn, + new_extract_ir_job, + target_archive, + clp_metadata_db_conn_params, + results_cache_uri, + 1, + ) + active_jobs[new_extract_ir_job.id] = new_extract_ir_job + logger.info(f"Dispatched IR extraction job {job_id} on archive: {archive_id}") + else: # NOTE: We're skipping the job for this iteration, but its status will remain # unchanged. So this log will print again in the next iteration unless the user @@ -452,23 +587,17 @@ def handle_pending_query_jobs( archive_ids_for_search = [archive["archive_id"] for archive in archives_for_search] - dispatch_query_job( - db_conn, job, archive_ids_for_search, clp_metadata_db_conn_params, results_cache_uri + dispatch_job_and_update_db( + db_conn, + job, + archive_ids_for_search, + clp_metadata_db_conn_params, + results_cache_uri, + job.num_archives_to_search, ) logger.info( f"Dispatched job {job_id} with {len(archive_ids_for_search)} archives to search." ) - start_time = datetime.datetime.now() - job.start_time = start_time - set_job_or_task_status( - db_conn, - QUERY_JOBS_TABLE_NAME, - job_id, - QueryJobStatus.RUNNING, - QueryJobStatus.PENDING, - start_time=start_time, - num_tasks=job.num_archives_to_search, - ) return reducer_acquisition_tasks @@ -590,6 +719,51 @@ async def handle_finished_search_job( del active_jobs[job_id] +async def handle_finished_extract_ir_job( + db_conn, job: ExtractIrJob, task_results: Optional[Any] +) -> None: + global active_jobs + + job_id = job.id + new_job_status = QueryJobStatus.SUCCEEDED + num_tasks = len(task_results) + if 1 != num_tasks: + logger.error( + f"Unexpected number of tasks for IR extraction job {job_id}. " + f"Expected 1, got {num_tasks}." + ) + new_job_status = QueryJobStatus.FAILED + else: + task_result = QueryTaskResult.parse_obj(task_results[0]) + task_id = task_result.task_id + if not QueryJobStatus.SUCCEEDED == task_result.status: + logger.error( + f"IR extraction task job-{job_id}-task-{task_id} failed. " + f"Check {task_result.error_log_path} for details." + ) + new_job_status = QueryJobStatus.FAILED + else: + logger.info( + f"IR extraction task job-{job_id}-task-{task_id} succeeded in " + f"{task_result.duration} second(s)." + ) + + if set_job_or_task_status( + db_conn, + QUERY_JOBS_TABLE_NAME, + job_id, + new_job_status, + QueryJobStatus.RUNNING, + num_tasks_completed=num_tasks, + duration=(datetime.datetime.now() - job.start_time).total_seconds(), + ): + if new_job_status == QueryJobStatus.SUCCEEDED: + logger.info(f"Completed IR extraction job {job_id}.") + else: + logger.info(f"Completed IR extraction job {job_id} with failing tasks.") + del active_jobs[job_id] + + async def check_job_status_and_update_db(db_conn_pool, results_cache_uri): global active_jobs @@ -627,6 +801,9 @@ async def check_job_status_and_update_db(db_conn_pool, results_cache_uri): await handle_finished_search_job( db_conn, search_job, returned_results, results_cache_uri ) + elif QueryJobType.EXTRACT_IR == job_type: + extract_ir_job: ExtractIrJob = job + await handle_finished_extract_ir_job(db_conn, extract_ir_job, returned_results) else: logger.error(f"Unexpected job type: {job_type}, skipping job {job_id}") diff --git a/components/job-orchestration/job_orchestration/scheduler/scheduler_data.py b/components/job-orchestration/job_orchestration/scheduler/scheduler_data.py index d337e0806..3d4c0d7a7 100644 --- a/components/job-orchestration/job_orchestration/scheduler/scheduler_data.py +++ b/components/job-orchestration/job_orchestration/scheduler/scheduler_data.py @@ -9,7 +9,11 @@ QueryJobType, QueryTaskStatus, ) -from job_orchestration.scheduler.job_config import QueryJobConfig, SearchJobConfig +from job_orchestration.scheduler.job_config import ( + ExtractIrJobConfig, + QueryJobConfig, + SearchJobConfig, +) from job_orchestration.scheduler.query.reducer_handler import ReducerHandlerMessageQueues from pydantic import BaseModel, validator @@ -53,6 +57,17 @@ def get_type(self) -> QueryJobType: ... def get_config(self) -> QueryJobConfig: ... +class ExtractIrJob(QueryJob): + extract_ir_config: ExtractIrJobConfig + archive_id: str + + def get_type(self) -> QueryJobType: + return QueryJobType.EXTRACT_IR + + def get_config(self) -> QueryJobConfig: + return self.extract_ir_config + + class SearchJob(QueryJob): search_config: SearchJobConfig num_archives_to_search: int diff --git a/components/package-template/src/etc/clp-config.yml b/components/package-template/src/etc/clp-config.yml index 740146ab9..3f658211e 100644 --- a/components/package-template/src/etc/clp-config.yml +++ b/components/package-template/src/etc/clp-config.yml @@ -46,7 +46,8 @@ #results_cache: # host: "localhost" # port: 27017 -# db_name: "clp-search" +# db_name: "clp-query-results" +# ir_collection_name: "ir-files" # #compression_worker: # logging_level: "INFO" @@ -77,6 +78,13 @@ # # How much data CLP should try to fit into each segment within an archive # target_segment_size: 268435456 # 256 MB # +## Where CLP IR files should be output +#ir_output: +# directory: "var/data/ir" +# +# # How large each IR file should be before being split into a new IR file +# target_uncompressed_size: 134217728 # 128 MB +# ## Location where other data (besides archives) are stored. It will be created if ## it doesn't exist. #data_directory: "var/data" diff --git a/components/webui/imports/api/search/constants.js b/components/webui/imports/api/search/constants.js index fbc0c3188..baedddb85 100644 --- a/components/webui/imports/api/search/constants.js +++ b/components/webui/imports/api/search/constants.js @@ -91,6 +91,7 @@ let enumQueryType; */ const QUERY_JOB_TYPE = Object.freeze({ SEARCH_OR_AGGREGATION: (enumQueryType = 0), + EXTRACT_IR: ++enumQueryType, }); /* eslint-enable sort-keys */ diff --git a/docs/src/dev-guide/components-webui.md b/docs/src/dev-guide/components-webui.md index bf2f76f6f..c5a482e12 100644 --- a/docs/src/dev-guide/components-webui.md +++ b/docs/src/dev-guide/components-webui.md @@ -41,7 +41,7 @@ package: ```shell # Please update `` accordingly. - MONGO_URL="mongodb://localhost:27017/clp-search" \ + MONGO_URL="mongodb://localhost:27017/clp-query-results" \ ROOT_URL="http://localhost:4000" \ CLP_DB_USER="clp-user" \ CLP_DB_PASS="" \