Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

clp-package: Add handling for IR extraction jobs to the query scheduler and workers. #460

Merged
merged 30 commits into from
Jun 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
decfb0f
Add job types
haiqi96 Jun 18, 2024
e52877c
Add base class for SearchQuery
haiqi96 Jun 19, 2024
d28681c
Fix
haiqi96 Jun 19, 2024
d027279
Linter
haiqi96 Jun 19, 2024
92e69fd
fixes
haiqi96 Jun 19, 2024
f0ee686
fixes
haiqi96 Jun 19, 2024
14100c2
Initial support for IR task flow in the scheduler
haiqi96 Jun 19, 2024
133481f
Some renaming and remove debug print
haiqi96 Jun 19, 2024
7b1dfd5
First draft
haiqi96 Jun 21, 2024
afa26f9
First Refactoring
haiqi96 Jun 21, 2024
e2cdcb1
Refactoring
haiqi96 Jun 21, 2024
46431cf
refactor log messages and names
haiqi96 Jun 21, 2024
43dede4
remove unused imports
haiqi96 Jun 21, 2024
57492bb
fix
haiqi96 Jun 21, 2024
3a785f8
Polishing
haiqi96 Jun 21, 2024
f703001
Merge remote-tracking branch 'origin/main' into finalize_extraction_job
haiqi96 Jun 22, 2024
26ae8de
Merge branch 'main' of https://github.com/haiqi96/clp_fork into final…
haiqi96 Jun 24, 2024
d740476
Fixes
haiqi96 Jun 24, 2024
fbbf7f5
Fixes again
haiqi96 Jun 24, 2024
f00029c
linter
haiqi96 Jun 24, 2024
df88105
Add configurable option for target_uncompressed_size
haiqi96 Jun 25, 2024
bebde54
Apply suggestions from code review
haiqi96 Jun 26, 2024
b5d40ea
First batch of update
haiqi96 Jun 26, 2024
fb680c0
Another batch of change. DOCSTRING IS NOT READY YET
haiqi96 Jun 26, 2024
76e16b3
Refactor common functions
haiqi96 Jun 26, 2024
fe36599
Add doc string
haiqi96 Jun 26, 2024
0529b37
Linter
haiqi96 Jun 26, 2024
bd7083f
Update docstring.
kirkrodrigues Jun 28, 2024
4a4d5b2
Rename generic_run_query_task to run_query_task; Make search_task's m…
kirkrodrigues Jun 28, 2024
ba51583
remove str conversion for the envvars since they were already string …
haiqi96 Jun 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions components/clp-package-utils/clp_package_utils/general.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ def __init__(self, clp_home: pathlib.Path, docker_clp_home: pathlib.Path):
self.data_dir: typing.Optional[DockerMount] = None
self.logs_dir: typing.Optional[DockerMount] = None
self.archives_output_dir: typing.Optional[DockerMount] = None
self.ir_output_dir: typing.Optional[DockerMount] = None


def get_clp_home():
Expand Down Expand Up @@ -224,6 +225,19 @@ def generate_container_config(clp_config: CLPConfig, clp_home: pathlib.Path):
container_clp_config.archive_output.directory,
)

container_clp_config.ir_output.directory = pathlib.Path("/") / "mnt" / "ir-output"
if not is_path_already_mounted(
clp_home,
CONTAINER_CLP_HOME,
clp_config.ir_output.directory,
container_clp_config.ir_output.directory,
):
docker_mounts.ir_output_dir = DockerMount(
DockerMountType.BIND,
clp_config.ir_output.directory,
container_clp_config.ir_output.directory,
)

return container_clp_config, docker_mounts


