Skip to content

Commit

Permalink
feat(clp-package): Add support for extracting JSON streams from archi…
Browse files Browse the repository at this point in the history
…ves. (y-scope#569)

Co-authored-by: kirkrodrigues <[email protected]>
  • Loading branch information
haiqi96 and kirkrodrigues authored Nov 18, 2024
1 parent ac4f1c1 commit d969aaf
Show file tree
Hide file tree
Showing 20 changed files with 415 additions and 198 deletions.
17 changes: 9 additions & 8 deletions components/clp-package-utils/clp_package_utils/general.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
# CONSTANTS
EXTRACT_FILE_CMD = "x"
EXTRACT_IR_CMD = "i"
EXTRACT_JSON_CMD = "j"

# Paths
CONTAINER_CLP_HOME = pathlib.Path("/") / "opt" / "clp"
Expand Down Expand Up @@ -84,7 +85,7 @@ def __init__(self, clp_home: pathlib.Path, docker_clp_home: pathlib.Path):
self.data_dir: typing.Optional[DockerMount] = None
self.logs_dir: typing.Optional[DockerMount] = None
self.archives_output_dir: typing.Optional[DockerMount] = None
self.ir_output_dir: typing.Optional[DockerMount] = None
self.stream_output_dir: typing.Optional[DockerMount] = None


def get_clp_home():
Expand Down Expand Up @@ -251,17 +252,17 @@ def generate_container_config(
container_clp_config.archive_output.directory,
)

container_clp_config.ir_output.directory = pathlib.Path("/") / "mnt" / "ir-output"
container_clp_config.stream_output.directory = pathlib.Path("/") / "mnt" / "stream-output"
if not is_path_already_mounted(
clp_home,
CONTAINER_CLP_HOME,
clp_config.ir_output.directory,
container_clp_config.ir_output.directory,
clp_config.stream_output.directory,
container_clp_config.stream_output.directory,
):
docker_mounts.ir_output_dir = DockerMount(
docker_mounts.stream_output_dir = DockerMount(
DockerMountType.BIND,
clp_config.ir_output.directory,
container_clp_config.ir_output.directory,
clp_config.stream_output.directory,
container_clp_config.stream_output.directory,
)

return container_clp_config, docker_mounts
Expand Down Expand Up @@ -482,7 +483,7 @@ def validate_results_cache_config(
def validate_worker_config(clp_config: CLPConfig):
clp_config.validate_input_logs_dir()
clp_config.validate_archive_output_dir()
clp_config.validate_ir_output_dir()
clp_config.validate_stream_output_dir()


def validate_webui_config(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
dump_container_config,
EXTRACT_FILE_CMD,
EXTRACT_IR_CMD,
EXTRACT_JSON_CMD,
generate_container_config,
generate_container_name,
generate_container_start_cmd,
Expand Down Expand Up @@ -146,11 +147,11 @@ def handle_extract_file_cmd(
return 0


def handle_extract_ir_cmd(
def handle_extract_stream_cmd(
parsed_args, clp_home: pathlib.Path, default_config_file_path: pathlib.Path
) -> int:
"""
Handles the IR extraction command.
Handles the stream extraction command.
:param parsed_args:
:param clp_home:
:param default_config_file_path:
Expand All @@ -174,29 +175,41 @@ def handle_extract_ir_cmd(
)

# fmt: off
job_command = parsed_args.command
extract_cmd = [
"python3",
"-m", "clp_package_utils.scripts.native.decompress",
"--config", str(generated_config_path_on_container),
EXTRACT_IR_CMD,
str(parsed_args.msg_ix),
job_command
]
# fmt: on
if parsed_args.orig_file_id:
extract_cmd.append("--orig-file-id")
extract_cmd.append(str(parsed_args.orig_file_id))

if EXTRACT_IR_CMD == job_command:
extract_cmd.append(str(parsed_args.msg_ix))
if parsed_args.orig_file_id:
extract_cmd.append("--orig-file-id")
extract_cmd.append(str(parsed_args.orig_file_id))
else:
extract_cmd.append("--orig-file-path")
extract_cmd.append(str(parsed_args.orig_file_path))
if parsed_args.target_uncompressed_size:
extract_cmd.append("--target-uncompressed-size")
extract_cmd.append(str(parsed_args.target_uncompressed_size))
elif EXTRACT_JSON_CMD == job_command:
extract_cmd.append(str(parsed_args.archive_id))
if parsed_args.target_chunk_size:
extract_cmd.append("--target-chunk-size")
extract_cmd.append(str(parsed_args.target_chunk_size))
else:
extract_cmd.append("--orig-file-path")
extract_cmd.append(str(parsed_args.orig_file_path))
if parsed_args.target_uncompressed_size:
extract_cmd.append("--target-uncompressed-size")
extract_cmd.append(str(parsed_args.target_uncompressed_size))
logger.error(f"Unexpected command: {job_command}")
return -1

cmd = container_start_cmd + extract_cmd

try:
subprocess.run(cmd, check=True)
except subprocess.CalledProcessError:
logger.exception("Docker or IR extraction command failed.")
logger.exception("Docker or stream extraction command failed.")
return -1

# Remove generated files
Expand Down Expand Up @@ -241,13 +254,20 @@ def main(argv):
group.add_argument("--orig-file-id", type=str, help="Original file's ID.")
group.add_argument("--orig-file-path", type=str, help="Original file's path.")

# JSON extraction command parser
json_extraction_parser = command_args_parser.add_parser(EXTRACT_JSON_CMD)
json_extraction_parser.add_argument("archive_id", type=str, help="Archive ID")
json_extraction_parser.add_argument(
"--target-chunk-size", type=int, help="Target chunk size", default=100000
)

parsed_args = args_parser.parse_args(argv[1:])

command = parsed_args.command
if EXTRACT_FILE_CMD == command:
return handle_extract_file_cmd(parsed_args, clp_home, default_config_file_path)
elif EXTRACT_IR_CMD == command:
return handle_extract_ir_cmd(parsed_args, clp_home, default_config_file_path)
elif command in (EXTRACT_IR_CMD, EXTRACT_JSON_CMD):
return handle_extract_stream_cmd(parsed_args, clp_home, default_config_file_path)
else:
logger.exception(f"Unexpected command: {command}")
return -1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,17 @@
from clp_py_utils.clp_config import CLP_METADATA_TABLE_PREFIX, CLPConfig, Database
from clp_py_utils.sql_adapter import SQL_Adapter
from job_orchestration.scheduler.constants import QueryJobStatus, QueryJobType
from job_orchestration.scheduler.job_config import ExtractIrJobConfig
from job_orchestration.scheduler.job_config import (
ExtractIrJobConfig,
ExtractJsonJobConfig,
QueryJobConfig,
)

from clp_package_utils.general import (
CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH,
EXTRACT_FILE_CMD,
EXTRACT_IR_CMD,
EXTRACT_JSON_CMD,
get_clp_home,
load_config_file,
)
Expand Down Expand Up @@ -70,45 +75,37 @@ def get_orig_file_id(db_config: Database, path: str) -> Optional[str]:
return results[0]["orig_file_id"]


def submit_and_monitor_ir_extraction_job_in_db(
def submit_and_monitor_extraction_job_in_db(
db_config: Database,
orig_file_id: str,
msg_ix: int,
target_uncompressed_size: Optional[int],
job_type: QueryJobType,
job_config: QueryJobConfig,
) -> int:
"""
Submits an IR extraction job to the scheduler and waits until the job finishes.
Submits a stream extraction job to the scheduler and waits until it finishes.
:param db_config:
:param orig_file_id:
:param msg_ix:
:param target_uncompressed_size:
:param job_type:
:param job_config:
:return: 0 on success, -1 otherwise.
"""
extract_ir_config = ExtractIrJobConfig(
orig_file_id=orig_file_id,
msg_ix=msg_ix,
target_uncompressed_size=target_uncompressed_size,
)

sql_adapter = SQL_Adapter(db_config)
job_id = submit_query_job(sql_adapter, extract_ir_config, QueryJobType.EXTRACT_IR)
job_id = submit_query_job(sql_adapter, job_config, job_type)
job_status = wait_for_query_job(sql_adapter, job_id)

if QueryJobStatus.SUCCEEDED == job_status:
logger.info(f"Finished IR extraction job {job_id}.")
logger.info(f"Finished extraction job {job_id}.")
return 0

logger.error(
f"IR extraction job {job_id} finished with unexpected status: {job_status.to_str()}."
)
logger.error(f"Extraction job {job_id} finished with unexpected status: {job_status.to_str()}.")
return -1


def handle_extract_ir_cmd(
parsed_args: argparse.Namespace, clp_home: pathlib.Path, default_config_file_path: pathlib.Path
def handle_extract_stream_cmd(
parsed_args: argparse.Namespace,
clp_home: pathlib.Path,
default_config_file_path: pathlib.Path,
) -> int:
"""
Handles the IR extraction command.
Handles the stream extraction command.
:param parsed_args:
:param clp_home:
:param default_config_file_path:
Expand All @@ -121,26 +118,46 @@ def handle_extract_ir_cmd(
if clp_config is None:
return -1

orig_file_id: str
if parsed_args.orig_file_id:
orig_file_id = parsed_args.orig_file_id
command = parsed_args.command

job_config: QueryJobConfig
job_type: QueryJobType
if EXTRACT_IR_CMD == command:
job_type = QueryJobType.EXTRACT_IR
orig_file_id: str
if parsed_args.orig_file_id:
orig_file_id = parsed_args.orig_file_id
else:
orig_file_path = parsed_args.orig_file_path
orig_file_id = get_orig_file_id(clp_config.database, orig_file_path)
if orig_file_id is None:
logger.error(f"Cannot find orig_file_id corresponding to '{orig_file_path}'.")
return -1
job_config = ExtractIrJobConfig(
orig_file_id=orig_file_id,
msg_ix=parsed_args.msg_ix,
target_uncompressed_size=parsed_args.target_uncompressed_size,
)
elif EXTRACT_JSON_CMD == command:
job_type = QueryJobType.EXTRACT_JSON
job_config = ExtractJsonJobConfig(
archive_id=parsed_args.archive_id, target_chunk_size=parsed_args.target_chunk_size
)
else:
orig_file_id = get_orig_file_id(clp_config.database, parsed_args.orig_file_path)
if orig_file_id is None:
return -1
logger.error(f"Unsupported stream extraction command: {command}")
return -1

try:
return asyncio.run(
run_function_in_process(
submit_and_monitor_ir_extraction_job_in_db,
submit_and_monitor_extraction_job_in_db,
clp_config.database,
orig_file_id,
parsed_args.msg_ix,
parsed_args.target_uncompressed_size,
job_type,
job_config,
)
)
except asyncio.CancelledError:
logger.error("IR extraction cancelled.")
logger.error("Stream extraction cancelled.")
return -1


Expand Down Expand Up @@ -278,13 +295,20 @@ def main(argv):
group.add_argument("--orig-file-id", type=str, help="Original file's ID.")
group.add_argument("--orig-file-path", type=str, help="Original file's path.")

# JSON extraction command parser
json_extraction_parser = command_args_parser.add_parser(EXTRACT_JSON_CMD)
json_extraction_parser.add_argument("archive_id", type=str, help="Archive ID")
json_extraction_parser.add_argument(
"--target-chunk-size", type=int, help="Target chunk size.", required=True
)

parsed_args = args_parser.parse_args(argv[1:])

command = parsed_args.command
if EXTRACT_FILE_CMD == command:
return handle_extract_file_cmd(parsed_args, clp_home, default_config_file_path)
elif EXTRACT_IR_CMD == command:
return handle_extract_ir_cmd(parsed_args, clp_home, default_config_file_path)
elif command in (EXTRACT_IR_CMD, EXTRACT_JSON_CMD):
return handle_extract_stream_cmd(parsed_args, clp_home, default_config_file_path)
else:
logger.exception(f"Unexpected command: {command}")
return -1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ def create_results_cache_indices(
"python3",
str(clp_py_utils_dir / "create-results-cache-indices.py"),
"--uri", container_clp_config.results_cache.get_uri(),
"--ir-collection", container_clp_config.results_cache.ir_collection_name,
"--stream-collection", container_clp_config.results_cache.stream_collection_name,
]
# fmt: on

Expand Down Expand Up @@ -660,10 +660,10 @@ def start_query_worker(
celery_method = "job_orchestration.executor.query"
celery_route = f"{QueueName.QUERY}"

query_worker_mount = [mounts.ir_output_dir]
query_worker_mount = [mounts.stream_output_dir]
query_worker_env = {
"CLP_IR_OUTPUT_DIR": container_clp_config.ir_output.directory,
"CLP_IR_COLLECTION": clp_config.results_cache.ir_collection_name,
"CLP_STREAM_OUTPUT_DIR": container_clp_config.stream_output.directory,
"CLP_STREAM_COLLECTION_NAME": clp_config.results_cache.stream_collection_name,
}

generic_start_worker(
Expand Down Expand Up @@ -710,7 +710,7 @@ def generic_start_worker(

# Create necessary directories
clp_config.archive_output.directory.mkdir(parents=True, exist_ok=True)
clp_config.ir_output.directory.mkdir(parents=True, exist_ok=True)
clp_config.stream_output.directory.mkdir(parents=True, exist_ok=True)

clp_site_packages_dir = CONTAINER_CLP_HOME / "lib" / "python3" / "site-packages"
# fmt: off
Expand Down Expand Up @@ -933,9 +933,9 @@ def start_log_viewer_webui(
"MongoDbHost": clp_config.results_cache.host,
"MongoDbPort": clp_config.results_cache.port,
"MongoDbName": clp_config.results_cache.db_name,
"MongoDbIrFilesCollectionName": clp_config.results_cache.ir_collection_name,
"MongoDbStreamFilesCollectionName": clp_config.results_cache.stream_collection_name,
"ClientDir": str(container_log_viewer_webui_dir / "client"),
"IrFilesDir": str(container_clp_config.ir_output.directory),
"StreamFilesDir": str(container_clp_config.stream_output.directory),
"LogViewerDir": str(container_log_viewer_webui_dir / "yscope-log-viewer"),
}
settings_json = read_and_update_settings_json(settings_json_path, settings_json_updates)
Expand All @@ -961,7 +961,7 @@ def start_log_viewer_webui(
# fmt: on
necessary_mounts = [
mounts.clp_home,
mounts.ir_output_dir,
mounts.stream_output_dir,
]
for mount in necessary_mounts:
if mount:
Expand Down
Loading

0 comments on commit d969aaf

Please sign in to comment.