Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(package)!: Add support for writing clp-s single file archives to S3. #634

Merged
merged 43 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
ca46dca
First version backup
haiqi96 Dec 11, 2024
b763e8b
Small refactor
haiqi96 Dec 11, 2024
4e9529c
First trial for new config
haiqi96 Dec 11, 2024
e9cdea4
Further refactor and polishing
haiqi96 Dec 11, 2024
9ba0a38
Another small refactor
haiqi96 Dec 12, 2024
58befef
small refactor again
haiqi96 Dec 12, 2024
35ec0c3
Combine s3 utils
haiqi96 Dec 12, 2024
5d57b10
Support handling S3 error message
haiqi96 Dec 12, 2024
9991307
Slight logging modification
haiqi96 Dec 12, 2024
5d23790
Linter
haiqi96 Dec 12, 2024
b4bb2af
Add extra verification
haiqi96 Dec 12, 2024
f41c558
Update components/clp-py-utils/clp_py_utils/clp_config.py
haiqi96 Dec 12, 2024
ce5a667
do nothing for now
haiqi96 Dec 12, 2024
f05dc88
backup changes for worker config
haiqi96 Dec 12, 2024
abf5dde
More support
haiqi96 Dec 13, 2024
7d34456
Remove unnecssary change
haiqi96 Dec 13, 2024
a7afd0d
Linter
haiqi96 Dec 13, 2024
99d3094
Handle mount for fs & S3
haiqi96 Dec 13, 2024
1afed1a
Linter
haiqi96 Dec 13, 2024
1de661a
Remove unused functions
haiqi96 Dec 13, 2024
ce3de98
Update components/job-orchestration/job_orchestration/executor/compre…
haiqi96 Dec 13, 2024
f49664f
simplify worker config
haiqi96 Dec 13, 2024
046cdcb
polishing
haiqi96 Dec 13, 2024
242dec2
linter
haiqi96 Dec 14, 2024
ed280cb
Apply suggestions from code review
haiqi96 Dec 16, 2024
0788e59
Fix easier ones
haiqi96 Dec 16, 2024
c198f27
Backup changes
haiqi96 Dec 16, 2024
4819f76
Small fixes
haiqi96 Dec 16, 2024
e5f43fb
fixes
haiqi96 Dec 16, 2024
1246062
add safeguard for archive update failure
haiqi96 Dec 17, 2024
3b870a4
Add docstrings
haiqi96 Dec 17, 2024
214ae3f
Apply suggestions from code review
haiqi96 Dec 18, 2024
6ff92fc
Clean up
haiqi96 Dec 18, 2024
9e07d37
update pyproject.toml
haiqi96 Dec 18, 2024
915b49d
Add docstrings
haiqi96 Dec 18, 2024
a061a29
Apply suggestions from code review
haiqi96 Dec 18, 2024
8301748
Update name as suggested by the code review
haiqi96 Dec 18, 2024
2ada464
a few small fixes to ensure other scripts still work
haiqi96 Dec 18, 2024
6e5aad5
adding safeguard for empty stdout line from clp.
haiqi96 Dec 18, 2024
55c0f36
add safe guard for search
haiqi96 Dec 18, 2024
2d7443e
Polish error messages.
haiqi96 Dec 18, 2024
6f907b2
Linter
haiqi96 Dec 18, 2024
120ffec
Slighlty improve the error message
haiqi96 Dec 18, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 14 additions & 6 deletions components/clp-package-utils/clp_package_utils/general.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
REDIS_COMPONENT_NAME,
REDUCER_COMPONENT_NAME,
RESULTS_CACHE_COMPONENT_NAME,
StorageType,
WEBUI_COMPONENT_NAME,
)
from clp_py_utils.core import (
Expand All @@ -28,6 +29,7 @@
read_yaml_config_file,
validate_path_could_be_dir,
)
from clp_py_utils.s3_utils import verify_s3_config_for_archive_output
from strenum import KebabCaseStrEnum

# CONSTANTS
Expand Down Expand Up @@ -239,17 +241,17 @@ def generate_container_config(
DockerMountType.BIND, clp_config.logs_directory, container_clp_config.logs_directory
)

