Skip to content

Commit

Permalink
clp-package: Add handling for IR extraction jobs to the query scheduler and workers. (y-scope#460)
Browse files Browse the repository at this point in the history
  • Loading branch information
haiqi96 authored Jun 28, 2024
1 parent 249816b commit 9ba0451
Show file tree
Hide file tree
Showing 14 changed files with 625 additions and 166 deletions.
15 changes: 15 additions & 0 deletions components/clp-package-utils/clp_package_utils/general.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ def __init__(self, clp_home: pathlib.Path, docker_clp_home: pathlib.Path):
self.data_dir: typing.Optional[DockerMount] = None
self.logs_dir: typing.Optional[DockerMount] = None
self.archives_output_dir: typing.Optional[DockerMount] = None
self.ir_output_dir: typing.Optional[DockerMount] = None


def get_clp_home():
Expand Down Expand Up @@ -224,6 +225,19 @@ def generate_container_config(clp_config: CLPConfig, clp_home: pathlib.Path):
container_clp_config.archive_output.directory,
)

container_clp_config.ir_output.directory = pathlib.Path("/") / "mnt" / "ir-output"
if not is_path_already_mounted(
clp_home,
CONTAINER_CLP_HOME,
clp_config.ir_output.directory,
container_clp_config.ir_output.directory,
):
docker_mounts.ir_output_dir = DockerMount(
DockerMountType.BIND,
clp_config.ir_output.directory,
container_clp_config.ir_output.directory,
)

return container_clp_config, docker_mounts


