From f05dc889237cc632f72865a13d3a111a09722aae Mon Sep 17 00:00:00 2001
From: Haiqi Xu <14502009+haiqi96@users.noreply.github.com>
Date: Thu, 12 Dec 2024 17:53:54 -0500
Subject: [PATCH] backup changes for worker config

---
 .../clp_package_utils/general.py              | 11 ++++
 .../clp_package_utils/scripts/start_clp.py    | 44 ++++-----------
 .../clp-py-utils/clp_py_utils/clp_config.py   | 17 ++++++
 .../executor/compress/fs_compression_task.py  | 55 ++++++++++---------
 4 files changed, 68 insertions(+), 59 deletions(-)

diff --git a/components/clp-package-utils/clp_package_utils/general.py b/components/clp-package-utils/clp_package_utils/general.py
index 05a67d497..507e1e434 100644
--- a/components/clp-package-utils/clp_package_utils/general.py
+++ b/components/clp-package-utils/clp_package_utils/general.py
@@ -14,6 +14,7 @@
 from clp_py_utils.clp_config import (
     CLP_DEFAULT_CREDENTIALS_FILE_PATH,
     CLPConfig,
+    WorkerConfig,
     DB_COMPONENT_NAME,
     LOG_VIEWER_WEBUI_COMPONENT_NAME,
     QUEUE_COMPONENT_NAME,
@@ -270,6 +271,16 @@ def generate_container_config(
     return container_clp_config, docker_mounts
 
 
+def generate_worker_config(clp_config: CLPConfig) -> WorkerConfig:
+    worker_config = WorkerConfig()
+    worker_config.archive_output = clp_config.archive_output.copy(deep=True)
+    worker_config.stream_output = clp_config.stream_output.copy(deep=True)
+    worker_config.package = clp_config.package.copy(deep=True)
+    worker_config.data_directory = clp_config.data_directory
+
+    return worker_config
+
+
 def dump_container_config(
     container_clp_config: CLPConfig, clp_config: CLPConfig, container_name: str
 ) -> Tuple[pathlib.Path, pathlib.Path]:
diff --git a/components/clp-package-utils/clp_package_utils/scripts/start_clp.py b/components/clp-package-utils/clp_package_utils/scripts/start_clp.py
index 8351a8836..f94fefb0b 100755
--- a/components/clp-package-utils/clp_package_utils/scripts/start_clp.py
+++ b/components/clp-package-utils/clp_package_utils/scripts/start_clp.py
@@ -42,6 +42,7 @@
     DockerMount,
     DockerMountType,
     generate_container_config,
+    generate_worker_config,
     get_clp_home,
     is_container_exited,
     is_container_running,
@@ -638,7 +639,6 @@ def start_compression_worker(
         num_cpus,
         mounts,
         None,
-        None,
     )
 
 
@@ -653,10 +653,6 @@ def start_query_worker(
     celery_route = f"{QueueName.QUERY}"
 
     query_worker_mount = [mounts.stream_output_dir]
-    query_worker_env = {
-        "CLP_STREAM_OUTPUT_DIR": container_clp_config.stream_output.directory,
-        "CLP_STREAM_COLLECTION_NAME": clp_config.results_cache.stream_collection_name,
-    }
 
     generic_start_worker(
         QUERY_WORKER_COMPONENT_NAME,
@@ -669,7 +665,6 @@
         clp_config.redis.query_backend_database,
         num_cpus,
         mounts,
-        query_worker_env,
         query_worker_mount,
     )
 
@@ -685,8 +680,7 @@ def generic_start_worker(
     redis_database: int,
     num_cpus: int,
     mounts: CLPDockerMounts,
-    worker_specific_env: Dict[str, Any],
-    worker_specific_mount: List[Optional[DockerMount]],
+    worker_specific_mount: Optional[List[Optional[DockerMount]]],
 ):
     logger.info(f"Starting {component_name}...")
 
@@ -694,6 +688,12 @@
     if container_exists(container_name):
         return
 
+    container_config_filename = f"{container_name}.yml"
+    container_config_file_path = clp_config.logs_directory / container_config_filename
+    container_worker_config = generate_worker_config(container_clp_config)
+    with open(container_config_file_path, "w") as f:
+        yaml.safe_dump(container_worker_config.dump_to_primitive_dict(), f)
+
     logs_dir = clp_config.logs_directory / component_name
     logs_dir.mkdir(parents=True, exist_ok=True)
     container_logs_dir = container_clp_config.logs_directory / component_name
@@ -723,39 +723,19 @@ def generic_start_worker(
             f"{container_clp_config.redis.host}:{container_clp_config.redis.port}/{redis_database}"
         ),
         "-e", f"CLP_HOME={CONTAINER_CLP_HOME}",
-        "-e", f"CLP_DATA_DIR={container_clp_config.data_directory}",
+        "-e", f"WORKER_CONFIG_PATH={container_clp_config.logs_directory / container_config_filename}",
+        # Follow the other method.
         "-e", f"CLP_LOGS_DIR={container_logs_dir}",
         "-e", f"CLP_LOGGING_LEVEL={worker_config.logging_level}",
-        "-e", f"CLP_STORAGE_ENGINE={clp_config.package.storage_engine}",
-        # need a way to remove this maybe
-        "-e", f"CLP_ARCHIVE_OUTPUT_DIR={container_clp_config.archive_output.get_directory()}",
     ]
-    if worker_specific_env:
-        for env_name, env_value in worker_specific_env.items():
-            container_start_cmd.append("-e")
-            container_start_cmd.append(f"{env_name}={env_value}")
-
-    if "s3" == clp_config.archive_output.storage.type:
-        s3_config = clp_config.archive_output.storage.s3_config
-        container_start_cmd += [
-            "-e", f"ENABLE_S3_ARCHIVE=1",
-            "-e", f"REGION_NAME={s3_config.region_name}",
-            "-e", f"BUCKET={s3_config.bucket}",
-            "-e", f"KEY_PREFIX={s3_config.key_prefix}"
-        ]
-        if s3_config.secret_access_key is not None and s3_config.secret_access_key is not None:
-            container_start_cmd += [
-                "-e", f"ACCESS_KEY_ID={s3_config.access_key_id}",
-                "-e", f"SECRET_ACCESS_KEY={s3_config.secret_access_key}"
-            ]
     # fmt: on
 
     necessary_mounts = [
         mounts.clp_home,
         mounts.data_dir,
         mounts.logs_dir,
-        # need a way to remove this maybe, since reader doesn't need it if it is staged.
-        # one option is to move it to the worker_specific_mount
+        # TODO: need a way to remove this maybe, since reader doesn't need it if it
+        # is staged. one option is to move it to the worker_specific_mount
         mounts.archives_output_dir,
         mounts.input_logs_dir,
     ]
diff --git a/components/clp-py-utils/clp_py_utils/clp_config.py b/components/clp-py-utils/clp_py_utils/clp_config.py
index 3694c8312..e33c29edf 100644
--- a/components/clp-py-utils/clp_py_utils/clp_config.py
+++ b/components/clp-py-utils/clp_py_utils/clp_config.py
@@ -633,3 +633,20 @@ def dump_to_primitive_dict(self):
         d["data_directory"] = str(self.data_directory)
         d["logs_directory"] = str(self.logs_directory)
         return d
+
+
+class WorkerConfig(BaseModel):
+    package: Package = Package()
+    archive_output: ArchiveOutput = ArchiveOutput()
+    # Only needed by query worker. Consider inheriting at some point.
+    stream_output: StreamOutput = StreamOutput()
+
+    data_directory: pathlib.Path = pathlib.Path("var") / "data"
+
+    def dump_to_primitive_dict(self):
+        d = self.dict()
+        d["archive_output"]["storage"] = self.archive_output.storage.dump_to_primitive_dict()
+        d["stream_output"] = self.stream_output.dump_to_primitive_dict()
+        # Turn paths into primitive strings
+        d["data_directory"] = str(self.data_directory)
+        return d
\ No newline at end of file
diff --git a/components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py b/components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py
index 120f9179a..66a88ad18 100644
--- a/components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py
+++ b/components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py
@@ -15,8 +15,11 @@
     Database,
     S3Config,
     StorageEngine,
+    StorageType,
+    WorkerConfig
 )
 from clp_py_utils.clp_logging import set_logging_level
+from clp_py_utils.core import read_yaml_config_file
 from clp_py_utils.result import Result
 from clp_py_utils.s3_utils import s3_put
 from clp_py_utils.sql_adapter import SQL_Adapter
@@ -24,6 +27,7 @@
 from job_orchestration.scheduler.constants import CompressionTaskStatus
 from job_orchestration.scheduler.job_config import ClpIoConfig, PathsToCompress
 from job_orchestration.scheduler.scheduler_data import CompressionTaskResult
+from pydantic import ValidationError
 
 # Setup logging
 logger = get_task_logger(__name__)
@@ -94,23 +98,6 @@ def upload_single_file_archive_to_s3(
     return Result(success=True)
 
 
-def get_s3_config() -> Optional[S3Config]:
-    enable_s3_write = os.getenv("ENABLE_S3_ARCHIVE")
-    if enable_s3_write is None:
-        return None
-
-    # TODO: this method is very errorprone since it doesn't check individual members
-    # Let's leave this for now before we properly implement the config file.
-    s3_config = S3Config(
-        region_name=os.getenv("REGION_NAME"),
-        bucket=os.getenv("BUCKET"),
-        key_prefix=os.getenv("KEY_PREFIX"),
-        access_key_id=os.getenv("ACCESS_KEY_ID"),
-        secret_access_key=os.getenv("SECRET_ACCESS_KEY"),
-    )
-    return s3_config
-
-
 def make_clp_command(
     clp_home: pathlib.Path,
     archive_output_dir: pathlib.Path,
@@ -169,6 +156,7 @@ def make_clp_s_command(
 
 
 def run_clp(
+    worker_config: WorkerConfig,
     clp_config: ClpIoConfig,
     clp_home: pathlib.Path,
     data_dir: pathlib.Path,
@@ -184,6 +172,7 @@
     """
     Compresses files from an FS into archives on an FS
 
+    :param worker_config: WorkerConfig
     :param clp_config: ClpIoConfig
     :param clp_home:
     :param data_dir:
@@ -197,7 +186,7 @@
     :param clp_metadata_db_connection_config
     :return: tuple -- (whether compression was successful, output messages)
     """
-    clp_storage_engine = str(os.getenv("CLP_STORAGE_ENGINE"))
+    clp_storage_engine = worker_config.package.storage_engine
 
     instance_id_str = f"compression-job-{job_id}-task-{task_id}"
 
@@ -208,7 +197,8 @@
     db_config_file.close()
 
     # Get s3 config
-    s3_config = get_s3_config()
+    storage_config = worker_config.archive_output.storage
+    s3_config = storage_config.s3_config if StorageType.S3 == storage_config.type else None
     s3_upload_failed = False
 
     if StorageEngine.CLP == clp_storage_engine:
@@ -338,10 +328,20 @@ def compress(
     paths_to_compress_json: str,
     clp_metadata_db_connection_config,
 ):
-    clp_home_str = os.getenv("CLP_HOME")
-    data_dir_str = os.getenv("CLP_DATA_DIR")
-    archive_output_dir_str = os.getenv("CLP_ARCHIVE_OUTPUT_DIR")
-    logs_dir_str = os.getenv("CLP_LOGS_DIR")
+    clp_home = pathlib.Path(os.getenv("CLP_HOME"))
+    logs_dir = pathlib.Path(os.getenv("CLP_LOGS_DIR"))
+
+    # Load configuration
+    worker_config_path = pathlib.Path(os.getenv("WORKER_CONFIG_PATH"))
+    try:
+        worker_config = WorkerConfig.parse_obj(read_yaml_config_file(worker_config_path))
+    except ValidationError as err:
+        logger.error(err)
+        return -1
+    except Exception as ex:
+        logger.error(ex)
+        return -1
+
 
     # Set logging level
     clp_logging_level = str(os.getenv("CLP_LOGGING_LEVEL"))
@@ -355,11 +355,12 @@ def compress(
     start_time = datetime.datetime.now()
     logger.info(f"[job_id={job_id} task_id={task_id}] COMPRESSION STARTED.")
    compression_task_status, worker_output = run_clp(
+        worker_config,
         clp_io_config,
-        pathlib.Path(clp_home_str),
-        pathlib.Path(data_dir_str),
-        pathlib.Path(archive_output_dir_str),
-        pathlib.Path(logs_dir_str),
+        clp_home,
+        worker_config.data_directory,
+        worker_config.archive_output.get_directory(),
+        logs_dir,
         job_id,
         task_id,
         tag_ids,
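
For reference, a minimal, self-contained sketch of the worker-config round
trip this patch introduces. The model and helper names (WorkerConfig,
dump_to_primitive_dict, read_yaml_config_file) come from the diff above; the
file path is a hypothetical stand-in, and the snippet assumes the clp_py_utils
package is importable:

    import pathlib
    import tempfile

    import yaml

    from clp_py_utils.clp_config import WorkerConfig
    from clp_py_utils.core import read_yaml_config_file

    # Launcher side (start_clp.py): generate_worker_config() copies the
    # relevant sections out of the full CLPConfig; a default-constructed
    # model stands in for it here.
    worker_config = WorkerConfig()

    # start_clp.py writes the config to <logs_directory>/<container_name>.yml;
    # a temporary directory stands in for the logs directory in this sketch.
    config_path = pathlib.Path(tempfile.mkdtemp()) / "compression_worker.yml"
    with open(config_path, "w") as f:
        yaml.safe_dump(worker_config.dump_to_primitive_dict(), f)

    # Worker side (fs_compression_task.py): WORKER_CONFIG_PATH points at the
    # dumped file, which is re-validated through the pydantic model on load.
    loaded = WorkerConfig.parse_obj(read_yaml_config_file(config_path))
    assert loaded.package.storage_engine == worker_config.package.storage_engine

This replaces the per-variable environment plumbing (CLP_DATA_DIR,
CLP_STORAGE_ENGINE, CLP_ARCHIVE_OUTPUT_DIR, ENABLE_S3_ARCHIVE, etc.) with a
single validated file, which is why start_compression_worker and
start_query_worker no longer pass worker_specific_env.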
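The dumped YAML mirrors dump_to_primitive_dict(): nested models become plain
dicts and data_directory becomes a string. Roughly like the following, where
the keys inside archive_output and stream_output depend on the
ArchiveOutput/StreamOutput models (not shown in this patch), so those values
are illustrative assumptions only:

    package:
      storage_engine: clp
    archive_output:
      storage:
        type: fs
        directory: var/data/archives
    stream_output:
      directory: var/data/streams
    data_directory: var/data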