Skip to content

Commit

Permalink
clp-package: Add support for extracting files as IR streams through t…
Browse files Browse the repository at this point in the history
…he CLI. (y-scope#472)
  • Loading branch information
haiqi96 authored Jul 19, 2024
1 parent 44aaff9 commit 1d71b6f
Show file tree
Hide file tree
Showing 4 changed files with 338 additions and 66 deletions.
10 changes: 7 additions & 3 deletions components/clp-package-utils/clp_package_utils/general.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import typing
import uuid
from enum import auto
from typing import List, Tuple
from typing import List, Optional, Tuple

import yaml
from clp_py_utils.clp_config import (
Expand All @@ -31,6 +31,9 @@
from strenum import KebabCaseStrEnum

# CONSTANTS
EXTRACT_FILE_CMD = "x"
EXTRACT_IR_CMD = "i"

# Paths
CONTAINER_CLP_HOME = pathlib.Path("/") / "opt" / "clp"
CONTAINER_INPUT_LOGS_ROOT_DIR = pathlib.Path("/") / "mnt" / "logs"
Expand All @@ -45,7 +48,8 @@ class DockerMountType(enum.IntEnum):

class JobType(KebabCaseStrEnum):
COMPRESSION = auto()
DECOMPRESSION = auto()
FILE_EXTRACTION = auto()
IR_EXTRACTION = auto()
SEARCH = auto()


Expand Down Expand Up @@ -283,7 +287,7 @@ def dump_container_config(


def generate_container_start_cmd(
container_name: str, container_mounts: List[CLPDockerMounts], container_image: str
container_name: str, container_mounts: List[Optional[DockerMount]], container_image: str
) -> List[str]:
"""
Generates the command to start a container with the given mounts and name.
Expand Down
203 changes: 163 additions & 40 deletions components/clp-package-utils/clp_package_utils/scripts/decompress.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import pathlib
import subprocess
import sys
from typing import Optional

from clp_py_utils.clp_config import CLPConfig

Expand All @@ -11,6 +12,8 @@
DockerMount,
DockerMountType,
dump_container_config,
EXTRACT_FILE_CMD,
EXTRACT_IR_CMD,
generate_container_config,
generate_container_name,
generate_container_start_cmd,
Expand All @@ -32,40 +35,43 @@
logger.addHandler(logging_console_handler)


def main(argv):
clp_home = get_clp_home()
default_config_file_path = clp_home / CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH

args_parser = argparse.ArgumentParser(description="Decompresses logs")
args_parser.add_argument(
"--config",
"-c",
type=str,
default=str(default_config_file_path),
help="CLP package configuration file.",
)
args_parser.add_argument("paths", metavar="PATH", nargs="*", help="Files to decompress.")
args_parser.add_argument("-f", "--files-from", help="A file listing all files to decompress.")
args_parser.add_argument(
"-d", "--extraction-dir", metavar="DIR", default=".", help="Decompress files into DIR"
)
parsed_args = args_parser.parse_args(argv[1:])

# Validate and load config file
def validate_and_load_config(
clp_home: pathlib.Path,
config_file_path: pathlib.Path,
default_config_file_path: pathlib.Path,
) -> Optional[CLPConfig]:
"""
Validates and loads the config file.
:param clp_home:
:param config_file_path:
:param default_config_file_path:
:return: The config object on success, None otherwise.
"""
try:
config_file_path = pathlib.Path(parsed_args.config)
clp_config = load_config_file(config_file_path, default_config_file_path, clp_home)
clp_config.validate_logs_dir()

# Validate and load necessary credentials
validate_and_load_db_credentials_file(clp_config, clp_home, False)
return clp_config
except:
logger.exception("Failed to load config.")
return -1
return None


paths_to_decompress_file_path = None
def handle_extract_file_cmd(
parsed_args, clp_home: pathlib.Path, default_config_file_path: pathlib.Path
) -> int:
"""
Handles the file extraction command.
:param parsed_args:
:param clp_home:
:param default_config_file_path:
:return: 0 on success, -1 otherwise.
"""
paths_to_extract_file_path = None
if parsed_args.files_from:
paths_to_decompress_file_path = pathlib.Path(parsed_args.files_from)
paths_to_extract_file_path = pathlib.Path(parsed_args.files_from)

# Validate extraction directory
extraction_dir = pathlib.Path(parsed_args.extraction_dir).resolve()
Expand All @@ -74,15 +80,22 @@ def main(argv):
except ValueError as ex:
logger.error(f"extraction-dir is invalid: {ex}")
return -1
extraction_dir.mkdir(exist_ok=True)

container_name = generate_container_name(JobType.DECOMPRESSION)
# Validate and load config file
clp_config = validate_and_load_config(
clp_home, pathlib.Path(parsed_args.config), default_config_file_path
)
if clp_config is None:
return -1

container_name = generate_container_name(JobType.FILE_EXTRACTION)
container_clp_config, mounts = generate_container_config(clp_config, clp_home)
generated_config_path_on_container, generated_config_path_on_host = dump_container_config(
container_clp_config, clp_config, container_name
)

# Set up mounts
extraction_dir.mkdir(exist_ok=True)
container_extraction_dir = pathlib.Path("/") / "mnt" / "extraction-dir"
necessary_mounts = [
mounts.clp_home,
Expand All @@ -91,44 +104,154 @@ def main(argv):
mounts.archives_output_dir,
DockerMount(DockerMountType.BIND, extraction_dir, container_extraction_dir),
]
container_paths_to_decompress_file_path = None
if paths_to_decompress_file_path:
container_paths_to_decompress_file_path = (
pathlib.Path("/") / "mnt" / "paths-to-decompress.txt"
)
container_paths_to_extract_file_path = None
if paths_to_extract_file_path:
container_paths_to_extract_file_path = pathlib.Path("/") / "mnt" / "paths-to-extract.txt"
necessary_mounts.append(
DockerMount(
DockerMountType.BIND,
paths_to_decompress_file_path,
container_paths_to_decompress_file_path,
paths_to_extract_file_path,
container_paths_to_extract_file_path,
)
)
container_start_cmd = generate_container_start_cmd(
container_name, necessary_mounts, clp_config.execution_container
)

# fmt: off
decompress_cmd = [
extract_cmd = [
"python3",
"-m", "clp_package_utils.scripts.native.decompress",
"--config", str(generated_config_path_on_container),
EXTRACT_FILE_CMD,
"-d", str(container_extraction_dir),
]
# fmt: on
for path in parsed_args.paths:
decompress_cmd.append(path)
if container_paths_to_decompress_file_path:
decompress_cmd.append("--input-list")
decompress_cmd.append(container_paths_to_decompress_file_path)
extract_cmd.append(path)
if container_paths_to_extract_file_path:
extract_cmd.append("--input-list")
extract_cmd.append(container_paths_to_extract_file_path)

cmd = container_start_cmd + decompress_cmd
subprocess.run(cmd, check=True)
cmd = container_start_cmd + extract_cmd
try:
subprocess.run(cmd, check=True)
except subprocess.CalledProcessError:
logger.exception("Docker or file extraction command failed.")
return -1

# Remove generated files
generated_config_path_on_host.unlink()

return 0


def handle_extract_ir_cmd(
parsed_args, clp_home: pathlib.Path, default_config_file_path: pathlib.Path
) -> int:
"""
Handles the IR extraction command.
:param parsed_args:
:param clp_home:
:param default_config_file_path:
:return: 0 on success, -1 otherwise.
"""
# Validate and load config file
clp_config = validate_and_load_config(
clp_home, pathlib.Path(parsed_args.config), default_config_file_path
)
if clp_config is None:
return -1

container_name = generate_container_name(JobType.IR_EXTRACTION)
container_clp_config, mounts = generate_container_config(clp_config, clp_home)
generated_config_path_on_container, generated_config_path_on_host = dump_container_config(
container_clp_config, clp_config, container_name
)
necessary_mounts = [mounts.clp_home, mounts.logs_dir]
container_start_cmd = generate_container_start_cmd(
container_name, necessary_mounts, clp_config.execution_container
)

# fmt: off
extract_cmd = [
"python3",
"-m", "clp_package_utils.scripts.native.decompress",
"--config", str(generated_config_path_on_container),
EXTRACT_IR_CMD,
str(parsed_args.msg_ix),
]
# fmt: on
if parsed_args.orig_file_id:
extract_cmd.append("--orig-file-id")
extract_cmd.append(str(parsed_args.orig_file_id))
else:
extract_cmd.append("--orig-file-path")
extract_cmd.append(str(parsed_args.orig_file_path))
if parsed_args.target_uncompressed_size:
extract_cmd.append("--target-uncompressed-size")
extract_cmd.append(str(parsed_args.target_uncompressed_size))
cmd = container_start_cmd + extract_cmd

try:
subprocess.run(cmd, check=True)
except subprocess.CalledProcessError:
logger.exception("Docker or IR extraction command failed.")
return -1

# Remove generated files
generated_config_path_on_host.unlink()

return 0


def main(argv):
clp_home = get_clp_home()
default_config_file_path = clp_home / CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH

args_parser = argparse.ArgumentParser(description="Decompresses logs")
args_parser.add_argument(
"--config",
"-c",
default=str(default_config_file_path),
help="CLP configuration file.",
)
command_args_parser = args_parser.add_subparsers(dest="command", required=True)

# File extraction command parser
file_extraction_parser = command_args_parser.add_parser(EXTRACT_FILE_CMD)
file_extraction_parser.add_argument(
"paths", metavar="PATH", nargs="*", help="Files to extract."
)
file_extraction_parser.add_argument(
"-f", "--files-from", help="A file listing all files to extract."
)
file_extraction_parser.add_argument(
"-d", "--extraction-dir", metavar="DIR", default=".", help="Extract files into DIR."
)

# IR extraction command parser
ir_extraction_parser = command_args_parser.add_parser(EXTRACT_IR_CMD)
ir_extraction_parser.add_argument("msg_ix", type=int, help="Message index.")
ir_extraction_parser.add_argument(
"--target-uncompressed-size", type=int, help="Target uncompressed IR size."
)

group = ir_extraction_parser.add_mutually_exclusive_group(required=True)
group.add_argument("--orig-file-id", type=str, help="Original file's ID.")
group.add_argument("--orig-file-path", type=str, help="Original file's path.")

parsed_args = args_parser.parse_args(argv[1:])

command = parsed_args.command
if EXTRACT_FILE_CMD == command:
return handle_extract_file_cmd(parsed_args, clp_home, default_config_file_path)
elif EXTRACT_IR_CMD == command:
return handle_extract_ir_cmd(parsed_args, clp_home, default_config_file_path)
else:
logger.exception(f"Unexpected command: {command}")
return -1


if "__main__" == __name__:
sys.exit(main(sys.argv))
Loading

0 comments on commit 1d71b6f

Please sign in to comment.