Commit: More support

haiqi96 committed Dec 13, 2024
1 parent f05dc88 commit abf5dde
Showing 7 changed files with 117 additions and 62 deletions.
1 change: 1 addition & 0 deletions components/clp-package-utils/clp_package_utils/general.py
@@ -276,6 +276,7 @@ def generate_worker_config(clp_config: CLPConfig) -> WorkerConfig:
worker_config.archive_output = clp_config.archive_output.copy(deep=True)
worker_config.stream_output = clp_config.stream_output.copy(deep=True)
worker_config.package = clp_config.package.copy(deep=True)
worker_config.results_cache = clp_config.results_cache.copy(deep=True)
worker_config.data_directory = clp_config.data_directory

return worker_config
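For context, generate_worker_config() now also snapshots the results-cache settings, so a worker gets everything it needs from one object. Below is a minimal, hypothetical sketch of how the package side might serialize that object to the file workers read via WORKER_CONFIG_PATH; the helper name and its imports are assumptions for illustration and are not part of this commit.

import pathlib

import yaml

from clp_package_utils.general import generate_worker_config
from clp_py_utils.clp_config import CLPConfig


def write_worker_config_file(clp_config: CLPConfig, output_path: pathlib.Path) -> None:
    """Hypothetical helper: dump the worker config generated from the package config."""
    worker_config = generate_worker_config(clp_config)
    with open(output_path, "w") as f:
        # dump_to_primitive_dict() converts nested models (e.g. archive_output.storage)
        # into plain dicts that yaml.safe_dump can serialize.
        yaml.safe_dump(worker_config.dump_to_primitive_dict(), f)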
7 changes: 4 additions & 3 deletions components/clp-py-utils/clp_py_utils/clp_config.py
@@ -638,11 +638,12 @@ def dump_to_primitive_dict(self):
class WorkerConfig(BaseModel):
package: Package = Package()
archive_output: ArchiveOutput = ArchiveOutput()
# Only needed by query worker. Consider inheriting at some point.
stream_output: StreamOutput = StreamOutput()

data_directory: pathlib.Path = pathlib.Path("var") / "data"

# Only needed by query worker. Consider inheriting at some point.
stream_output = StreamOutput()
results_cache: ResultsCache = ResultsCache()

def dump_to_primitive_dict(self):
d = self.dict()
d["archive_output"]["storage"] = self.archive_output.storage.dump_to_primitive_dict()
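To make the split explicit, here is a short illustrative sketch, based only on fields exercised elsewhere in this commit, of what each worker type reads from the shared WorkerConfig:

# Illustrative only: WorkerConfig fields used by each worker type in this commit.

# Compression worker:
storage_engine = worker_config.package.storage_engine
data_dir = worker_config.data_directory
archive_output_dir = worker_config.archive_output.get_directory()
s3_config = worker_config.archive_output.storage.s3_config  # only when the storage type is S3

# Query worker (stream extraction and search):
stream_output_dir = worker_config.stream_output.directory
stream_collection_name = worker_config.results_cache.stream_collection_name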
Original file line number Diff line number Diff line change
@@ -135,19 +135,22 @@ def make_clp_s_command(
archive_output_dir: pathlib.Path,
clp_config: ClpIoConfig,
db_config_file_path: pathlib.Path,
enable_s3_write: bool,
):
# fmt: off
compression_cmd = [
str(clp_home / "bin" / "clp-s"),
"c", str(archive_output_dir),
"--single-file-archive",
"--print-archive-stats",
"--target-encoded-size",
str(clp_config.output.target_segment_size + clp_config.output.target_dictionaries_size),
"--db-config-file", str(db_config_file_path),
]
# fmt: on

if enable_s3_write:
compression_cmd.append("--single-file-archive")

if clp_config.input.timestamp_key is not None:
compression_cmd.append("--timestamp-key")
compression_cmd.append(clp_config.input.timestamp_key)
@@ -159,8 +162,6 @@ def run_clp(
worker_config: WorkerConfig,
clp_config: ClpIoConfig,
clp_home: pathlib.Path,
data_dir: pathlib.Path,
archive_output_dir: pathlib.Path,
logs_dir: pathlib.Path,
job_id: int,
task_id: int,
@@ -175,8 +176,6 @@
:param worker_config: WorkerConfig
:param clp_config: ClpIoConfig
:param clp_home:
:param data_dir:
:param archive_output_dir:
:param logs_dir:
:param job_id:
:param task_id:
@@ -186,20 +185,31 @@
:param clp_metadata_db_connection_config
:return: tuple -- (whether compression was successful, output messages)
"""
clp_storage_engine = worker_config.package.storage_engine

instance_id_str = f"compression-job-{job_id}-task-{task_id}"

clp_storage_engine = worker_config.package.storage_engine
data_dir = worker_config.data_directory
archive_output_dir = worker_config.archive_output.get_directory()

# Generate database config file for clp
db_config_file_path = data_dir / f"{instance_id_str}-db-config.yml"
db_config_file = open(db_config_file_path, "w")
yaml.safe_dump(clp_metadata_db_connection_config, db_config_file)
db_config_file.close()

# Get s3 config
storage_config = worker_config.archive_output.storage
s3_config = storage_config.s3_config if StorageType.S3 == storage_config.type else None
s3_upload_failed = False
s3_config = None
enable_s3_write = False
s3_write_failed = False
storage_type = worker_config.archive_output.storage.type
if StorageType.S3 == storage_type:
# This should be caught by start-clp and could be redundant, but let's be safe for now.
if StorageEngine.CLP == clp_storage_engine:
logger.error(f"S3 is not supported for {clp_storage_engine}")
return False, {"error_message": f"S3 is not supported for {clp_storage_engine}"}

s3_config = worker_config.archive_output.storage.s3_config
enable_s3_write = True

if StorageEngine.CLP == clp_storage_engine:
compression_cmd = make_clp_command(
@@ -214,6 +224,7 @@ def run_clp(
archive_output_dir=archive_output_dir,
clp_config=clp_config,
db_config_file_path=db_config_file_path,
enable_s3_write=enable_s3_write,
)
else:
logger.error(f"Unsupported storage engine {clp_storage_engine}")
@@ -264,13 +275,13 @@ def run_clp(
if last_archive_stats is not None and (
None is stats or stats["id"] != last_archive_stats["id"]
):
if s3_config is not None:
if enable_s3_write:
result = upload_single_file_archive_to_s3(
last_archive_stats, archive_output_dir, s3_config
)
if not result.success:
worker_output["error_message"] = result.error
s3_upload_failed = True
s3_write_failed = True
# Upon failure, skip updating the archive tags and job metadata.
break

@@ -308,7 +319,7 @@ def run_clp(
# Close stderr log file
stderr_log_file.close()

if s3_upload_failed:
if s3_write_failed:
logger.error(f"Failed to upload to S3.")
return CompressionTaskStatus.FAILED, worker_output
if compression_successful:
@@ -329,23 +340,24 @@ def compress(
clp_metadata_db_connection_config,
):
clp_home = pathlib.Path(os.getenv("CLP_HOME"))

# Set logging level
logs_dir = pathlib.Path(os.getenv("CLP_LOGS_DIR"))
clp_logging_level = str(os.getenv("CLP_LOGGING_LEVEL"))
set_logging_level(logger, clp_logging_level)

# Load configuration
worker_config_path = pathlib.Path(os.getenv("WORKER_CONFIG_PATH"))
try:
worker_config = WorkerConfig.parse_obj(read_yaml_config_file(worker_config_path))
except ValidationError as err:
logger.error(err)
return -1
worker_config = WorkerConfig.parse_obj(read_yaml_config_file(pathlib.Path(os.getenv("WORKER_CONFIG_PATH"))))
except Exception as ex:
logger.error(ex)
return -1


# Set logging level
clp_logging_level = str(os.getenv("CLP_LOGGING_LEVEL"))
set_logging_level(logger, clp_logging_level)
error_msg = "Failed to load worker config"
logger.exception(error_msg)
return CompressionTaskResult(
task_id=task_id,
status=CompressionTaskStatus.FAILED,
duration=0,
error_message=error_msg
)

clp_io_config = ClpIoConfig.parse_raw(clp_io_config_json)
paths_to_compress = PathsToCompress.parse_raw(paths_to_compress_json)
@@ -358,8 +370,6 @@
worker_config,
clp_io_config,
clp_home,
worker_config.data_directory,
worker_config.archive_output.get_directory(),
logs_dir,
job_id,
task_id,
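Putting the changes in this file together: run_clp() now derives its paths from the worker config and only asks clp-s for single-file archives when it intends to upload them to S3. A condensed sketch of that wiring, simplified from the diff above (not verbatim):

# Condensed sketch of the S3 gating in run_clp(); simplified from the diff above.
storage_type = worker_config.archive_output.storage.type
enable_s3_write = StorageType.S3 == storage_type
s3_config = worker_config.archive_output.storage.s3_config if enable_s3_write else None

compression_cmd = make_clp_s_command(
    clp_home=clp_home,
    archive_output_dir=worker_config.archive_output.get_directory(),
    clp_config=clp_config,
    db_config_file_path=db_config_file_path,
    enable_s3_write=enable_s3_write,
)
# With S3 enabled, "--single-file-archive" is appended so each archive is written as a
# single file that upload_single_file_archive_to_s3() can later push as one object.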
Original file line number Diff line number Diff line change
@@ -5,12 +5,13 @@

from celery.app.task import Task
from celery.utils.log import get_task_logger
from clp_py_utils.clp_config import Database, StorageEngine
from clp_py_utils.clp_config import Database, StorageEngine, StorageType, WorkerConfig
from clp_py_utils.clp_logging import set_logging_level
from clp_py_utils.sql_adapter import SQL_Adapter
from job_orchestration.executor.utils import try_load_worker_config
from job_orchestration.executor.query.celery import app
from job_orchestration.executor.query.utils import (
report_command_creation_failure,
report_task_failure,
run_query_task,
)
from job_orchestration.scheduler.job_config import ExtractIrJobConfig, ExtractJsonJobConfig
@@ -21,15 +22,17 @@


def make_command(
storage_engine: str,
clp_home: Path,
archives_dir: Path,
worker_config: WorkerConfig,
archive_id: str,
stream_output_dir: Path,
job_config: dict,
results_cache_uri: str,
stream_collection_name: str,
) -> Optional[List[str]]:
storage_engine = worker_config.package.storage_engine
archives_dir = worker_config.archive_output.get_directory()
stream_output_dir = worker_config.stream_output.directory
stream_collection_name = worker_config.results_cache.stream_collection_name

if StorageEngine.CLP == storage_engine:
logger.info("Starting IR extraction")
extract_ir_config = ExtractIrJobConfig.parse_obj(job_config)
@@ -97,28 +100,37 @@ def extract_stream(
task_status: QueryTaskStatus
sql_adapter = SQL_Adapter(Database.parse_obj(clp_metadata_db_conn_params))

# Load configuration
worker_config = try_load_worker_config(os.getenv("WORKER_CONFIG_PATH"), logger)
if worker_config is None:
return report_task_failure(
sql_adapter=sql_adapter,
task_id=task_id,
start_time=start_time,
)

if worker_config.archive_output.storage.type == StorageType.S3:
logger.error(f"Extraction is not supported for S3 storage type")
return report_task_failure(
sql_adapter=sql_adapter,
task_id=task_id,
start_time=start_time,
)

# Make task_command
clp_home = Path(os.getenv("CLP_HOME"))
archive_directory = Path(os.getenv("CLP_ARCHIVE_OUTPUT_DIR"))
clp_storage_engine = os.getenv("CLP_STORAGE_ENGINE")
stream_output_dir = Path(os.getenv("CLP_STREAM_OUTPUT_DIR"))
stream_collection_name = os.getenv("CLP_STREAM_COLLECTION_NAME")

task_command = make_command(
storage_engine=clp_storage_engine,
clp_home=clp_home,
archives_dir=archive_directory,
worker_config=worker_config,
archive_id=archive_id,
stream_output_dir=stream_output_dir,
job_config=job_config,
results_cache_uri=results_cache_uri,
stream_collection_name=stream_collection_name,
)
if not task_command:
return report_command_creation_failure(
logger.error(f"Error creating {task_name} command")
return report_task_failure(
sql_adapter=sql_adapter,
logger=logger,
task_name=task_name,
task_id=task_id,
start_time=start_time,
)
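The extraction task now resolves everything from the loaded worker config instead of individual environment variables, and it refuses to run against S3-backed archive storage. A condensed sketch of the new flow inside extract_stream(), with the error handling collapsed (see the diff above for the exact messages):

# Condensed sketch of the new extract_stream() flow; simplified, not verbatim.
worker_config = try_load_worker_config(os.getenv("WORKER_CONFIG_PATH"), logger)
if worker_config is None or StorageType.S3 == worker_config.archive_output.storage.type:
    # Config failed to load, or stream extraction is unsupported for S3 storage:
    # report the task failure and stop.
    task_result = report_task_failure(
        sql_adapter=sql_adapter, task_id=task_id, start_time=start_time
    )
else:
    task_command = make_command(
        clp_home=Path(os.getenv("CLP_HOME")),
        worker_config=worker_config,
        archive_id=archive_id,
        job_config=job_config,
        results_cache_uri=results_cache_uri,
    )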
Original file line number Diff line number Diff line change
@@ -5,14 +5,15 @@

from celery.app.task import Task
from celery.utils.log import get_task_logger
from clp_py_utils.clp_config import Database, StorageEngine
from clp_py_utils.clp_config import Database, StorageEngine, StorageType, WorkerConfig
from clp_py_utils.clp_logging import set_logging_level
from clp_py_utils.sql_adapter import SQL_Adapter
from job_orchestration.executor.query.celery import app
from job_orchestration.executor.query.utils import (
report_command_creation_failure,
report_task_failure,
run_query_task,
)
from job_orchestration.executor.utils import try_load_worker_config
from job_orchestration.scheduler.job_config import SearchJobConfig
from job_orchestration.scheduler.scheduler_data import QueryTaskStatus

@@ -21,14 +22,16 @@


def make_command(
storage_engine: str,
clp_home: Path,
archives_dir: Path,
worker_config: WorkerConfig,
archive_id: str,
search_config: SearchJobConfig,
results_cache_uri: str,
results_collection: str,
) -> Optional[List[str]]:
storage_engine = worker_config.package.storage_engine
archives_dir = worker_config.archive_output.get_directory()

if StorageEngine.CLP == storage_engine:
command = [str(clp_home / "bin" / "clo"), "s", str(archives_dir / archive_id)]
if search_config.path_filter is not None:
@@ -116,26 +119,39 @@ def search(
task_status: QueryTaskStatus
sql_adapter = SQL_Adapter(Database.parse_obj(clp_metadata_db_conn_params))

# Load configuration
worker_config = try_load_worker_config(os.getenv("WORKER_CONFIG_PATH"), logger)
if worker_config is None:
return report_task_failure(
sql_adapter=sql_adapter,
task_id=task_id,
start_time=start_time,
)

if worker_config.archive_output.storage.type == StorageType.S3:
logger.error(f"Search is not supported for S3 storage type")
return report_task_failure(
sql_adapter=sql_adapter,
task_id=task_id,
start_time=start_time,
)

# Make task_command
clp_home = Path(os.getenv("CLP_HOME"))
archive_directory = Path(os.getenv("CLP_ARCHIVE_OUTPUT_DIR"))
clp_storage_engine = os.getenv("CLP_STORAGE_ENGINE")
search_config = SearchJobConfig.parse_obj(job_config)

task_command = make_command(
storage_engine=clp_storage_engine,
clp_home=clp_home,
archives_dir=archive_directory,
worker_config=worker_config,
archive_id=archive_id,
search_config=search_config,
results_cache_uri=results_cache_uri,
results_collection=job_id,
)
if not task_command:
return report_command_creation_failure(
logger.error(f"Error creating {task_name} command")
return report_task_failure(
sql_adapter=sql_adapter,
logger=logger,
task_name=task_name,
task_id=task_id,
start_time=start_time,
)
Original file line number Diff line number Diff line change
@@ -19,14 +19,11 @@ def get_task_log_file_path(clp_logs_dir: Path, job_id: str, task_id: int) -> Pat
return worker_logs_dir / f"{task_id}-clo.log"


def report_command_creation_failure(
def report_task_failure(
sql_adapter: SQL_Adapter,
logger: Logger,
task_name: str,
task_id: int,
start_time: datetime.datetime,
):
logger.error(f"Error creating {task_name} command")
task_status = QueryTaskStatus.FAILED
update_query_task_metadata(
sql_adapter,
18 changes: 18 additions & 0 deletions components/job-orchestration/job_orchestration/executor/utils.py
@@ -0,0 +1,18 @@
from typing import Optional
from logging import Logger
from clp_py_utils.clp_config import WorkerConfig
from clp_py_utils.core import read_yaml_config_file
from pathlib import Path
def try_load_worker_config(
config_path_str: str,
logger: Logger,
) -> Optional[WorkerConfig]:
if config_path_str is None:
logger.error("config_path_str can't be empty")
return None

try:
return WorkerConfig.parse_obj(read_yaml_config_file(Path(config_path_str)))
except Exception:
logger.exception("Failed to load worker config")
return None
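Both query tasks above call this helper right at the top of the task body. A minimal usage sketch mirroring those call sites (the surrounding task body and logger setup are omitted):

# Minimal usage sketch of try_load_worker_config(), mirroring the query tasks above.
import os

from job_orchestration.executor.utils import try_load_worker_config

worker_config = try_load_worker_config(os.getenv("WORKER_CONFIG_PATH"), logger)
if worker_config is None:
    # The helper already logged the cause (missing path or unparsable YAML);
    # callers only have to report the task failure and stop.
    ...
else:
    archive_output_dir = worker_config.archive_output.get_directory()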
