Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core-clp): Add LZMA Compressor implementation and LZMA dependency. #614

Merged
merged 68 commits into from
Dec 23, 2024
Merged
Show file tree
Hide file tree
Changes from 60 commits
Commits
Show all changes
68 commits
Select commit Hold shift + click to select a range
dceb564
Add lzma download and port lzma scripts
Bill-hbrhbr Nov 25, 2024
d5af274
Make unit test pass
Bill-hbrhbr Nov 25, 2024
b94ca26
Refactor lzma compressor to group common functionalities into helplers
Bill-hbrhbr Nov 26, 2024
707c412
Improve comments
Bill-hbrhbr Nov 27, 2024
6d1ab8f
Fix reference link
Bill-hbrhbr Nov 27, 2024
89b5707
Add install for CentOS
Bill-hbrhbr Nov 27, 2024
c646cea
Apply coderabbit suggestions
Bill-hbrhbr Nov 27, 2024
c91e5fb
Remove decompressor related files
Bill-hbrhbr Nov 27, 2024
26b0663
Address review concerns
Bill-hbrhbr Nov 30, 2024
740bc1c
Address review concern
Bill-hbrhbr Dec 2, 2024
e2be883
Simplify else-if
Bill-hbrhbr Dec 2, 2024
905367d
Fix else-if
Bill-hbrhbr Dec 2, 2024
8ae88b2
Add lzma (xz) dep to MacOS
Bill-hbrhbr Dec 2, 2024
0d0c20e
Refactor helper run_lzma()
Bill-hbrhbr Dec 2, 2024
559485d
Update function doc
Bill-hbrhbr Dec 2, 2024
7c69c69
Clarify unit test early termination
Bill-hbrhbr Dec 2, 2024
a6d68b8
Update components/core/tests/test-StreamingCompression.cpp
Bill-hbrhbr Dec 2, 2024
1519c21
Split LZMA_RUN from flush actions
Bill-hbrhbr Dec 3, 2024
655bb46
Refactor unit test
Bill-hbrhbr Dec 3, 2024
4fb6c01
Update components/core/src/clp/streaming_compression/lzma/Compressor.cpp
Bill-hbrhbr Dec 3, 2024
a8799b5
Merge edits
Bill-hbrhbr Dec 3, 2024
2b85f01
Fix import
Bill-hbrhbr Dec 3, 2024
eda7d6c
Apply suggestions from code review
Bill-hbrhbr Dec 4, 2024
4164a9d
Address review concern
Bill-hbrhbr Dec 4, 2024
8ab0653
Add a comment
Bill-hbrhbr Dec 5, 2024
c436f21
Apply suggestions from code review
Bill-hbrhbr Dec 6, 2024
7bd34d2
Update comment to 100-char length
Bill-hbrhbr Dec 6, 2024
efd2b27
Fix according to coding style guidelines
Bill-hbrhbr Dec 11, 2024
c530f92
Apply suggestions from code review
Bill-hbrhbr Dec 12, 2024
e751ee6
Update CMakeLists.txt
Bill-hbrhbr Dec 12, 2024
1c5efcd
Address review concern
Bill-hbrhbr Dec 12, 2024
856c7cb
Update TODO
Bill-hbrhbr Dec 12, 2024
43e22d2
Case fix
Bill-hbrhbr Dec 12, 2024
829a6b2
Remove unnecessary function inline comments
Bill-hbrhbr Dec 12, 2024
81e1807
Improve comment
Bill-hbrhbr Dec 12, 2024
09b73c7
Refactor lzma stream related functions into a nested helper class
Bill-hbrhbr Dec 17, 2024
7cedb25
Adress coderabbit suggestions
Bill-hbrhbr Dec 19, 2024
3dbe388
feat(clp-package): Add support for deleting archives that are exclusi…
haiqi96 Nov 27, 2024
5d90054
feat(clp-s): Add the write path for single-file archives. (#563)
wraymo Nov 27, 2024
2b88c6f
test: Allow multiple trials when unittesting http headers (#613)
anlowee Nov 28, 2024
290ede3
chore(log-viewer-webui): Update `yscope-log-viewer` to the latest ver…
junhaoliao Nov 29, 2024
2c0e053
test(clp-s): Add end-to-end test case for compression and extraction.…
AVMatthews Dec 2, 2024
cbf8bf9
docs(clp-json): Update list of characters that requires escaping in q…
gibber9809 Dec 2, 2024
44b0f2b
feat(core): Add `ErrorCode` template to standardize conversion of use…
LinZhihao-723 Dec 2, 2024
4d21d9b
revert(core): Remove temporary output directory option from `clp` and…
haiqi96 Dec 5, 2024
36892c1
refactor(clp-package): Unify the metadata schema for JSON and IR stre…
haiqi96 Dec 6, 2024
604bd75
feat(clp-s): Add command line options for stubbed out kv-pair-IR inge…
AVMatthews Dec 9, 2024
0a9322b
feat(ffi): Add initial implementation of `IrErrorCode` (using the `Er…
LinZhihao-723 Dec 9, 2024
78a535c
feat(ffi): Add support for auto/user-generated KV-pairs in `KeyValueP…
LinZhihao-723 Dec 9, 2024
42db88c
build(docs): Update dependencies to latest versions. (#631)
kirkrodrigues Dec 10, 2024
13c7528
ci(pr-title-checks): Remove default GH workflow permissions and docum…
kirkrodrigues Dec 12, 2024
8b34dac
feat(core-clp): Add `BoundedReader` to prevent out-of-bound reads in …
gibber9809 Dec 13, 2024
02d8956
build(core): Update Boost to v1.87.0 in order to pull in boost::urls;…
gibber9809 Dec 16, 2024
da66fbf
refactor(clp-s): Replace instances of `std::string const&` with `std:…
gibber9809 Dec 16, 2024
b7741c0
docs(core): Indicate dependency install scripts should be run with el…
jackluo923 Dec 16, 2024
1edc16e
feat(package)!: Add support for writing clp-s single file archives to…
haiqi96 Dec 19, 2024
e4c9dd3
fix(clp-package): Remove faulty error handling for parsing archive co…
haiqi96 Dec 19, 2024
32dc989
fix(core): Add missing `../` to fix relative header file includes. (#…
Bill-hbrhbr Dec 19, 2024
02f0b8f
Refactor lzma stream and add some doc strings.
davidlion Dec 20, 2024
fcfc73a
Fix accidental comment reflow.
davidlion Dec 20, 2024
3b89d4c
Merge branch 'main' into pr-614
davidlion Dec 20, 2024
50c8e89
Merge branch 'main' into pr-614
davidlion Dec 20, 2024
d139bd5
Apply suggestions from code review
davidlion Dec 20, 2024
68c4c36
Add missing fixes for PR suggestion.
davidlion Dec 20, 2024
dcb843e
Address review concern
Bill-hbrhbr Dec 20, 2024
b20162f
Address review concern
Bill-hbrhbr Dec 20, 2024
df41b22
nit fix
Bill-hbrhbr Dec 20, 2024
524fe1d
Change all instances of programming-error-induced error codes to Erro…
Bill-hbrhbr Dec 22, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .github/workflows/clp-pr-title-checks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,16 @@ name: "clp-pr-title-checks"

on:
pull_request_target:
# NOTE: Workflows triggered by this event give the workflow access to secrets and grant the
# `GITHUB_TOKEN` read/write repository access by default. So we need to ensure:
# - This workflow doesn't inadvertently check out, build, or execute untrusted code from the
# pull request triggered by this event.
# - Each job has `permissions` set to only those necessary.
types: ["edited", "opened", "reopened"]
branches: ["main"]

permissions: {}

concurrency:
group: "${{github.workflow}}-${{github.ref}}"

Expand Down
13 changes: 13 additions & 0 deletions components/clp-package-utils/clp_package_utils/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import logging

# Set up console logging
logging_console_handler = logging.StreamHandler()
logging_formatter = logging.Formatter(
"%(asctime)s.%(msecs)03d %(levelname)s [%(module)s] %(message)s", datefmt="%Y-%m-%dT%H:%M:%S"
)
logging_console_handler.setFormatter(logging_formatter)

# Set up root logger
root_logger = logging.getLogger()
root_logger.setLevel(logging.INFO)
root_logger.addHandler(logging_console_handler)
28 changes: 21 additions & 7 deletions components/clp-package-utils/clp_package_utils/general.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
REDIS_COMPONENT_NAME,
REDUCER_COMPONENT_NAME,
RESULTS_CACHE_COMPONENT_NAME,
StorageType,
WEBUI_COMPONENT_NAME,
WorkerConfig,
)
from clp_py_utils.core import (
get_config_value,
Expand Down Expand Up @@ -107,7 +109,7 @@ def get_clp_home():
return clp_home.resolve()


def generate_container_name(job_type: JobType) -> str:
def generate_container_name(job_type: str) -> str:
Bill-hbrhbr marked this conversation as resolved.
Show resolved Hide resolved
"""
:param job_type:
:return: A unique container name for the given job type.
Expand Down Expand Up @@ -239,17 +241,17 @@ def generate_container_config(
DockerMountType.BIND, clp_config.logs_directory, container_clp_config.logs_directory
)

container_clp_config.archive_output.directory = pathlib.Path("/") / "mnt" / "archive-output"
container_clp_config.archive_output.set_directory(pathlib.Path("/") / "mnt" / "archive-output")
if not is_path_already_mounted(
clp_home,
CONTAINER_CLP_HOME,
clp_config.archive_output.directory,
container_clp_config.archive_output.directory,
clp_config.archive_output.get_directory(),
container_clp_config.archive_output.get_directory(),
):
docker_mounts.archives_output_dir = DockerMount(
DockerMountType.BIND,
clp_config.archive_output.directory,
container_clp_config.archive_output.directory,
clp_config.archive_output.get_directory(),
container_clp_config.archive_output.get_directory(),
)

container_clp_config.stream_output.directory = pathlib.Path("/") / "mnt" / "stream-output"
Expand All @@ -268,6 +270,18 @@ def generate_container_config(
return container_clp_config, docker_mounts


def generate_worker_config(clp_config: CLPConfig) -> WorkerConfig:
worker_config = WorkerConfig()
worker_config.package = clp_config.package.copy(deep=True)
worker_config.archive_output = clp_config.archive_output.copy(deep=True)
worker_config.data_directory = clp_config.data_directory

worker_config.stream_output_dir = clp_config.stream_output.directory
worker_config.stream_collection_name = clp_config.results_cache.stream_collection_name

return worker_config


def dump_container_config(
container_clp_config: CLPConfig, clp_config: CLPConfig, container_name: str
) -> Tuple[pathlib.Path, pathlib.Path]:
Expand Down Expand Up @@ -482,7 +496,7 @@ def validate_results_cache_config(

def validate_worker_config(clp_config: CLPConfig):
clp_config.validate_input_logs_dir()
clp_config.validate_archive_output_dir()
clp_config.validate_archive_output_config()
clp_config.validate_stream_output_dir()


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,7 @@
validate_and_load_db_credentials_file,
)

# Setup logging
# Create logger
logger = logging.getLogger(__file__)
logger.setLevel(logging.INFO)
# Setup console logging
logging_console_handler = logging.StreamHandler()
logging_formatter = logging.Formatter("%(asctime)s [%(levelname)s] [%(name)s] %(message)s")
logging_console_handler.setFormatter(logging_formatter)
logger.addHandler(logging_console_handler)


def main(argv):
Expand Down Expand Up @@ -66,7 +58,7 @@ def main(argv):
logger.exception("Failed to load config.")
return -1

container_name = generate_container_name(JobType.COMPRESSION)
container_name = generate_container_name(str(JobType.COMPRESSION))

container_clp_config, mounts = generate_container_config(clp_config, clp_home)
generated_config_path_on_container, generated_config_path_on_host = dump_container_config(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import sys
from typing import Optional

from clp_py_utils.clp_config import CLPConfig
from clp_py_utils.clp_config import CLPConfig, StorageType

from clp_package_utils.general import (
CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH,
Expand All @@ -25,15 +25,7 @@
validate_path_could_be_dir,
)

# Setup logging
# Create logger
logger = logging.getLogger("clp")
logger.setLevel(logging.DEBUG)
# Setup console logging
logging_console_handler = logging.StreamHandler()
logging_formatter = logging.Formatter("%(asctime)s [%(levelname)s] [%(name)s] %(message)s")
logging_console_handler.setFormatter(logging_formatter)
logger.addHandler(logging_console_handler)
logger = logging.getLogger(__file__)


def validate_and_load_config(
Expand Down Expand Up @@ -89,7 +81,12 @@ def handle_extract_file_cmd(
if clp_config is None:
return -1

container_name = generate_container_name(JobType.FILE_EXTRACTION)
storage_type = clp_config.archive_output.storage.type
if StorageType.FS != storage_type:
logger.error(f"File extraction is not supported for archive storage type: {storage_type}.")
return -1

container_name = generate_container_name(str(JobType.FILE_EXTRACTION))
container_clp_config, mounts = generate_container_config(clp_config, clp_home)
generated_config_path_on_container, generated_config_path_on_host = dump_container_config(
container_clp_config, clp_config, container_name
Expand Down Expand Up @@ -164,7 +161,14 @@ def handle_extract_stream_cmd(
if clp_config is None:
return -1

container_name = generate_container_name(JobType.IR_EXTRACTION)
storage_type = clp_config.archive_output.storage.type
if StorageType.FS != storage_type:
logger.error(
f"Stream extraction is not supported for archive storage type: {storage_type}."
)
return -1

container_name = generate_container_name(str(JobType.IR_EXTRACTION))
container_clp_config, mounts = generate_container_config(clp_config, clp_home)
generated_config_path_on_container, generated_config_path_on_host = dump_container_config(
container_clp_config, clp_config, container_name
Expand Down
110 changes: 110 additions & 0 deletions components/clp-package-utils/clp_package_utils/scripts/del_archives.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import argparse
import logging
import subprocess
import sys
from pathlib import Path

from clp_py_utils.clp_config import StorageType

from clp_package_utils.general import (
CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH,
dump_container_config,
generate_container_config,
generate_container_name,
generate_container_start_cmd,
get_clp_home,
load_config_file,
validate_and_load_db_credentials_file,
)

logger = logging.getLogger(__file__)


def main(argv):
clp_home = get_clp_home()
default_config_file_path = clp_home / CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH

args_parser = argparse.ArgumentParser(
description="Deletes archives that fall within the specified time range."
)
args_parser.add_argument(
"--config",
"-c",
default=str(default_config_file_path),
help="CLP package configuration file.",
)
args_parser.add_argument(
"--begin-ts",
type=int,
default=0,
help="Time-range lower-bound (inclusive) as milliseconds from the UNIX epoch.",
)
args_parser.add_argument(
"--end-ts",
type=int,
required=True,
help="Time-range upper-bound (include) as milliseconds from the UNIX epoch.",
)
parsed_args = args_parser.parse_args(argv[1:])

# Validate and load config file
try:
config_file_path = Path(parsed_args.config)
clp_config = load_config_file(config_file_path, default_config_file_path, clp_home)
clp_config.validate_logs_dir()

# Validate and load necessary credentials
validate_and_load_db_credentials_file(clp_config, clp_home, False)
except:
logger.exception("Failed to load config.")
return -1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add specific error handling for different failure scenarios

The bare except clause catches all exceptions without distinguishing between different error types. This could mask specific issues that require different handling.

-    except:
+    except (FileNotFoundError, PermissionError) as e:
+        logger.exception(f"Failed to access config file: {e}")
+        return -1
+    except ValueError as e:
+        logger.exception(f"Invalid config file format: {e}")
+        return -1
+    except Exception as e:
+        logger.exception(f"Unexpected error loading config: {e}")
+        return -1
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
except:
logger.exception("Failed to load config.")
return -1
except (FileNotFoundError, PermissionError) as e:
logger.exception(f"Failed to access config file: {e}")
return -1
except ValueError as e:
logger.exception(f"Invalid config file format: {e}")
return -1
except Exception as e:
logger.exception(f"Unexpected error loading config: {e}")
return -1


storage_type = clp_config.archive_output.storage.type
if StorageType.FS != storage_type:
logger.error(f"Archive deletion is not supported for storage type: {storage_type}.")
return -1

# Validate the input timestamp
begin_ts = parsed_args.begin_ts
end_ts = parsed_args.end_ts
if begin_ts > end_ts:
logger.error("begin-ts must be <= end-ts")
return -1
if end_ts < 0 or begin_ts < 0:
logger.error("begin_ts and end_ts must be non-negative.")
return -1

container_name = generate_container_name("del-archives")

container_clp_config, mounts = generate_container_config(clp_config, clp_home)
generated_config_path_on_container, generated_config_path_on_host = dump_container_config(
container_clp_config, clp_config, container_name
)

necessary_mounts = [mounts.clp_home, mounts.logs_dir, mounts.archives_output_dir]
container_start_cmd = generate_container_start_cmd(
container_name, necessary_mounts, clp_config.execution_container
)

# fmt: off
del_archive_cmd = [
"python3",
"-m", "clp_package_utils.scripts.native.del_archives",
"--config", str(generated_config_path_on_container),
str(begin_ts),
str(end_ts)

]
# fmt: on

cmd = container_start_cmd + del_archive_cmd
subprocess.run(cmd, check=True)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Add error handling for subprocess execution

The subprocess execution lacks proper error handling and logging for potential failures.

-    subprocess.run(cmd, check=True)
+    try:
+        result = subprocess.run(cmd, check=True, capture_output=True, text=True)
+    except subprocess.CalledProcessError as e:
+        logger.error(f"Archive deletion failed: {e.stderr}")
+        return -1
+    except Exception as e:
+        logger.error(f"Failed to execute archive deletion: {e}")
+        return -1

Committable suggestion skipped: line range outside the PR's diff.


# Remove generated files
generated_config_path_on_host.unlink()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add error handling for file cleanup

The cleanup operation should handle potential failures when removing generated files.

-    generated_config_path_on_host.unlink()
+    try:
+        if generated_config_path_on_host.exists():
+            generated_config_path_on_host.unlink()
+    except Exception as e:
+        logger.warning(f"Failed to clean up generated config file: {e}")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
# Remove generated files
generated_config_path_on_host.unlink()
# Remove generated files
try:
if generated_config_path_on_host.exists():
generated_config_path_on_host.unlink()
except Exception as e:
logger.warning(f"Failed to clean up generated config file: {e}")


return 0


if "__main__" == __name__:
sys.exit(main(sys.argv))
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,7 @@
load_config_file,
)

# Setup logging
# Create logger
logger = logging.getLogger(__file__)
logger.setLevel(logging.INFO)
# Setup console logging
logging_console_handler = logging.StreamHandler()
logging_formatter = logging.Formatter("%(asctime)s [%(levelname)s] [%(name)s] %(message)s")
logging_console_handler.setFormatter(logging_formatter)
logger.addHandler(logging_console_handler)


def print_compression_job_status(job_row, current_time):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,7 @@
wait_for_query_job,
)

# Setup logging
# Create logger
logger = logging.getLogger(__file__)
logger.setLevel(logging.INFO)
# Setup console logging
logging_console_handler = logging.StreamHandler()
logging_formatter = logging.Formatter("%(asctime)s [%(levelname)s] [%(name)s] %(message)s")
logging_console_handler.setFormatter(logging_formatter)
logger.addHandler(logging_console_handler)


def get_orig_file_id(db_config: Database, path: str) -> Optional[str]:
Expand Down Expand Up @@ -175,7 +167,7 @@ def validate_and_load_config_file(
"""
try:
clp_config = load_config_file(config_file_path, default_config_file_path, clp_home)
clp_config.validate_archive_output_dir()
clp_config.validate_archive_output_config()
clp_config.validate_logs_dir()
return clp_config
except Exception:
Expand Down Expand Up @@ -215,7 +207,7 @@ def handle_extract_file_cmd(
list_path = parsed_args.files_from

logs_dir = clp_config.logs_directory
archives_dir = clp_config.archive_output.directory
archives_dir = clp_config.archive_output.get_directory()

# Generate database config file for clp
db_config_file_path = logs_dir / f".decompress-db-config-{uuid.uuid4()}.yml"
Expand Down
Loading
Loading