diff --git a/components/clp-package-utils/clp_package_utils/general.py b/components/clp-package-utils/clp_package_utils/general.py
index 507e1e434..81bc238aa 100644
--- a/components/clp-package-utils/clp_package_utils/general.py
+++ b/components/clp-package-utils/clp_package_utils/general.py
@@ -276,6 +276,7 @@ def generate_worker_config(clp_config: CLPConfig) -> WorkerConfig:
     worker_config.archive_output = clp_config.archive_output.copy(deep=True)
     worker_config.stream_output = clp_config.stream_output.copy(deep=True)
     worker_config.package = clp_config.package.copy(deep=True)
+    worker_config.results_cache = clp_config.results_cache.copy(deep=True)
     worker_config.data_directory = clp_config.data_directory
 
     return worker_config
diff --git a/components/clp-py-utils/clp_py_utils/clp_config.py b/components/clp-py-utils/clp_py_utils/clp_config.py
index e33c29edf..a4ca08a7d 100644
--- a/components/clp-py-utils/clp_py_utils/clp_config.py
+++ b/components/clp-py-utils/clp_py_utils/clp_config.py
@@ -638,11 +638,12 @@ def dump_to_primitive_dict(self):
 class WorkerConfig(BaseModel):
     package: Package = Package()
     archive_output: ArchiveOutput = ArchiveOutput()
     # Only needed by the query worker. Consider inheriting at some point.
     stream_output: StreamOutput = StreamOutput()
     data_directory: pathlib.Path = pathlib.Path("var") / "data"
+    results_cache: ResultsCache = ResultsCache()
 
     def dump_to_primitive_dict(self):
         d = self.dict()
         d["archive_output"]["storage"] = self.archive_output.storage.dump_to_primitive_dict()
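Taken together, these two hunks make the results cache part of the worker's own config rather than something each task reads from the environment. A minimal sketch of the round trip (illustrative only, not part of the patch; it assumes a default-constructed CLPConfig validates):

    from clp_package_utils.general import generate_worker_config
    from clp_py_utils.clp_config import CLPConfig

    # Stand-in for a CLPConfig parsed from the package's config file.
    clp_config = CLPConfig()
    worker_config = generate_worker_config(clp_config)

    # results_cache is a first-class field of WorkerConfig, so it survives
    # dump_to_primitive_dict() and the YAML round trip to WORKER_CONFIG_PATH.
    assert "results_cache" in worker_config.dump_to_primitive_dict()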
diff --git a/components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py b/components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py
index 66a88ad18..244b77ebe 100644
--- a/components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py
+++ b/components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py
@@ -135,12 +135,12 @@ def make_clp_s_command(
     archive_output_dir: pathlib.Path,
     clp_config: ClpIoConfig,
     db_config_file_path: pathlib.Path,
+    enable_s3_write: bool,
 ):
     # fmt: off
     compression_cmd = [
         str(clp_home / "bin" / "clp-s"),
         "c", str(archive_output_dir),
-        "--single-file-archive",
         "--print-archive-stats",
         "--target-encoded-size",
         str(clp_config.output.target_segment_size + clp_config.output.target_dictionaries_size),
@@ -148,6 +148,9 @@ def make_clp_s_command(
     ]
     # fmt: on
 
+    if enable_s3_write:
+        compression_cmd.append("--single-file-archive")
+
     if clp_config.input.timestamp_key is not None:
         compression_cmd.append("--timestamp-key")
         compression_cmd.append(clp_config.input.timestamp_key)
@@ -159,8 +162,6 @@ def run_clp(
     worker_config: WorkerConfig,
     clp_config: ClpIoConfig,
     clp_home: pathlib.Path,
-    data_dir: pathlib.Path,
-    archive_output_dir: pathlib.Path,
     logs_dir: pathlib.Path,
     job_id: int,
     task_id: int,
@@ -175,8 +176,6 @@ def run_clp(
     :param worker_config: WorkerConfig
     :param clp_config: ClpIoConfig
     :param clp_home:
-    :param data_dir:
-    :param archive_output_dir:
     :param logs_dir:
     :param job_id:
     :param task_id:
@@ -186,10 +185,12 @@ def run_clp(
     :param clp_metadata_db_connection_config
     :return: tuple -- (whether compression was successful, output messages)
     """
-    clp_storage_engine = worker_config.package.storage_engine
-
     instance_id_str = f"compression-job-{job_id}-task-{task_id}"
 
+    clp_storage_engine = worker_config.package.storage_engine
+    data_dir = worker_config.data_directory
+    archive_output_dir = worker_config.archive_output.get_directory()
+
     # Generate database config file for clp
     db_config_file_path = data_dir / f"{instance_id_str}-db-config.yml"
     db_config_file = open(db_config_file_path, "w")
@@ -197,9 +198,18 @@ def run_clp(
     db_config_file.close()
 
     # Get s3 config
-    storage_config = worker_config.archive_output.storage
-    s3_config = storage_config.s3_config if StorageType.S3 == storage_config.type else None
-    s3_upload_failed = False
+    s3_config = None
+    enable_s3_write = False
+    s3_write_failed = False
+    storage_type = worker_config.archive_output.storage.type
+    if StorageType.S3 == storage_type:
+        # start-clp should already reject this combination; the check here is a safeguard.
+        if StorageEngine.CLP == clp_storage_engine:
+            logger.error(f"S3 is not supported for {clp_storage_engine}")
+            return False, {"error_message": f"S3 is not supported for {clp_storage_engine}"}
+
+        s3_config = worker_config.archive_output.storage.s3_config
+        enable_s3_write = True
 
     if StorageEngine.CLP == clp_storage_engine:
         compression_cmd = make_clp_command(
@@ -214,6 +224,7 @@ def run_clp(
             archive_output_dir=archive_output_dir,
             clp_config=clp_config,
             db_config_file_path=db_config_file_path,
+            enable_s3_write=enable_s3_write,
         )
     else:
         logger.error(f"Unsupported storage engine {clp_storage_engine}")
@@ -264,13 +275,13 @@ def run_clp(
             if last_archive_stats is not None and (
                 None is stats or stats["id"] != last_archive_stats["id"]
             ):
-                if s3_config is not None:
+                if enable_s3_write:
                     result = upload_single_file_archive_to_s3(
                         last_archive_stats, archive_output_dir, s3_config
                     )
                     if not result.success:
                         worker_output["error_message"] = result.error
-                        s3_upload_failed = True
+                        s3_write_failed = True
                         # Upon failure, skip updating the archive tags and job metadata.
                         break
@@ -308,7 +319,7 @@ def run_clp(
     # Close stderr log file
     stderr_log_file.close()
 
-    if s3_upload_failed:
+    if s3_write_failed:
         logger.error(f"Failed to upload to S3.")
         return CompressionTaskStatus.FAILED, worker_output
     if compression_successful:
@@ -329,23 +340,26 @@ def compress(
     clp_metadata_db_connection_config,
 ):
     clp_home = pathlib.Path(os.getenv("CLP_HOME"))
     logs_dir = pathlib.Path(os.getenv("CLP_LOGS_DIR"))
+
+    # Set logging level
+    clp_logging_level = str(os.getenv("CLP_LOGGING_LEVEL"))
+    set_logging_level(logger, clp_logging_level)
 
     # Load configuration
-    worker_config_path = pathlib.Path(os.getenv("WORKER_CONFIG_PATH"))
     try:
-        worker_config = WorkerConfig.parse_obj(read_yaml_config_file(worker_config_path))
-    except ValidationError as err:
-        logger.error(err)
-        return -1
+        worker_config = WorkerConfig.parse_obj(
+            read_yaml_config_file(pathlib.Path(os.getenv("WORKER_CONFIG_PATH")))
+        )
     except Exception as ex:
-        logger.error(ex)
-        return -1
-
-
-    # Set logging level
-    clp_logging_level = str(os.getenv("CLP_LOGGING_LEVEL"))
-    set_logging_level(logger, clp_logging_level)
+        error_msg = "Failed to load worker config"
+        logger.exception(error_msg)
+        return CompressionTaskResult(
+            task_id=task_id,
+            status=CompressionTaskStatus.FAILED,
+            duration=0,
+            error_message=error_msg,
+        )
 
     clp_io_config = ClpIoConfig.parse_raw(clp_io_config_json)
     paths_to_compress = PathsToCompress.parse_raw(paths_to_compress_json)
@@ -358,8 +372,6 @@ def compress(
         worker_config,
         clp_io_config,
         clp_home,
-        worker_config.data_directory,
-        worker_config.archive_output.get_directory(),
         logs_dir,
         job_id,
         task_id,
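The gating above is the heart of this file's change: S3 output is rejected for the clp storage engine, and --single-file-archive is passed to clp-s only when the archive will actually be uploaded. A condensed sketch of that decision (resolve_s3_write is an invention of this example, not a name from the patch):

    from clp_py_utils.clp_config import StorageEngine, StorageType, WorkerConfig

    def resolve_s3_write(worker_config: WorkerConfig):
        """Mirrors run_clp()'s checks; returns (enable_s3_write, s3_config)."""
        storage = worker_config.archive_output.storage
        if StorageType.S3 != storage.type:
            return False, None
        if StorageEngine.CLP == worker_config.package.storage_engine:
            # clp (unstructured text) archives are multi-file; only clp-s can
            # produce the single-file archives the S3 upload path expects.
            raise ValueError("S3 storage is not supported for the clp storage engine")
        return True, storage.s3_config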
diff --git a/components/job-orchestration/job_orchestration/executor/query/extract_stream_task.py b/components/job-orchestration/job_orchestration/executor/query/extract_stream_task.py
index 423ebb757..10c98b457 100644
--- a/components/job-orchestration/job_orchestration/executor/query/extract_stream_task.py
+++ b/components/job-orchestration/job_orchestration/executor/query/extract_stream_task.py
@@ -5,12 +5,13 @@
 from celery.app.task import Task
 from celery.utils.log import get_task_logger
-from clp_py_utils.clp_config import Database, StorageEngine
+from clp_py_utils.clp_config import Database, StorageEngine, StorageType, WorkerConfig
 from clp_py_utils.clp_logging import set_logging_level
 from clp_py_utils.sql_adapter import SQL_Adapter
 from job_orchestration.executor.query.celery import app
 from job_orchestration.executor.query.utils import (
-    report_command_creation_failure,
+    report_task_failure,
     run_query_task,
 )
+from job_orchestration.executor.utils import try_load_worker_config
 from job_orchestration.scheduler.job_config import ExtractIrJobConfig, ExtractJsonJobConfig
@@ -21,15 +22,17 @@
 
 def make_command(
-    storage_engine: str,
     clp_home: Path,
-    archives_dir: Path,
+    worker_config: WorkerConfig,
     archive_id: str,
-    stream_output_dir: Path,
     job_config: dict,
     results_cache_uri: str,
-    stream_collection_name: str,
 ) -> Optional[List[str]]:
+    storage_engine = worker_config.package.storage_engine
+    archives_dir = worker_config.archive_output.get_directory()
+    stream_output_dir = worker_config.stream_output.directory
+    stream_collection_name = worker_config.results_cache.stream_collection_name
+
     if StorageEngine.CLP == storage_engine:
         logger.info("Starting IR extraction")
         extract_ir_config = ExtractIrJobConfig.parse_obj(job_config)
@@ -97,28 +100,37 @@ def extract_stream(
     task_status: QueryTaskStatus
     sql_adapter = SQL_Adapter(Database.parse_obj(clp_metadata_db_conn_params))
 
+    # Load configuration
+    worker_config = try_load_worker_config(os.getenv("WORKER_CONFIG_PATH"), logger)
+    if worker_config is None:
+        return report_task_failure(
+            sql_adapter=sql_adapter,
+            task_id=task_id,
+            start_time=start_time,
+        )
+
+    if worker_config.archive_output.storage.type == StorageType.S3:
+        logger.error("Extraction is not supported for the S3 storage type")
+        return report_task_failure(
+            sql_adapter=sql_adapter,
+            task_id=task_id,
+            start_time=start_time,
+        )
+
     # Make task_command
     clp_home = Path(os.getenv("CLP_HOME"))
-    archive_directory = Path(os.getenv("CLP_ARCHIVE_OUTPUT_DIR"))
-    clp_storage_engine = os.getenv("CLP_STORAGE_ENGINE")
-    stream_output_dir = Path(os.getenv("CLP_STREAM_OUTPUT_DIR"))
-    stream_collection_name = os.getenv("CLP_STREAM_COLLECTION_NAME")
 
     task_command = make_command(
-        storage_engine=clp_storage_engine,
         clp_home=clp_home,
-        archives_dir=archive_directory,
+        worker_config=worker_config,
         archive_id=archive_id,
-        stream_output_dir=stream_output_dir,
         job_config=job_config,
         results_cache_uri=results_cache_uri,
-        stream_collection_name=stream_collection_name,
     )
     if not task_command:
-        return report_command_creation_failure(
+        logger.error(f"Error creating {task_name} command")
+        return report_task_failure(
             sql_adapter=sql_adapter,
-            logger=logger,
-            task_name=task_name,
             task_id=task_id,
             start_time=start_time,
         )
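With the worker config in hand, make_command no longer takes the storage engine, archive directory, stream output directory, or stream collection name as separate arguments; they are all derived from WorkerConfig. A sketch of the resulting call site (illustrative; archive_id, job_config, and results_cache_uri are the task's arguments, where job_config is the dict form of an ExtractIrJobConfig or ExtractJsonJobConfig):

    import os
    from pathlib import Path

    task_command = make_command(
        clp_home=Path(os.getenv("CLP_HOME")),
        worker_config=worker_config,  # from try_load_worker_config() above
        archive_id=archive_id,
        job_config=job_config,
        results_cache_uri=results_cache_uri,
    )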
diff --git a/components/job-orchestration/job_orchestration/executor/query/fs_search_task.py b/components/job-orchestration/job_orchestration/executor/query/fs_search_task.py
index 598bfdcfc..6f0337b08 100644
--- a/components/job-orchestration/job_orchestration/executor/query/fs_search_task.py
+++ b/components/job-orchestration/job_orchestration/executor/query/fs_search_task.py
@@ -5,14 +5,15 @@
 from celery.app.task import Task
 from celery.utils.log import get_task_logger
-from clp_py_utils.clp_config import Database, StorageEngine
+from clp_py_utils.clp_config import Database, StorageEngine, StorageType, WorkerConfig
 from clp_py_utils.clp_logging import set_logging_level
 from clp_py_utils.sql_adapter import SQL_Adapter
 from job_orchestration.executor.query.celery import app
 from job_orchestration.executor.query.utils import (
-    report_command_creation_failure,
+    report_task_failure,
     run_query_task,
 )
+from job_orchestration.executor.utils import try_load_worker_config
 from job_orchestration.scheduler.job_config import SearchJobConfig
 from job_orchestration.scheduler.scheduler_data import QueryTaskStatus
 
@@ -21,14 +22,16 @@
 
 def make_command(
-    storage_engine: str,
     clp_home: Path,
-    archives_dir: Path,
+    worker_config: WorkerConfig,
     archive_id: str,
     search_config: SearchJobConfig,
     results_cache_uri: str,
     results_collection: str,
 ) -> Optional[List[str]]:
+    storage_engine = worker_config.package.storage_engine
+    archives_dir = worker_config.archive_output.get_directory()
+
     if StorageEngine.CLP == storage_engine:
         command = [str(clp_home / "bin" / "clo"), "s", str(archives_dir / archive_id)]
         if search_config.path_filter is not None:
@@ -116,26 +119,39 @@ def search(
     task_status: QueryTaskStatus
     sql_adapter = SQL_Adapter(Database.parse_obj(clp_metadata_db_conn_params))
 
+    # Load configuration
+    worker_config = try_load_worker_config(os.getenv("WORKER_CONFIG_PATH"), logger)
+    if worker_config is None:
+        return report_task_failure(
+            sql_adapter=sql_adapter,
+            task_id=task_id,
+            start_time=start_time,
+        )
+
+    if worker_config.archive_output.storage.type == StorageType.S3:
+        logger.error("Search is not supported for the S3 storage type")
+        return report_task_failure(
+            sql_adapter=sql_adapter,
+            task_id=task_id,
+            start_time=start_time,
+        )
+
     # Make task_command
     clp_home = Path(os.getenv("CLP_HOME"))
-    archive_directory = Path(os.getenv("CLP_ARCHIVE_OUTPUT_DIR"))
-    clp_storage_engine = os.getenv("CLP_STORAGE_ENGINE")
     search_config = SearchJobConfig.parse_obj(job_config)
 
     task_command = make_command(
-        storage_engine=clp_storage_engine,
         clp_home=clp_home,
-        archives_dir=archive_directory,
+        worker_config=worker_config,
         archive_id=archive_id,
         search_config=search_config,
         results_cache_uri=results_cache_uri,
         results_collection=job_id,
     )
     if not task_command:
-        return report_command_creation_failure(
+        logger.error(f"Error creating {task_name} command")
+        return report_task_failure(
             sql_adapter=sql_adapter,
-            logger=logger,
-            task_name=task_name,
             task_id=task_id,
             start_time=start_time,
         )
diff --git a/components/job-orchestration/job_orchestration/executor/query/utils.py b/components/job-orchestration/job_orchestration/executor/query/utils.py
index 69d22398e..523abbe00 100644
--- a/components/job-orchestration/job_orchestration/executor/query/utils.py
+++ b/components/job-orchestration/job_orchestration/executor/query/utils.py
@@ -19,14 +19,11 @@ def get_task_log_file_path(clp_logs_dir: Path, job_id: str, task_id: int) -> Path:
     return worker_logs_dir / f"{task_id}-clo.log"
 
 
-def report_command_creation_failure(
+def report_task_failure(
     sql_adapter: SQL_Adapter,
-    logger: Logger,
-    task_name: str,
     task_id: int,
     start_time: datetime.datetime,
 ):
-    logger.error(f"Error creating {task_name} command")
     task_status = QueryTaskStatus.FAILED
     update_query_task_metadata(
         sql_adapter,
diff --git a/components/job-orchestration/job_orchestration/executor/utils.py b/components/job-orchestration/job_orchestration/executor/utils.py
new file mode 100644
index 000000000..fd5cc27ed
--- /dev/null
+++ b/components/job-orchestration/job_orchestration/executor/utils.py
@@ -0,0 +1,21 @@
+from logging import Logger
+from pathlib import Path
+from typing import Optional
+
+from clp_py_utils.clp_config import WorkerConfig
+from clp_py_utils.core import read_yaml_config_file
+
+
+def try_load_worker_config(
+    config_path_str: Optional[str],
+    logger: Logger,
+) -> Optional[WorkerConfig]:
+    if config_path_str is None:
+        logger.error("config_path_str can't be None")
+        return None
+
+    try:
+        return WorkerConfig.parse_obj(read_yaml_config_file(Path(config_path_str)))
+    except Exception:
+        logger.exception("Failed to load worker config")
+        return None
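A usage sketch for the new helper (illustrative, not part of the patch); this is the pattern the query tasks above follow, with WORKER_CONFIG_PATH being the same environment variable they read:

    import os

    from celery.utils.log import get_task_logger

    from job_orchestration.executor.utils import try_load_worker_config

    logger = get_task_logger(__name__)

    worker_config = try_load_worker_config(os.getenv("WORKER_CONFIG_PATH"), logger)
    if worker_config is None:
        # The helper has already logged the cause; the caller only reports the
        # task failure through its usual channel, e.g. report_task_failure().
        ...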