container_clp_config.archive_output.directory = pathlib.Path("/") / "mnt" / "archive-output"
container_clp_config.archive_output.set_directory(pathlib.Path("/") / "mnt" / "archive-output")
if not is_path_already_mounted(
clp_home,
CONTAINER_CLP_HOME,
clp_config.archive_output.directory,
container_clp_config.archive_output.directory,
clp_config.archive_output.get_directory(),
container_clp_config.archive_output.get_directory(),
):
docker_mounts.archives_output_dir = DockerMount(
DockerMountType.BIND,
clp_config.archive_output.directory,
container_clp_config.archive_output.directory,
clp_config.archive_output.get_directory(),
container_clp_config.archive_output.get_directory(),
)

container_clp_config.stream_output.directory = pathlib.Path("/") / "mnt" / "stream-output"
Expand Down Expand Up @@ -482,9 +484,15 @@ def validate_results_cache_config(

def validate_worker_config(clp_config: CLPConfig):
clp_config.validate_input_logs_dir()
clp_config.validate_archive_output_dir()
clp_config.validate_archive_output_config()
clp_config.validate_stream_output_dir()

storage_config = clp_config.archive_output.storage
if StorageType.S3 == storage_config.type:
result = verify_s3_config_for_archive_output(storage_config.s3_config)
if not result.success:
raise ValueError(f"S3 config verification failed: {result.error}")


def validate_webui_config(
clp_config: CLPConfig, logs_dir: pathlib.Path, settings_json_path: pathlib.Path
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -694,14 +694,12 @@ def generic_start_worker(
if container_exists(container_name):
return

validate_worker_config(clp_config)

logs_dir = clp_config.logs_directory / component_name
logs_dir.mkdir(parents=True, exist_ok=True)
container_logs_dir = container_clp_config.logs_directory / component_name

# Create necessary directories
clp_config.archive_output.directory.mkdir(parents=True, exist_ok=True)
clp_config.archive_output.get_directory().mkdir(parents=True, exist_ok=True)
clp_config.stream_output.directory.mkdir(parents=True, exist_ok=True)

clp_site_packages_dir = CONTAINER_CLP_HOME / "lib" / "python3" / "site-packages"
Expand All @@ -713,6 +711,7 @@ def generic_start_worker(
"-w", str(CONTAINER_CLP_HOME),
"--name", container_name,
"--log-driver", "local",
"-u", f"{os.getuid()}:{os.getgid()}",
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
"-e", f"PYTHONPATH={clp_site_packages_dir}",
"-e", (
f"BROKER_URL=amqp://"
Expand All @@ -725,22 +724,38 @@ def generic_start_worker(
),
"-e", f"CLP_HOME={CONTAINER_CLP_HOME}",
"-e", f"CLP_DATA_DIR={container_clp_config.data_directory}",
"-e", f"CLP_ARCHIVE_OUTPUT_DIR={container_clp_config.archive_output.directory}",
"-e", f"CLP_LOGS_DIR={container_logs_dir}",
"-e", f"CLP_LOGGING_LEVEL={worker_config.logging_level}",
"-e", f"CLP_STORAGE_ENGINE={clp_config.package.storage_engine}",
"-u", f"{os.getuid()}:{os.getgid()}",
# need a way to remove this maybe
"-e", f"CLP_ARCHIVE_OUTPUT_DIR={container_clp_config.archive_output.get_directory()}",
]
if worker_specific_env:
for env_name, env_value in worker_specific_env.items():
container_start_cmd.append("-e")
container_start_cmd.append(f"{env_name}={env_value}")

# Expose the S3 archive-output settings to the container as environment variables.
if "s3" == clp_config.archive_output.storage.type:
    s3_config = clp_config.archive_output.storage.s3_config
    container_start_cmd += [
        "-e", "ENABLE_S3_ARCHIVE=1",
        "-e", f"REGION_NAME={s3_config.region_name}",
        "-e", f"BUCKET={s3_config.bucket}",
        "-e", f"KEY_PREFIX={s3_config.key_prefix}",
    ]
    # Credentials are optional in S3Config; only forward them when BOTH halves of the key pair
    # are set. The previous guard tested secret_access_key twice, so a config with only a secret
    # key would have exported the literal string "ACCESS_KEY_ID=None".
    # NOTE(review): values passed with "-e" are visible on the command line of the container
    # start process -- confirm that this is acceptable for the secret key.
    if s3_config.access_key_id is not None and s3_config.secret_access_key is not None:
        container_start_cmd += [
            "-e", f"ACCESS_KEY_ID={s3_config.access_key_id}",
            "-e", f"SECRET_ACCESS_KEY={s3_config.secret_access_key}",
        ]

haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
# fmt: on
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
necessary_mounts = [
mounts.clp_home,
mounts.data_dir,
mounts.logs_dir,
# need a way to remove this maybe, since reader doesn't need it if it is staged.
# one option is to move it to the worker_specific_mount
mounts.archives_output_dir,
mounts.input_logs_dir,
]
Expand Down Expand Up @@ -1125,6 +1140,12 @@ def main(argv):
QUERY_WORKER_COMPONENT_NAME,
):
validate_and_load_redis_credentials_file(clp_config, clp_home, True)
if target in (
ALL_TARGET_NAME,
COMPRESSION_WORKER_COMPONENT_NAME,
QUERY_WORKER_COMPONENT_NAME,
):
validate_worker_config(clp_config)

clp_config.validate_data_dir()
clp_config.validate_logs_dir()
Expand Down
143 changes: 121 additions & 22 deletions components/clp-py-utils/clp_py_utils/clp_config.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import pathlib
import typing
from enum import auto
from typing import Literal, Optional, Union

from dotenv import dotenv_values
from pydantic import BaseModel, PrivateAttr, validator
from strenum import KebabCaseStrEnum
from strenum import KebabCaseStrEnum, LowercaseStrEnum

from .clp_logging import get_valid_logging_level, is_valid_logging_level
from .core import (
Expand Down Expand Up @@ -48,6 +48,11 @@ class StorageEngine(KebabCaseStrEnum):
CLP_S = auto()


class StorageType(LowercaseStrEnum):
    """
    Kinds of storage an archive output can be configured with.

    The member values are used as the `type` discriminator of `FSStorage` and `S3Storage`.
    """

    # Storage described by `FSStorage` (a directory path).
    FS = auto()
    # Storage described by `S3Storage` (a staging directory plus an `S3Config`).
    S3 = auto()


VALID_STORAGE_ENGINES = [storage_engine.value for storage_engine in StorageEngine]


Expand All @@ -69,12 +74,12 @@ class Database(BaseModel):
host: str = "localhost"
port: int = 3306
name: str = "clp-db"
ssl_cert: typing.Optional[str] = None
ssl_cert: Optional[str] = None
auto_commit: bool = False
compress: bool = True

username: typing.Optional[str] = None
password: typing.Optional[str] = None
username: Optional[str] = None
password: Optional[str] = None

@validator("type")
def validate_database_type(cls, field):
Expand Down Expand Up @@ -227,7 +232,7 @@ class Redis(BaseModel):
query_backend_database: int = 0
compression_backend_database: int = 1
# redis can perform authentication without a username
password: typing.Optional[str]
password: Optional[str]

@validator("host")
def validate_host(cls, field):
Expand Down Expand Up @@ -300,17 +305,100 @@ class Queue(BaseModel):
host: str = "localhost"
port: int = 5672

username: typing.Optional[str]
password: typing.Optional[str]
username: Optional[str]
password: Optional[str]


class ArchiveOutput(BaseModel):
class S3Config(BaseModel):
    """
    S3 settings referenced by `S3Storage`: the target region, bucket and key prefix, plus an
    optional access-key pair.
    """

    # TODO: Decide whether key_prefix needs to end with '/' (it may not need to).

    # Required fields
    region_name: str
    bucket: str
    key_prefix: str

    # Optional fields
    access_key_id: Optional[str] = None
    secret_access_key: Optional[str] = None

    @validator("region_name")
    def validate_region_name(cls, field):
        """Rejects an empty region name."""
        if field == "":
            raise ValueError("region_name is not provided")
        return field

    @validator("bucket")
    def validate_bucket(cls, field):
        """Rejects an empty bucket name."""
        if field == "":
            raise ValueError("bucket is not provided")
        return field

    @validator("key_prefix")
    def validate_key_prefix(cls, field):
        """Rejects an empty key prefix."""
        if field == "":
            raise ValueError("key_prefix is not provided")
        return field


class FSStorage(BaseModel):
    """Archive storage configuration of type `StorageType.FS`: archives live in `directory`."""

    type: Literal[StorageType.FS.value] = StorageType.FS.value
    directory: pathlib.Path = pathlib.Path("var") / "data" / "archives"

    # `pre=True` makes the check see the raw configured value. After coercion the value is a
    # pathlib.Path, which never compares equal to "" (and Path("") normalizes to Path(".")), so a
    # post-coercion check could never fire.
    @validator("directory", pre=True)
    def validate_directory(cls, field):
        """Rejects an empty string given as the directory."""
        if "" == field:
            raise ValueError("directory can not be empty")
        return field

    def make_config_path_absolute(self, clp_home: pathlib.Path):
        """
        Replaces `directory` with the result of `make_config_path_absolute(clp_home, directory)`.

        :param clp_home: Passed through to the module-level `make_config_path_absolute` helper.
        """
        self.directory = make_config_path_absolute(clp_home, self.directory)

    def dump_to_primitive_dict(self):
        """
        :return: `self.dict()` with `directory` converted from pathlib.Path to str.
        """
        d = self.dict()
        d["directory"] = str(d["directory"])
        return d


class S3Storage(BaseModel):
    """
    Archive storage configuration of type `StorageType.S3`: an `S3Config` plus a local
    `staging_directory`.
    """

    type: Literal[StorageType.S3.value] = StorageType.S3.value
    staging_directory: pathlib.Path = pathlib.Path("var") / "data" / "staged_archives"
    s3_config: S3Config

    # `pre=True` makes the check see the raw configured value. After coercion the value is a
    # pathlib.Path, which never compares equal to "" (and Path("") normalizes to Path(".")), so a
    # post-coercion check could never fire.
    @validator("staging_directory", pre=True)
    def validate_staging_directory(cls, field):
        """Rejects an empty string given as the staging directory."""
        if "" == field:
            raise ValueError("staging_directory can not be empty")
        return field

    @validator("s3_config")
    def validate_s3_config(cls, field):
        """Rejects a missing S3 config."""
        if field is None:
            raise ValueError("s3_config must be provided")
        return field

    def make_config_path_absolute(self, clp_home: pathlib.Path):
        """
        Replaces `staging_directory` with the result of
        `make_config_path_absolute(clp_home, staging_directory)`.

        :param clp_home: Passed through to the module-level `make_config_path_absolute` helper.
        """
        self.staging_directory = make_config_path_absolute(clp_home, self.staging_directory)

    def dump_to_primitive_dict(self):
        """
        :return: `self.dict()` with `staging_directory` converted from pathlib.Path to str.
        """
        d = self.dict()
        d["staging_directory"] = str(d["staging_directory"])
        return d


class ArchiveOutput(BaseModel):
storage: Union[FSStorage, S3Storage] = FSStorage()
target_archive_size: int = 256 * 1024 * 1024 # 256 MB
target_dictionaries_size: int = 32 * 1024 * 1024 # 32 MB
target_encoded_file_size: int = 256 * 1024 * 1024 # 256 MB
target_segment_size: int = 256 * 1024 * 1024 # 256 MB

@validator("storage")
def validate_storage(cls, field) -> bool:
if None == field:
raise ValueError("storage must be provided")
return field
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved

@validator("target_archive_size")
def validate_target_archive_size(cls, field):
if field <= 0:
Expand All @@ -335,14 +423,18 @@ def validate_target_segment_size(cls, field):
raise ValueError("target_segment_size must be greater than 0")
return field

def make_config_paths_absolute(self, clp_home: pathlib.Path):
self.directory = make_config_path_absolute(clp_home, self.directory)
def set_directory(self, directory: pathlib.Path) -> None:
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
storage_config = self.storage
if StorageType.FS == storage_config.type:
storage_config.directory = directory
else:
storage_config.staging_directory = directory
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved

def dump_to_primitive_dict(self):
d = self.dict()
# Turn directory (pathlib.Path) into a primitive string
d["directory"] = str(d["directory"])
return d
def get_directory(self) -> pathlib.Path:
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
storage_config = self.storage
if StorageType.FS == storage_config.type:
return storage_config.directory
return storage_config.staging_directory


class StreamOutput(BaseModel):
Expand Down Expand Up @@ -408,7 +500,7 @@ def validate_port(cls, field):


class CLPConfig(BaseModel):
execution_container: typing.Optional[str]
execution_container: Optional[str]
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved

input_logs_directory: pathlib.Path = pathlib.Path("/")

Expand Down Expand Up @@ -436,7 +528,7 @@ class CLPConfig(BaseModel):
def make_config_paths_absolute(self, clp_home: pathlib.Path):
self.input_logs_directory = make_config_path_absolute(clp_home, self.input_logs_directory)
self.credentials_file_path = make_config_path_absolute(clp_home, self.credentials_file_path)
self.archive_output.make_config_paths_absolute(clp_home)
self.archive_output.storage.make_config_path_absolute(clp_home)
self.stream_output.make_config_paths_absolute(clp_home)
self.data_directory = make_config_path_absolute(clp_home, self.data_directory)
self.logs_directory = make_config_path_absolute(clp_home, self.logs_directory)
Expand All @@ -451,11 +543,18 @@ def validate_input_logs_dir(self):
if not input_logs_dir.is_dir():
raise ValueError(f"input_logs_directory '{input_logs_dir}' is not a directory.")

def validate_archive_output_dir(self):
def validate_archive_output_config(self):
if (
StorageType.S3 == self.archive_output.storage.type
and StorageEngine.CLP_S != self.package.storage_engine
):
raise ValueError(
f"S3 storage is only supported with storage engine: {StorageEngine.CLP_S}"
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
)
try:
validate_path_could_be_dir(self.archive_output.directory)
validate_path_could_be_dir(self.archive_output.get_directory())
except ValueError as ex:
raise ValueError(f"archive_output.directory is invalid: {ex}")
raise ValueError(f"directory of storage config is invalid: {ex}")
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved

def validate_stream_output_dir(self):
try:
Expand Down Expand Up @@ -529,7 +628,7 @@ def load_redis_credentials_from_file(self):

def dump_to_primitive_dict(self):
d = self.dict()
d["archive_output"] = self.archive_output.dump_to_primitive_dict()
d["archive_output"]["storage"] = self.archive_output.storage.dump_to_primitive_dict()
d["stream_output"] = self.stream_output.dump_to_primitive_dict()
# Turn paths into primitive strings
d["input_logs_directory"] = str(self.input_logs_directory)
Expand Down
10 changes: 10 additions & 0 deletions components/clp-py-utils/clp_py_utils/result.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from typing import Optional


class Result:
    """
    Outcome of an operation: a success flag and, optionally, an error message.
    """

    def __init__(self, success: bool, error: Optional[str] = None):
        """
        :param success: Whether the operation succeeded.
        :param error: A description of the failure, or None if there is nothing to report.
        """
        self.success: bool = success
        self.error: Optional[str] = error

    def __repr__(self) -> str:
        return f"Result(success={self.success}, error={self.error})"
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
13 changes: 13 additions & 0 deletions components/clp-py-utils/clp_py_utils/s3_utils.py
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import boto3
from botocore.exceptions import ClientError

from clp_py_utils.clp_config import S3Config
from clp_py_utils.result import Result


def verify_s3_config_for_archive_output(s3_config: S3Config) -> Result:
    """
    Verifies that `s3_config` can be used for archive output.

    NOTE: This is currently a stub. It performs no checks and always reports success.

    :param s3_config: The S3 configuration to verify (currently unused).
    :return: A `Result` with `success=True`.
    """
    # TODO: Verify that:
    # 1. We have write privileges, so archives can be compressed.
    # 2. We have read privileges, so archives can be read.
    # 3. The bucket and region match; a mismatch is expected to cause problems, but this is
    #    unconfirmed.
    return Result(success=True)
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved
Loading
Loading