Expand Down Expand Up @@ -391,6 +405,7 @@ def validate_results_cache_config(
def validate_worker_config(clp_config: CLPConfig):
    """Validate every config path that a worker container depends on."""
    for validate in (
        clp_config.validate_input_logs_dir,
        clp_config.validate_archive_output_dir,
        clp_config.validate_ir_output_dir,
    ):
        validate()


def validate_webui_config(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
import subprocess
import sys
import time
import typing
import uuid
from typing import Any, Dict, List, Optional

import yaml
from clp_py_utils.clp_config import (
Expand Down Expand Up @@ -526,6 +526,8 @@ def start_compression_worker(
clp_config.redis.compression_backend_database,
num_cpus,
mounts,
None,
None,
)


Expand All @@ -538,6 +540,13 @@ def start_query_worker(
):
celery_method = "job_orchestration.executor.query"
celery_route = f"{QueueName.QUERY}"

query_worker_mount = [mounts.ir_output_dir]
query_worker_env = {
"CLP_IR_OUTPUT_DIR": container_clp_config.ir_output.directory,
"CLP_IR_COLLECTION": clp_config.results_cache.ir_collection_name,
}

generic_start_worker(
QUERY_WORKER_COMPONENT_NAME,
instance_id,
Expand All @@ -549,6 +558,8 @@ def start_query_worker(
clp_config.redis.query_backend_database,
num_cpus,
mounts,
query_worker_env,
query_worker_mount,
)


Expand All @@ -563,6 +574,8 @@ def generic_start_worker(
redis_database: int,
num_cpus: int,
mounts: CLPDockerMounts,
worker_specific_env: Dict[str, Any],
worker_specific_mount: List[Optional[DockerMount]],
):
logger.info(f"Starting {component_name}...")

Expand All @@ -578,6 +591,7 @@ def generic_start_worker(

# Create necessary directories
clp_config.archive_output.directory.mkdir(parents=True, exist_ok=True)
clp_config.ir_output.directory.mkdir(parents=True, exist_ok=True)

clp_site_packages_dir = CONTAINER_CLP_HOME / "lib" / "python3" / "site-packages"
# fmt: off
Expand Down Expand Up @@ -605,19 +619,28 @@ def generic_start_worker(
"-e", f"CLP_LOGGING_LEVEL={worker_config.logging_level}",
"-e", f"CLP_STORAGE_ENGINE={clp_config.package.storage_engine}",
"-u", f"{os.getuid()}:{os.getgid()}",
"--mount", str(mounts.clp_home),
]
if worker_specific_env:
for env_name, env_value in worker_specific_env.items():
container_start_cmd.append("-e")
container_start_cmd.append(f"{env_name}={env_value}")

# fmt: on
necessary_mounts = [
mounts.clp_home,
mounts.data_dir,
mounts.logs_dir,
mounts.archives_output_dir,
mounts.input_logs_dir,
]
if worker_specific_mount:
necessary_mounts.extend(worker_specific_mount)

for mount in necessary_mounts:
if mount:
container_start_cmd.append("--mount")
container_start_cmd.append(str(mount))
if not mount:
raise ValueError(f"Required mount configuration is empty: {necessary_mounts}")
container_start_cmd.append("--mount")
container_start_cmd.append(str(mount))
container_start_cmd.append(clp_config.execution_container)

worker_cmd = [
Expand Down Expand Up @@ -645,8 +668,8 @@ def generic_start_worker(

def update_meteor_settings(
parent_key_prefix: str,
settings: typing.Dict[str, typing.Any],
updates: typing.Dict[str, typing.Any],
settings: Dict[str, Any],
updates: Dict[str, Any],
):
"""
Recursively updates the given Meteor settings object with the values from `updates`.
Expand Down
44 changes: 43 additions & 1 deletion components/clp-py-utils/clp_py_utils/clp_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,8 @@ def validate_upsert_interval(cls, field):
class ResultsCache(BaseModel):
host: str = "localhost"
port: int = 27017
db_name: str = "clp-search"
db_name: str = "clp-query-results"
ir_collection_name: str = "ir-files"

@validator("host")
def validate_host(cls, field):
Expand All @@ -268,6 +269,12 @@ def validate_db_name(cls, field):
raise ValueError(f"{RESULTS_CACHE_COMPONENT_NAME}.db_name cannot be empty.")
return field

@validator("ir_collection_name")
def validate_ir_collection_name(cls, field):
if "" == field:
raise ValueError(f"{RESULTS_CACHE_COMPONENT_NAME}.ir_collection_name cannot be empty.")
return field

def get_uri(self):
return f"mongodb://{self.host}:{self.port}/{self.db_name}"

Expand Down Expand Up @@ -321,6 +328,32 @@ def dump_to_primitive_dict(self):
return d


class IrOutput(BaseModel):
    """Configuration for where and how extracted IR (intermediate representation) files are written."""

    # Directory that IR extraction tasks write their output files into.
    directory: pathlib.Path = pathlib.Path("var") / "data" / "ir"
    # Target uncompressed size of each extracted IR chunk, in bytes (128 MiB).
    target_uncompressed_size: int = 128 * 1024 * 1024

    @validator("directory")
    def validate_directory(cls, field):
        # NOTE(review): by default, pydantic runs validators after coercing the
        # value, so `field` is already a pathlib.Path here and comparing it
        # against "" likely never matches; confirm whether pre=True was
        # intended (same pattern is used for the other output configs).
        if "" == field:
            raise ValueError("directory can not be empty")
        return field

    @validator("target_uncompressed_size")
    def validate_target_uncompressed_size(cls, field):
        # A non-positive target size would make chunking meaningless.
        if field <= 0:
            raise ValueError("target_uncompressed_size must be greater than 0")
        return field

    def make_config_paths_absolute(self, clp_home: pathlib.Path):
        # Resolve `directory` relative to the package's home directory.
        self.directory = make_config_path_absolute(clp_home, self.directory)

    def dump_to_primitive_dict(self):
        d = self.dict()
        # Turn directory (pathlib.Path) into a primitive string
        d["directory"] = str(d["directory"])
        return d


class WebUi(BaseModel):
host: str = "localhost"
port: int = 4000
Expand Down Expand Up @@ -368,6 +401,7 @@ class CLPConfig(BaseModel):
credentials_file_path: pathlib.Path = CLP_DEFAULT_CREDENTIALS_FILE_PATH

archive_output: ArchiveOutput = ArchiveOutput()
ir_output: IrOutput = IrOutput()
data_directory: pathlib.Path = pathlib.Path("var") / "data"
logs_directory: pathlib.Path = pathlib.Path("var") / "log"

Expand All @@ -377,6 +411,7 @@ def make_config_paths_absolute(self, clp_home: pathlib.Path):
self.input_logs_directory = make_config_path_absolute(clp_home, self.input_logs_directory)
self.credentials_file_path = make_config_path_absolute(clp_home, self.credentials_file_path)
self.archive_output.make_config_paths_absolute(clp_home)
self.ir_output.make_config_paths_absolute(clp_home)
self.data_directory = make_config_path_absolute(clp_home, self.data_directory)
self.logs_directory = make_config_path_absolute(clp_home, self.logs_directory)
self._os_release_file_path = make_config_path_absolute(clp_home, self._os_release_file_path)
Expand All @@ -396,6 +431,12 @@ def validate_archive_output_dir(self):
except ValueError as ex:
raise ValueError(f"archive_output.directory is invalid: {ex}")

    def validate_ir_output_dir(self):
        """Validate that ir_output.directory is usable as a directory, per validate_path_could_be_dir."""
        try:
            validate_path_could_be_dir(self.ir_output.directory)
        except ValueError as ex:
            # Re-raise with the offending config key so the error message
            # points the user at the right setting.
            raise ValueError(f"ir_output.directory is invalid: {ex}")

def validate_data_dir(self):
try:
validate_path_could_be_dir(self.data_directory)
Expand Down Expand Up @@ -463,6 +504,7 @@ def load_redis_credentials_from_file(self):
def dump_to_primitive_dict(self):
d = self.dict()
d["archive_output"] = self.archive_output.dump_to_primitive_dict()
d["ir_output"] = self.ir_output.dump_to_primitive_dict()
# Turn paths into primitive strings
d["input_logs_directory"] = str(self.input_logs_directory)
d["credentials_file_path"] = str(self.credentials_file_path)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@

from job_orchestration.scheduler.constants import QueueName

imports = "job_orchestration.executor.query.fs_search_task"
# Modules Celery should import so that both query-worker task types
# (search and IR extraction) are registered on startup.
imports = (
    "job_orchestration.executor.query.fs_search_task",
    "job_orchestration.executor.query.extract_ir_task",
)

# Route both task types onto the shared query queue.
task_routes = {
    "job_orchestration.executor.query.fs_search_task.search": QueueName.QUERY,
    "job_orchestration.executor.query.extract_ir_task.extract_ir": QueueName.QUERY,
}
# Create the queue on the broker if it doesn't already exist.
task_create_missing_queues = True

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import datetime
import os
from pathlib import Path
from typing import Any, Dict, List, Optional

from celery.app.task import Task
from celery.utils.log import get_task_logger
from clp_py_utils.clp_config import Database, StorageEngine
from clp_py_utils.clp_logging import set_logging_level
from clp_py_utils.sql_adapter import SQL_Adapter
from job_orchestration.executor.query.celery import app
from job_orchestration.executor.query.utils import (
report_command_creation_failure,
run_query_task,
)
from job_orchestration.scheduler.job_config import ExtractIrJobConfig
from job_orchestration.scheduler.scheduler_data import QueryTaskStatus

# Setup logging
logger = get_task_logger(__name__)


def make_command(
    storage_engine: str,
    clp_home: Path,
    archives_dir: Path,
    archive_id: str,
    ir_output_dir: Path,
    extract_ir_config: ExtractIrJobConfig,
    results_cache_uri: str,
    ir_collection: str,
) -> Optional[List[str]]:
    """
    Builds the `clo` command line for an IR extraction task.

    :param storage_engine: Storage engine in use; only StorageEngine.CLP is supported.
    :param clp_home: CLP package root (the `clo` binary lives under `bin/`).
    :param archives_dir: Directory containing the archives.
    :param archive_id: ID of the archive to extract from.
    :param ir_output_dir: Directory the extracted IR files are written to.
    :param extract_ir_config: Job config; must carry a `file_split_id`.
    :param results_cache_uri: MongoDB URI for the results cache.
    :param ir_collection: Results-cache collection for IR file metadata.
    :return: The command as a list of strings, or None on any unsupported /
        invalid input (logged).
    """
    if StorageEngine.CLP == storage_engine:
        if not extract_ir_config.file_split_id:
            logger.error("file_split_id not supplied")
            return None
        command = [
            str(clp_home / "bin" / "clo"),
            "i",
            str(archives_dir / archive_id),
            extract_ir_config.file_split_id,
            str(ir_output_dir),
            results_cache_uri,
            ir_collection,
        ]
        if extract_ir_config.target_uncompressed_size is not None:
            command.append("--target-size")
            # Convert to str: subprocess argument lists must contain strings,
            # and target_uncompressed_size is an int.
            command.append(str(extract_ir_config.target_uncompressed_size))
    else:
        logger.error(f"Unsupported storage engine {storage_engine}")
        return None

    return command


@app.task(bind=True)
def extract_ir(
    self: Task,
    job_id: str,
    task_id: int,
    job_config_obj: dict,
    archive_id: str,
    clp_metadata_db_conn_params: dict,
    results_cache_uri: str,
) -> Dict[str, Any]:
    """
    Celery task that extracts IR files from one archive by running the `clo`
    binary (via `run_query_task`) and reports the task's result.

    :param job_id: ID of the parent IR extraction job.
    :param task_id: ID of this task within the job.
    :param job_config_obj: Dict form of an ExtractIrJobConfig.
    :param archive_id: ID of the archive to extract from.
    :param clp_metadata_db_conn_params: Dict form of a Database config used to
        report task status.
    :param results_cache_uri: MongoDB URI for the results cache.
    :return: Result dict produced by `run_query_task` (or
        `report_command_creation_failure` if the command couldn't be built).
    """
    task_name = "IR extraction"

    # Setup logging to file
    # NOTE(review): os.getenv returns None if the variable is unset, and
    # Path(None) raises TypeError — the worker launcher is expected to always
    # set CLP_LOGS_DIR (and the other CLP_* variables read below); confirm.
    clp_logs_dir = Path(os.getenv("CLP_LOGS_DIR"))
    clp_logging_level = os.getenv("CLP_LOGGING_LEVEL")
    set_logging_level(logger, clp_logging_level)

    logger.info(f"Started {task_name} task for job {job_id}")

    start_time = datetime.datetime.now()
    task_status: QueryTaskStatus
    sql_adapter = SQL_Adapter(Database.parse_obj(clp_metadata_db_conn_params))

    # Make task_command
    # All of these are injected into the worker container's environment by the
    # package's start-up scripts.
    clp_home = Path(os.getenv("CLP_HOME"))
    archive_directory = Path(os.getenv("CLP_ARCHIVE_OUTPUT_DIR"))
    clp_storage_engine = os.getenv("CLP_STORAGE_ENGINE")
    ir_output_dir = Path(os.getenv("CLP_IR_OUTPUT_DIR"))
    ir_collection = os.getenv("CLP_IR_COLLECTION")
    extract_ir_config = ExtractIrJobConfig.parse_obj(job_config_obj)

    task_command = make_command(
        storage_engine=clp_storage_engine,
        clp_home=clp_home,
        archives_dir=archive_directory,
        archive_id=archive_id,
        ir_output_dir=ir_output_dir,
        extract_ir_config=extract_ir_config,
        results_cache_uri=results_cache_uri,
        ir_collection=ir_collection,
    )
    if not task_command:
        # make_command already logged why; record the failure in the metadata DB.
        return report_command_creation_failure(
            sql_adapter=sql_adapter,
            logger=logger,
            task_name=task_name,
            task_id=task_id,
            start_time=start_time,
        )

    return run_query_task(
        sql_adapter=sql_adapter,
        logger=logger,
        clp_logs_dir=clp_logs_dir,
        task_command=task_command,
        task_name=task_name,
        job_id=job_id,
        task_id=task_id,
        start_time=start_time,
    )
Loading

0 comments on commit 9ba0451

Please sign in to comment.