
Fix easier ones
haiqi96 committed Dec 16, 2024
1 parent ed280cb commit b2ccc22
Showing 10 changed files with 65 additions and 66 deletions.
(changed file; path not shown)
@@ -207,7 +207,7 @@ def handle_extract_file_cmd(
list_path = parsed_args.files_from

logs_dir = clp_config.logs_directory
- archives_dir = clp_config.archive_output.directory
+ archives_dir = clp_config.archive_output.get_directory()

# Generate database config file for clp
db_config_file_path = logs_dir / f".decompress-db-config-{uuid.uuid4()}.yml"
(changed file; path not shown)
@@ -54,7 +54,7 @@ def main(argv):
return -1

database_config = clp_config.database
- archives_dir = clp_config.archive_output.directory
+ archives_dir = clp_config.archive_output.get_directory()
if not archives_dir.exists():
logger.error("`archive_output.directory` doesn't exist.")
return -1
(changed file; path not shown)
@@ -628,7 +628,7 @@ def start_compression_worker(
):
celery_method = "job_orchestration.executor.compress"
celery_route = f"{QueueName.COMPRESSION}"
- compression_worker_mount = [mounts.archives_output_dir]
+ compression_worker_mounts = [mounts.archives_output_dir]
generic_start_worker(
COMPRESSION_WORKER_COMPONENT_NAME,
instance_id,
@@ -640,7 +640,7 @@
clp_config.redis.compression_backend_database,
num_cpus,
mounts,
- compression_worker_mount,
+ compression_worker_mounts,
)


@@ -654,9 +654,9 @@ def start_query_worker(
celery_method = "job_orchestration.executor.query"
celery_route = f"{QueueName.QUERY}"

- query_worker_mount = [mounts.stream_output_dir]
+ query_worker_mounts = [mounts.stream_output_dir]
if clp_config.archive_output.storage.type == StorageType.FS:
- query_worker_mount.append(mounts.archives_output_dir)
+ query_worker_mounts.append(mounts.archives_output_dir)

generic_start_worker(
QUERY_WORKER_COMPONENT_NAME,
@@ -669,7 +669,7 @@
clp_config.redis.query_backend_database,
num_cpus,
mounts,
- query_worker_mount,
+ query_worker_mounts,
)


@@ -727,13 +727,13 @@ def generic_start_worker(
f"{container_clp_config.redis.host}:{container_clp_config.redis.port}/{redis_database}"
),
"-e", f"CLP_HOME={CONTAINER_CLP_HOME}",
- "-e", f"WORKER_CONFIG_PATH={container_clp_config.logs_directory / container_config_filename}",
+ "-e", f"CLP_CONFIG_PATH={container_clp_config.logs_directory / container_config_filename}",
"-e", f"CLP_LOGS_DIR={container_logs_dir}",
"-e", f"CLP_LOGGING_LEVEL={worker_config.logging_level}",
"-u", f"{os.getuid()}:{os.getgid()}",
]

# fmt: on

necessary_mounts = [
mounts.clp_home,
mounts.data_dir,
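For context, the renamed environment variable assembled above pairs with the executor-side lookups changed later in this commit. A rough, illustrative sketch of the flags a worker container ends up receiving (paths are made up; the real values come from container_clp_config):

    # Illustrative only: shape of the environment flags passed to `docker run`
    # for a worker container after this commit. The real command also includes
    # the broker/backend URLs, mounts, and user mapping shown above.
    worker_env_flags = [
        "-e", "CLP_HOME=/opt/clp",
        "-e", "CLP_CONFIG_PATH=/opt/clp/var/log/.worker-config.yml",  # was WORKER_CONFIG_PATH
        "-e", "CLP_LOGS_DIR=/opt/clp/var/log/worker",
        "-e", "CLP_LOGGING_LEVEL=INFO",
    ]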
41 changes: 18 additions & 23 deletions components/clp-py-utils/clp_py_utils/clp_config.py
@@ -310,35 +310,35 @@ class Queue(BaseModel):


class S3Config(BaseModel):
- region_name: str
+ region_code: str
bucket: str
key_prefix: str

access_key_id: Optional[str] = None
secret_access_key: Optional[str] = None

- @validator("region_name")
- def validate_region_name(cls, field):
+ @validator("region_code")
+ def validate_region_code(cls, field):
if field == "":
- raise ValueError("region_name is not provided")
+ raise ValueError("region_code can not be empty")
return field

@validator("bucket")
def validate_bucket(cls, field):
if field == "":
- raise ValueError("bucket is not provided")
+ raise ValueError("bucket can not be empty")
return field

@validator("key_prefix")
def validate_key_prefix(cls, field):
if field == "":
- raise ValueError("key_prefix is not provided")
+ raise ValueError("key_prefix can not be empty")
if not field.endswith("/"):
raise ValueError('key_prefix must end with "/"')
return field


- class FSStorage(BaseModel):
+ class FsStorage(BaseModel):
type: Literal[StorageType.FS.value] = StorageType.FS.value
directory: pathlib.Path = pathlib.Path("var") / "data" / "archives"

@@ -368,12 +368,6 @@ def validate_staging_directory(cls, field):
raise ValueError("staging_directory cannot be empty")
return field

- @validator("s3_config")
- def validate_s3_config(cls, field):
- if None == field:
- raise ValueError("s3_config must be provided")
- return field

def make_config_path_absolute(self, clp_home: pathlib.Path):
self.staging_directory = make_config_path_absolute(clp_home, self.staging_directory)

@@ -384,18 +378,12 @@ def dump_to_primitive_dict(self):


class ArchiveOutput(BaseModel):
- storage: Union[FSStorage, S3Storage] = FSStorage()
+ storage: Union[FsStorage, S3Storage] = FsStorage()
target_archive_size: int = 256 * 1024 * 1024 # 256 MB
target_dictionaries_size: int = 32 * 1024 * 1024 # 32 MB
target_encoded_file_size: int = 256 * 1024 * 1024 # 256 MB
target_segment_size: int = 256 * 1024 * 1024 # 256 MB

- @validator("storage")
- def validate_storage(cls, field) -> bool:
- if None == field:
- raise ValueError("storage must be provided")
- return field

@validator("target_archive_size")
def validate_target_archive_size(cls, field):
if field <= 0:
@@ -422,16 +410,23 @@ def validate_target_segment_size(cls, field):

def set_directory(self, directory: pathlib.Path):
storage_config = self.storage
- if StorageType.FS == storage_config.type:
+ storage_type = storage_config.type
+ if StorageType.FS == storage_type:
storage_config.directory = directory
- else:
+ elif StorageType.S3 == storage_type:
storage_config.staging_directory = directory
+ else:
+ raise NotImplementedError(f"storage.type {storage_type} is not supported")

def get_directory(self) -> pathlib.Path:
storage_config = self.storage
+ storage_type = storage_config.type
if StorageType.FS == storage_config.type:
return storage_config.directory
- return storage_config.staging_directory
+ elif StorageType.S3 == storage_type:
+ return storage_config.staging_directory
+ else:
+ raise NotImplementedError(f"storage.type {storage_type} is not supported")

def dump_to_primitive_dict(self):
d = self.dict()
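The set_directory()/get_directory() pair above is what the call sites changed earlier in this commit now use instead of reading archive_output.directory directly. A minimal usage sketch, assuming the models are importable as clp_py_utils.clp_config per the file path above:

    import pathlib

    from clp_py_utils.clp_config import ArchiveOutput  # import path assumed from the file above

    archive_output = ArchiveOutput()        # storage defaults to FsStorage
    print(archive_output.get_directory())   # var/data/archives

    # For FS storage this updates storage.directory; for S3 storage it would
    # update storage.staging_directory instead.
    archive_output.set_directory(pathlib.Path("/mnt/clp/archives"))
    assert archive_output.get_directory() == pathlib.Path("/mnt/clp/archives")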
2 changes: 1 addition & 1 deletion components/clp-py-utils/clp_py_utils/s3_utils.py
@@ -18,7 +18,7 @@ def s3_put(
return Result(success=False, error=f"{src_file} is not a file")

s3_client_args = {
- "region_name": s3_config.region_name,
+ "region_name": s3_config.region_code,
"aws_access_key_id": s3_config.access_key_id,
"aws_secret_access_key": s3_config.secret_access_key,
}
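Only the CLP-side field was renamed; boto3 itself still expects region_name. A rough sketch of the client setup s3_put presumably performs, under the assumption that it wraps boto3 (the full implementation is not shown in this diff):

    import boto3  # assumed dependency of s3_utils; not shown in this diff

    from clp_py_utils.clp_config import S3Config  # import path assumed from the repo layout


    def make_s3_client(s3_config: S3Config):
        # Map the renamed CLP config field back onto boto3's region_name argument.
        s3_client_args = {
            "region_name": s3_config.region_code,
            "aws_access_key_id": s3_config.access_key_id,
            "aws_secret_access_key": s3_config.secret_access_key,
        }
        return boto3.client("s3", **s3_client_args)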
(changed file; path not shown)
@@ -79,21 +79,17 @@ def update_job_metadata_and_tags(db_cursor, job_id, table_prefix, tag_ids, archi


def upload_single_file_archive_to_s3(
- archive_stats: Dict[str, Any],
- archive_dir: pathlib.Path,
+ archive_id: str,
+ src_archive_file: pathlib.Path,
s3_config: S3Config,
) -> Result:
- archive_id = archive_stats["id"]

logger.info(f"Uploading archive {archive_id} to S3...")
- src_file = archive_dir / archive_id
- result = s3_put(s3_config, src_file, archive_id)
+ result = s3_put(s3_config, src_archive_file, archive_id)
if not result.success:
logger.error(f"Failed to upload archive {archive_id}: {result.error}")
return result

logger.info(f"Finished uploading archive {archive_id} to S3.")
- src_file.unlink()
return Result(success=True)


@@ -199,7 +195,7 @@ def run_clp(
# Get s3 config
s3_config: S3Config
enable_s3_write = False
- s3_write_failed = False

storage_type = worker_config.archive_output.storage.type
if StorageType.S3 == storage_type:
if StorageEngine.CLP == clp_storage_engine:
@@ -258,10 +254,11 @@ def run_clp(
# Compute the total amount of data compressed
last_archive_stats = None
last_line_decoded = False
- worker_output = {
- "total_uncompressed_size": 0,
- "total_compressed_size": 0,
- }
+ total_uncompressed_size = 0
+ total_compressed_size = 0
+
+ # Handle job metadata update and s3 write if enabled
+ s3_error_msg = None
while not last_line_decoded:
line = proc.stdout.readline()
stats: Optional[Dict[str, Any]] = None
@@ -274,18 +271,21 @@
None is stats or stats["id"] != last_archive_stats["id"]
):
if enable_s3_write:
+ archive_id = last_archive_stats["id"]
+ src_archive_file = archive_output_dir / archive_id
+
result = upload_single_file_archive_to_s3(
- last_archive_stats, archive_output_dir, s3_config
+ archive_id, src_archive_file, s3_config
)
if not result.success:
- worker_output["error_message"] = result.error
- s3_write_failed = True
+ s3_error_msg = result.error
break
+ src_archive_file.unlink()

# We've started a new archive so add the previous archive's last
# reported size to the total
- worker_output["total_uncompressed_size"] += last_archive_stats["uncompressed_size"]
- worker_output["total_compressed_size"] += last_archive_stats["size"]
+ total_uncompressed_size += last_archive_stats["uncompressed_size"]
+ total_compressed_size += last_archive_stats["size"]
with closing(sql_adapter.create_connection(True)) as db_conn, closing(
db_conn.cursor(dictionary=True)
) as db_cursor:
@@ -316,8 +316,14 @@ def run_clp(
# Close stderr log file
stderr_log_file.close()

- if s3_write_failed:
- logger.error(f"Failed to upload to S3.")
+ worker_output = {
+ "total_uncompressed_size": total_uncompressed_size,
+ "total_compressed_size": total_compressed_size
+ }
+
+ # TODO: think about how to deal with error messages
+ if s3_error_msg is not None:
+ worker_output["error_message"] = s3_error_msg
return CompressionTaskStatus.FAILED, worker_output
if compression_successful:
return CompressionTaskStatus.SUCCEEDED, worker_output
@@ -346,7 +352,7 @@ def compress(
# Load configuration
try:
worker_config = WorkerConfig.parse_obj(
- read_yaml_config_file(pathlib.Path(os.getenv("WORKER_CONFIG_PATH")))
+ read_yaml_config_file(pathlib.Path(os.getenv("CLP_CONFIG_PATH")))
)
except Exception as ex:
error_msg = "Failed to load worker config"
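Taken together, the run_clp changes above switch the per-archive totals to plain counters, pass the archive id and source path to the upload helper explicitly, and only delete the local archive after a successful upload. A condensed sketch of that per-archive step (the real loop also parses clp's stats output and updates job metadata in the database; upload_single_file_archive_to_s3 is the helper defined earlier in this file):

    import pathlib
    from typing import Any, Dict, Optional


    def finalize_last_archive(
        last_archive_stats: Dict[str, Any],
        archive_output_dir: pathlib.Path,
        s3_config,                     # S3Config; only used when enable_s3_write is True
        enable_s3_write: bool,
        totals: Dict[str, int],
    ) -> Optional[str]:
        """Upload the finished archive (if S3 writes are enabled) and add its sizes
        to `totals`. Returns an error message on upload failure, else None."""
        archive_id = last_archive_stats["id"]
        src_archive_file = archive_output_dir / archive_id

        if enable_s3_write:
            # The helper now takes the archive id and source path explicitly
            # instead of the whole stats dict and output directory.
            result = upload_single_file_archive_to_s3(archive_id, src_archive_file, s3_config)
            if not result.success:
                # The caller records this as worker_output["error_message"] and fails the task.
                return result.error
            # The local copy is removed only after a successful upload.
            src_archive_file.unlink()

        totals["total_uncompressed_size"] += last_archive_stats["uncompressed_size"]
        totals["total_compressed_size"] += last_archive_stats["size"]
        return None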
(changed file; path not shown)
@@ -13,7 +13,7 @@
report_task_failure,
run_query_task,
)
- from job_orchestration.executor.utils import try_load_worker_config
+ from job_orchestration.executor.utils import load_worker_config
from job_orchestration.scheduler.job_config import ExtractIrJobConfig, ExtractJsonJobConfig
from job_orchestration.scheduler.scheduler_data import QueryTaskStatus

@@ -101,7 +101,8 @@ def extract_stream(
sql_adapter = SQL_Adapter(Database.parse_obj(clp_metadata_db_conn_params))

# Load configuration
- worker_config = try_load_worker_config(os.getenv("WORKER_CONFIG_PATH"), logger)
+ clp_config_path = Path(os.getenv("CLP_CONFIG_PATH"))
+ worker_config = load_worker_config(clp_config_path, logger)
if worker_config is None:
return report_task_failure(
sql_adapter=sql_adapter,
(changed file; path not shown)
@@ -13,7 +13,7 @@
report_task_failure,
run_query_task,
)
- from job_orchestration.executor.utils import try_load_worker_config
+ from job_orchestration.executor.utils import load_worker_config
from job_orchestration.scheduler.job_config import SearchJobConfig
from job_orchestration.scheduler.scheduler_data import QueryTaskStatus

@@ -120,7 +120,8 @@ def search(
sql_adapter = SQL_Adapter(Database.parse_obj(clp_metadata_db_conn_params))

# Load configuration
- worker_config = try_load_worker_config(os.getenv("WORKER_CONFIG_PATH"), logger)
+ clp_config_path = Path(os.getenv("CLP_CONFIG_PATH"))
+ worker_config = load_worker_config(clp_config_path, logger)
if worker_config is None:
return report_task_failure(
sql_adapter=sql_adapter,
10 changes: 3 additions & 7 deletions components/job-orchestration/job_orchestration/executor/utils.py
@@ -6,16 +6,12 @@
from clp_py_utils.core import read_yaml_config_file


- def try_load_worker_config(
- config_path_str: str,
+ def load_worker_config(
+ config_path: Path,
logger: Logger,
) -> Optional[WorkerConfig]:
- if config_path_str is None:
- logger.error("config_path_str can't be empty")
- return None

try:
- return WorkerConfig.parse_obj(read_yaml_config_file(Path(config_path_str)))
+ return WorkerConfig.parse_obj(read_yaml_config_file(config_path))
except Exception:
logger.exception("Failed to load worker config")
return None
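
With the None check gone, callers build the Path themselves, exactly as the extract_stream and search changes above do. A short usage sketch (logger setup assumed); note that an unset CLP_CONFIG_PATH now surfaces as a TypeError from Path() rather than a logged error:

    import logging
    import os
    from pathlib import Path

    from job_orchestration.executor.utils import load_worker_config

    logger = logging.getLogger(__name__)

    # Path(None) raises TypeError, so a missing CLP_CONFIG_PATH fails loudly here
    # instead of being caught by the old config_path_str check.
    clp_config_path = Path(os.getenv("CLP_CONFIG_PATH"))
    worker_config = load_worker_config(clp_config_path, logger)
    if worker_config is None:
        raise RuntimeError("Failed to load worker config")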
(changed file; path not shown)
@@ -59,7 +59,7 @@ def update_compression_task_metadata(db_cursor, task_id, kv):
SET {", ".join(field_set_expressions)}
WHERE id = %s
"""
- values = [v for v in kv.values()] + [task_id]
+ values = list(kv.values()) + [task_id]
db_cursor.execute(query, values)


@@ -74,7 +74,7 @@ def update_compression_job_metadata(db_cursor, job_id, kv):
SET {", ".join(field_set_expressions)}
WHERE id = %s
"""
- values = [v for v in kv.values()] + [job_id]
+ values = list(kv.values()) + [job_id]
db_cursor.execute(query, values)