Expand Down Expand Up @@ -391,6 +405,7 @@ def validate_results_cache_config(
def validate_worker_config(clp_config: CLPConfig):
    """
    Validates the config values the workers depend on: the input logs directory
    and the archive and IR output directories.

    :param clp_config:
    :raises ValueError: If any of the validated paths is invalid (each
    `validate_*` method raises ValueError on failure).
    """
    clp_config.validate_input_logs_dir()
    clp_config.validate_archive_output_dir()
    clp_config.validate_ir_output_dir()


def validate_webui_config(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
import subprocess
import sys
import time
import typing
import uuid
from typing import Any, Dict, List, Optional

import yaml
from clp_py_utils.clp_config import (
Expand Down Expand Up @@ -526,6 +526,8 @@ def start_compression_worker(
clp_config.redis.compression_backend_database,
num_cpus,
mounts,
None,
None,
)


Expand All @@ -538,6 +540,13 @@ def start_query_worker(
):
celery_method = "job_orchestration.executor.query"
celery_route = f"{QueueName.QUERY}"

query_worker_mount = [mounts.ir_output_dir]
query_worker_env = {
"CLP_IR_OUTPUT_DIR": container_clp_config.ir_output.directory,
"CLP_IR_COLLECTION": clp_config.results_cache.ir_collection_name,
}

generic_start_worker(
QUERY_WORKER_COMPONENT_NAME,
instance_id,
Expand All @@ -549,6 +558,8 @@ def start_query_worker(
clp_config.redis.query_backend_database,
num_cpus,
mounts,
query_worker_env,
query_worker_mount,
)


Expand All @@ -563,6 +574,8 @@ def generic_start_worker(
redis_database: int,
num_cpus: int,
mounts: CLPDockerMounts,
worker_specific_env: Dict[str, Any],
worker_specific_mount: List[Optional[DockerMount]],
):
logger.info(f"Starting {component_name}...")

Expand All @@ -578,6 +591,7 @@ def generic_start_worker(

# Create necessary directories
clp_config.archive_output.directory.mkdir(parents=True, exist_ok=True)
clp_config.ir_output.directory.mkdir(parents=True, exist_ok=True)

clp_site_packages_dir = CONTAINER_CLP_HOME / "lib" / "python3" / "site-packages"
# fmt: off
Expand Down Expand Up @@ -605,19 +619,28 @@ def generic_start_worker(
"-e", f"CLP_LOGGING_LEVEL={worker_config.logging_level}",
"-e", f"CLP_STORAGE_ENGINE={clp_config.package.storage_engine}",
"-u", f"{os.getuid()}:{os.getgid()}",
"--mount", str(mounts.clp_home),
]
if worker_specific_env:
for env_name, env_value in worker_specific_env.items():
container_start_cmd.append("-e")
container_start_cmd.append(f"{env_name}={env_value}")

# fmt: on
necessary_mounts = [
mounts.clp_home,
mounts.data_dir,
mounts.logs_dir,
mounts.archives_output_dir,
mounts.input_logs_dir,
]
if worker_specific_mount:
necessary_mounts.extend(worker_specific_mount)

for mount in necessary_mounts:
if mount:
container_start_cmd.append("--mount")
container_start_cmd.append(str(mount))
if not mount:
raise ValueError(f"Required mount configuration is empty: {necessary_mounts}")
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
container_start_cmd.append("--mount")
container_start_cmd.append(str(mount))
container_start_cmd.append(clp_config.execution_container)

worker_cmd = [
Expand Down Expand Up @@ -645,8 +668,8 @@ def generic_start_worker(

def update_meteor_settings(
parent_key_prefix: str,
settings: typing.Dict[str, typing.Any],
updates: typing.Dict[str, typing.Any],
settings: Dict[str, Any],
updates: Dict[str, Any],
):
"""
Recursively updates the given Meteor settings object with the values from `updates`.
Expand Down
44 changes: 43 additions & 1 deletion components/clp-py-utils/clp_py_utils/clp_config.py
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,8 @@ def validate_upsert_interval(cls, field):
class ResultsCache(BaseModel):
host: str = "localhost"
port: int = 27017
db_name: str = "clp-search"
db_name: str = "clp-query-results"
ir_collection_name: str = "ir-files"

@validator("host")
def validate_host(cls, field):
Expand All @@ -268,6 +269,12 @@ def validate_db_name(cls, field):
raise ValueError(f"{RESULTS_CACHE_COMPONENT_NAME}.db_name cannot be empty.")
return field

@validator("ir_collection_name")
def validate_ir_collection_name(cls, field):
    """Ensures the results-cache IR collection name is non-empty."""
    if field == "":
        raise ValueError(f"{RESULTS_CACHE_COMPONENT_NAME}.ir_collection_name cannot be empty.")
    return field

def get_uri(self):
return f"mongodb://{self.host}:{self.port}/{self.db_name}"

Expand Down Expand Up @@ -321,6 +328,32 @@ def dump_to_primitive_dict(self):
return d


class IrOutput(BaseModel):
    """Config for where and how IR (intermediate representation) files are extracted."""

    # Directory where extracted IR files are written
    directory: pathlib.Path = pathlib.Path("var") / "data" / "ir"
    # Target uncompressed size, in bytes, of each extracted IR chunk
    target_uncompressed_size: int = 128 * 1024 * 1024

    # NOTE: `pre=True` so the raw input is checked before pydantic coerces it
    # to a pathlib.Path. Without it, the original `"" == field` comparison ran
    # against a Path object (an empty string coerces to Path(".")), so the
    # emptiness check could never trigger.
    @validator("directory", pre=True)
    def validate_directory(cls, field):
        if "" == str(field):
            raise ValueError("directory can not be empty")
        return field

    @validator("target_uncompressed_size")
    def validate_target_uncompressed_size(cls, field):
        if field <= 0:
            raise ValueError("target_uncompressed_size must be greater than 0")
        return field

    def make_config_paths_absolute(self, clp_home: pathlib.Path):
        """Resolves `directory` relative to `clp_home` if it isn't already absolute."""
        self.directory = make_config_path_absolute(clp_home, self.directory)

    def dump_to_primitive_dict(self):
        """Returns this config as a dict containing only primitive values."""
        d = self.dict()
        # Turn directory (pathlib.Path) into a primitive string
        d["directory"] = str(d["directory"])
        return d


class WebUi(BaseModel):
host: str = "localhost"
port: int = 4000
Expand Down Expand Up @@ -368,6 +401,7 @@ class CLPConfig(BaseModel):
credentials_file_path: pathlib.Path = CLP_DEFAULT_CREDENTIALS_FILE_PATH

archive_output: ArchiveOutput = ArchiveOutput()
ir_output: IrOutput = IrOutput()
data_directory: pathlib.Path = pathlib.Path("var") / "data"
logs_directory: pathlib.Path = pathlib.Path("var") / "log"

Expand All @@ -377,6 +411,7 @@ def make_config_paths_absolute(self, clp_home: pathlib.Path):
self.input_logs_directory = make_config_path_absolute(clp_home, self.input_logs_directory)
self.credentials_file_path = make_config_path_absolute(clp_home, self.credentials_file_path)
self.archive_output.make_config_paths_absolute(clp_home)
self.ir_output.make_config_paths_absolute(clp_home)
self.data_directory = make_config_path_absolute(clp_home, self.data_directory)
self.logs_directory = make_config_path_absolute(clp_home, self.logs_directory)
self._os_release_file_path = make_config_path_absolute(clp_home, self._os_release_file_path)
Expand All @@ -396,6 +431,12 @@ def validate_archive_output_dir(self):
except ValueError as ex:
raise ValueError(f"archive_output.directory is invalid: {ex}")

def validate_ir_output_dir(self):
    """
    Validates that `ir_output.directory` either is, or could be created as, a
    directory.

    :raises ValueError: If the path can't be used as a directory, with the
    underlying reason included in the message.
    """
    try:
        validate_path_could_be_dir(self.ir_output.directory)
    except ValueError as ex:
        raise ValueError(f"ir_output.directory is invalid: {ex}")

def validate_data_dir(self):
try:
validate_path_could_be_dir(self.data_directory)
Expand Down Expand Up @@ -463,6 +504,7 @@ def load_redis_credentials_from_file(self):
def dump_to_primitive_dict(self):
d = self.dict()
d["archive_output"] = self.archive_output.dump_to_primitive_dict()
d["ir_output"] = self.ir_output.dump_to_primitive_dict()
# Turn paths into primitive strings
d["input_logs_directory"] = str(self.input_logs_directory)
d["credentials_file_path"] = str(self.credentials_file_path)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@

from job_orchestration.scheduler.constants import QueueName

imports = "job_orchestration.executor.query.fs_search_task"
imports = (
"job_orchestration.executor.query.fs_search_task",
"job_orchestration.executor.query.extract_ir_task",
)

task_routes = {
"job_orchestration.executor.query.fs_search_task.search": QueueName.QUERY,
"job_orchestration.executor.query.extract_ir_task.extract_ir": QueueName.QUERY,
}
task_create_missing_queues = True

Expand Down
kirkrodrigues marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import datetime
import os
from pathlib import Path
from typing import Any, Dict, List, Optional

from celery.app.task import Task
from celery.utils.log import get_task_logger
from clp_py_utils.clp_config import Database, StorageEngine
from clp_py_utils.clp_logging import set_logging_level
from clp_py_utils.sql_adapter import SQL_Adapter
from job_orchestration.executor.query.celery import app
from job_orchestration.executor.query.utils import (
report_command_creation_failure,
run_query_task,
)
from job_orchestration.scheduler.job_config import ExtractIrJobConfig
from job_orchestration.scheduler.scheduler_data import QueryTaskStatus

# Setup logging
logger = get_task_logger(__name__)


def make_command(
    storage_engine: str,
    clp_home: Path,
    archives_dir: Path,
    archive_id: str,
    ir_output_dir: Path,
    extract_ir_config: ExtractIrJobConfig,
    results_cache_uri: str,
    ir_collection: str,
) -> Optional[List[str]]:
    """
    Generates the command to run an IR extraction task.

    :param storage_engine: Storage engine the package was deployed with.
    :param clp_home: Root of the CLP installation (contains `bin/clo`).
    :param archives_dir: Directory containing the archives.
    :param archive_id: ID of the archive to extract IR from.
    :param ir_output_dir: Directory where extracted IR files should be written.
    :param extract_ir_config: Config for the extraction job.
    :param results_cache_uri: URI of the results-cache MongoDB instance.
    :param ir_collection: Name of the collection to record IR metadata in.
    :return: The command as a list of strings, or None if the storage engine is
    unsupported or `file_split_id` is missing.
    """
    if StorageEngine.CLP == storage_engine:
        if not extract_ir_config.file_split_id:
            logger.error("file_split_id not supplied")
            return None
        command = [
            str(clp_home / "bin" / "clo"),
            "i",
            str(archives_dir / archive_id),
            extract_ir_config.file_split_id,
            str(ir_output_dir),
            results_cache_uri,
            ir_collection,
        ]
        if extract_ir_config.target_uncompressed_size is not None:
            command.append("--target-size")
            # Must be converted to a string: argv elements passed to process
            # execution must all be strings, and target_uncompressed_size is an
            # int. The original append of the raw int would raise a TypeError
            # when the command is spawned.
            command.append(str(extract_ir_config.target_uncompressed_size))
    else:
        logger.error(f"Unsupported storage engine {storage_engine}")
        return None

    return command


@app.task(bind=True)
def extract_ir(
    self: Task,
    job_id: str,
    task_id: int,
    job_config_obj: dict,
    archive_id: str,
    clp_metadata_db_conn_params: dict,
    results_cache_uri: str,
) -> Dict[str, Any]:
    """
    Celery task that extracts an IR file from the given archive and reports the
    result through the metadata database.

    :param self: Bound celery task instance.
    :param job_id: ID of the parent query job.
    :param task_id: ID of this task.
    :param job_config_obj: Serialized ExtractIrJobConfig for the job.
    :param archive_id: ID of the archive to extract IR from.
    :param clp_metadata_db_conn_params: Connection params for the metadata DB.
    :param results_cache_uri: URI of the results-cache MongoDB instance.
    :return: The task's result dict (see `run_query_task`).
    """
    task_name = "IR extraction"

    # Set up logging to file
    clp_logs_dir = Path(os.getenv("CLP_LOGS_DIR"))
    set_logging_level(logger, os.getenv("CLP_LOGGING_LEVEL"))

    logger.info(f"Started {task_name} task for job {job_id}")

    start_time = datetime.datetime.now()
    task_status: QueryTaskStatus
    sql_adapter = SQL_Adapter(Database.parse_obj(clp_metadata_db_conn_params))

    # Assemble the extraction command from worker environment variables (set by
    # the package's start script) and the job's config.
    command = make_command(
        storage_engine=os.getenv("CLP_STORAGE_ENGINE"),
        clp_home=Path(os.getenv("CLP_HOME")),
        archives_dir=Path(os.getenv("CLP_ARCHIVE_OUTPUT_DIR")),
        archive_id=archive_id,
        ir_output_dir=Path(os.getenv("CLP_IR_OUTPUT_DIR")),
        extract_ir_config=ExtractIrJobConfig.parse_obj(job_config_obj),
        results_cache_uri=results_cache_uri,
        ir_collection=os.getenv("CLP_IR_COLLECTION"),
    )
    if not command:
        return report_command_creation_failure(
            sql_adapter=sql_adapter,
            logger=logger,
            task_name=task_name,
            task_id=task_id,
            start_time=start_time,
        )

    return run_query_task(
        sql_adapter=sql_adapter,
        logger=logger,
        clp_logs_dir=clp_logs_dir,
        task_command=command,
        task_name=task_name,
        job_id=job_id,
        task_id=task_id,
        start_time=start_time,
    )
Loading