Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(package)!: Add support for writing clp-s single file archives to S3. #634

Merged
merged 43 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from 39 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
ca46dca
First version backup
haiqi96 Dec 11, 2024
b763e8b
Small refactor
haiqi96 Dec 11, 2024
4e9529c
First trial for new config
haiqi96 Dec 11, 2024
e9cdea4
Further refactor and polishing
haiqi96 Dec 11, 2024
9ba0a38
Another small refactor
haiqi96 Dec 12, 2024
58befef
small refactor again
haiqi96 Dec 12, 2024
35ec0c3
Combine s3 utils
haiqi96 Dec 12, 2024
5d57b10
Support handling S3 error message
haiqi96 Dec 12, 2024
9991307
Slight logging modification
haiqi96 Dec 12, 2024
5d23790
Linter
haiqi96 Dec 12, 2024
b4bb2af
Add extra verification
haiqi96 Dec 12, 2024
f41c558
Update components/clp-py-utils/clp_py_utils/clp_config.py
haiqi96 Dec 12, 2024
ce5a667
do nothing for now
haiqi96 Dec 12, 2024
f05dc88
backup changes for worker config
haiqi96 Dec 12, 2024
abf5dde
More support
haiqi96 Dec 13, 2024
7d34456
Remove unnecssary change
haiqi96 Dec 13, 2024
a7afd0d
Linter
haiqi96 Dec 13, 2024
99d3094
Handle mount for fs & S3
haiqi96 Dec 13, 2024
1afed1a
Linter
haiqi96 Dec 13, 2024
1de661a
Remove unused functions
haiqi96 Dec 13, 2024
ce3de98
Update components/job-orchestration/job_orchestration/executor/compre…
haiqi96 Dec 13, 2024
f49664f
simplify worker config
haiqi96 Dec 13, 2024
046cdcb
polishing
haiqi96 Dec 13, 2024
242dec2
linter
haiqi96 Dec 14, 2024
ed280cb
Apply suggestions from code review
haiqi96 Dec 16, 2024
0788e59
Fix easier ones
haiqi96 Dec 16, 2024
c198f27
Backup changes
haiqi96 Dec 16, 2024
4819f76
Small fixes
haiqi96 Dec 16, 2024
e5f43fb
fixes
haiqi96 Dec 16, 2024
1246062
add safeguard for archive update failure
haiqi96 Dec 17, 2024
3b870a4
Add docstrings
haiqi96 Dec 17, 2024
214ae3f
Apply suggestions from code review
haiqi96 Dec 18, 2024
6ff92fc
Clean up
haiqi96 Dec 18, 2024
9e07d37
update pyproject.toml
haiqi96 Dec 18, 2024
915b49d
Add docstrings
haiqi96 Dec 18, 2024
a061a29
Apply suggestions from code review
haiqi96 Dec 18, 2024
8301748
Update name as suggested by the code review
haiqi96 Dec 18, 2024
2ada464
a few small fixes to ensure other scripts still work
haiqi96 Dec 18, 2024
6e5aad5
adding safeguard for empty stdout line from clp.
haiqi96 Dec 18, 2024
55c0f36
add safe guard for search
haiqi96 Dec 18, 2024
2d7443e
Polish error messages.
haiqi96 Dec 18, 2024
6f907b2
Linter
haiqi96 Dec 18, 2024
120ffec
Slighlty improve the error message
haiqi96 Dec 18, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 20 additions & 6 deletions components/clp-package-utils/clp_package_utils/general.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
REDIS_COMPONENT_NAME,
REDUCER_COMPONENT_NAME,
RESULTS_CACHE_COMPONENT_NAME,
StorageType,
WEBUI_COMPONENT_NAME,
WorkerConfig,
)
from clp_py_utils.core import (
get_config_value,
Expand Down Expand Up @@ -239,17 +241,17 @@ def generate_container_config(
DockerMountType.BIND, clp_config.logs_directory, container_clp_config.logs_directory
)

container_clp_config.archive_output.directory = pathlib.Path("/") / "mnt" / "archive-output"
container_clp_config.archive_output.set_directory(pathlib.Path("/") / "mnt" / "archive-output")
if not is_path_already_mounted(
clp_home,
CONTAINER_CLP_HOME,
clp_config.archive_output.directory,
container_clp_config.archive_output.directory,
clp_config.archive_output.get_directory(),
container_clp_config.archive_output.get_directory(),
):
docker_mounts.archives_output_dir = DockerMount(
DockerMountType.BIND,
clp_config.archive_output.directory,
container_clp_config.archive_output.directory,
clp_config.archive_output.get_directory(),
container_clp_config.archive_output.get_directory(),
)

container_clp_config.stream_output.directory = pathlib.Path("/") / "mnt" / "stream-output"
Expand All @@ -268,6 +270,18 @@ def generate_container_config(
return container_clp_config, docker_mounts


def generate_worker_config(clp_config: CLPConfig) -> WorkerConfig:
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
worker_config = WorkerConfig()
worker_config.package = clp_config.package.copy(deep=True)
worker_config.archive_output = clp_config.archive_output.copy(deep=True)
worker_config.data_directory = clp_config.data_directory

worker_config.stream_output_dir = clp_config.stream_output.directory
worker_config.stream_collection_name = clp_config.results_cache.stream_collection_name

return worker_config


def dump_container_config(
container_clp_config: CLPConfig, clp_config: CLPConfig, container_name: str
) -> Tuple[pathlib.Path, pathlib.Path]:
Expand Down Expand Up @@ -482,7 +496,7 @@ def validate_results_cache_config(

def validate_worker_config(clp_config: CLPConfig):
clp_config.validate_input_logs_dir()
clp_config.validate_archive_output_dir()
clp_config.validate_archive_output_config()
clp_config.validate_stream_output_dir()


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import sys
from typing import Optional

from clp_py_utils.clp_config import CLPConfig
from clp_py_utils.clp_config import CLPConfig, StorageType

from clp_package_utils.general import (
CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH,
Expand Down Expand Up @@ -81,6 +81,11 @@ def handle_extract_file_cmd(
if clp_config is None:
return -1

storage_type = clp_config.archive_output.storage.type
if StorageType.FS != storage_type:
logger.error(f"File extraction is not supported for storage type: {storage_type}.")
return -1

container_name = generate_container_name(str(JobType.FILE_EXTRACTION))
container_clp_config, mounts = generate_container_config(clp_config, clp_home)
generated_config_path_on_container, generated_config_path_on_host = dump_container_config(
Expand Down Expand Up @@ -156,6 +161,11 @@ def handle_extract_stream_cmd(
if clp_config is None:
return -1

storage_type = clp_config.archive_output.storage.type
if StorageType.FS != storage_type:
logger.error(f"Stream extraction is not supported for storage type: {storage_type}.")
return -1
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add this to the search script as well? Technically we're going to implement it soon, but better to have a working package for now until that happens.


container_name = generate_container_name(str(JobType.IR_EXTRACTION))
container_clp_config, mounts = generate_container_config(clp_config, clp_home)
generated_config_path_on_container, generated_config_path_on_host = dump_container_config(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import sys
from pathlib import Path

from clp_py_utils.clp_config import StorageType

from clp_package_utils.general import (
CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH,
dump_container_config,
Expand Down Expand Up @@ -57,6 +59,11 @@ def main(argv):
logger.exception("Failed to load config.")
return -1

storage_type = clp_config.archive_output.storage.type
if StorageType.FS != storage_type:
logger.error(f"Archive deletion is not supported for storage type: {storage_type}.")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not changing to "archive storage type" since we have "Archive" at the beginning of the sentence.

return -1

# Validate the input timestamp
begin_ts = parsed_args.begin_ts
end_ts = parsed_args.end_ts
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ def validate_and_load_config_file(
"""
try:
clp_config = load_config_file(config_file_path, default_config_file_path, clp_home)
clp_config.validate_archive_output_dir()
clp_config.validate_archive_output_config()
clp_config.validate_logs_dir()
return clp_config
except Exception:
Expand Down Expand Up @@ -207,7 +207,7 @@ def handle_extract_file_cmd(
list_path = parsed_args.files_from

logs_dir = clp_config.logs_directory
archives_dir = clp_config.archive_output.directory
archives_dir = clp_config.archive_output.get_directory()

# Generate database config file for clp
db_config_file_path = logs_dir / f".decompress-db-config-{uuid.uuid4()}.yml"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def main(argv):
return -1

database_config = clp_config.database
archives_dir = clp_config.archive_output.directory
archives_dir = clp_config.archive_output.get_directory()
if not archives_dir.exists():
logger.error("`archive_output.directory` doesn't exist.")
return -1
Expand Down
45 changes: 23 additions & 22 deletions components/clp-package-utils/clp_package_utils/scripts/start_clp.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
REDIS_COMPONENT_NAME,
REDUCER_COMPONENT_NAME,
RESULTS_CACHE_COMPONENT_NAME,
StorageType,
WEBUI_COMPONENT_NAME,
)
from job_orchestration.scheduler.constants import QueueName
Expand All @@ -42,6 +43,7 @@
DockerMount,
DockerMountType,
generate_container_config,
generate_worker_config,
get_clp_home,
is_container_exited,
is_container_running,
Expand Down Expand Up @@ -626,6 +628,7 @@ def start_compression_worker(
):
celery_method = "job_orchestration.executor.compress"
celery_route = f"{QueueName.COMPRESSION}"
compression_worker_mounts = [mounts.archives_output_dir]
generic_start_worker(
COMPRESSION_WORKER_COMPONENT_NAME,
instance_id,
Expand All @@ -637,8 +640,7 @@ def start_compression_worker(
clp_config.redis.compression_backend_database,
num_cpus,
mounts,
None,
None,
compression_worker_mounts,
)


Expand All @@ -652,11 +654,9 @@ def start_query_worker(
celery_method = "job_orchestration.executor.query"
celery_route = f"{QueueName.QUERY}"

query_worker_mount = [mounts.stream_output_dir]
query_worker_env = {
"CLP_STREAM_OUTPUT_DIR": container_clp_config.stream_output.directory,
"CLP_STREAM_COLLECTION_NAME": clp_config.results_cache.stream_collection_name,
}
query_worker_mounts = [mounts.stream_output_dir]
if clp_config.archive_output.storage.type == StorageType.FS:
query_worker_mounts.append(mounts.archives_output_dir)

generic_start_worker(
QUERY_WORKER_COMPONENT_NAME,
Expand All @@ -669,8 +669,7 @@ def start_query_worker(
clp_config.redis.query_backend_database,
num_cpus,
mounts,
query_worker_env,
query_worker_mount,
query_worker_mounts,
)


Expand All @@ -685,23 +684,26 @@ def generic_start_worker(
redis_database: int,
num_cpus: int,
mounts: CLPDockerMounts,
worker_specific_env: Dict[str, Any],
worker_specific_mount: List[Optional[DockerMount]],
worker_specific_mount: Optional[List[Optional[DockerMount]]],
kirkrodrigues marked this conversation as resolved.
Show resolved Hide resolved
):
logger.info(f"Starting {component_name}...")

container_name = f"clp-{component_name}-{instance_id}"
if container_exists(container_name):
return

validate_worker_config(clp_config)
container_config_filename = f"{container_name}.yml"
container_config_file_path = clp_config.logs_directory / container_config_filename
container_worker_config = generate_worker_config(container_clp_config)
with open(container_config_file_path, "w") as f:
yaml.safe_dump(container_worker_config.dump_to_primitive_dict(), f)

logs_dir = clp_config.logs_directory / component_name
logs_dir.mkdir(parents=True, exist_ok=True)
container_logs_dir = container_clp_config.logs_directory / component_name

# Create necessary directories
clp_config.archive_output.directory.mkdir(parents=True, exist_ok=True)
clp_config.archive_output.get_directory().mkdir(parents=True, exist_ok=True)
clp_config.stream_output.directory.mkdir(parents=True, exist_ok=True)

clp_site_packages_dir = CONTAINER_CLP_HOME / "lib" / "python3" / "site-packages"
Expand All @@ -724,24 +726,17 @@ def generic_start_worker(
f"{container_clp_config.redis.host}:{container_clp_config.redis.port}/{redis_database}"
),
"-e", f"CLP_HOME={CONTAINER_CLP_HOME}",
"-e", f"CLP_DATA_DIR={container_clp_config.data_directory}",
"-e", f"CLP_ARCHIVE_OUTPUT_DIR={container_clp_config.archive_output.directory}",
"-e", f"CLP_CONFIG_PATH={container_clp_config.logs_directory / container_config_filename}",
"-e", f"CLP_LOGS_DIR={container_logs_dir}",
"-e", f"CLP_LOGGING_LEVEL={worker_config.logging_level}",
"-e", f"CLP_STORAGE_ENGINE={clp_config.package.storage_engine}",
"-u", f"{os.getuid()}:{os.getgid()}",
]
if worker_specific_env:
for env_name, env_value in worker_specific_env.items():
container_start_cmd.append("-e")
container_start_cmd.append(f"{env_name}={env_value}")

# fmt: on
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved

necessary_mounts = [
mounts.clp_home,
mounts.data_dir,
mounts.logs_dir,
mounts.archives_output_dir,
mounts.input_logs_dir,
]
if worker_specific_mount:
Expand Down Expand Up @@ -1125,6 +1120,12 @@ def main(argv):
QUERY_WORKER_COMPONENT_NAME,
):
validate_and_load_redis_credentials_file(clp_config, clp_home, True)
if target in (
ALL_TARGET_NAME,
COMPRESSION_WORKER_COMPONENT_NAME,
QUERY_WORKER_COMPONENT_NAME,
):
validate_worker_config(clp_config)

clp_config.validate_data_dir()
clp_config.validate_logs_dir()
Expand Down
Loading
Loading