From e5f43fbf58ca5a42513287ce0988d0d318654a1d Mon Sep 17 00:00:00 2001
From: Haiqi Xu <14502009+haiqi96@users.noreply.github.com>
Date: Mon, 16 Dec 2024 17:04:39 -0500
Subject: [PATCH] Inline single-file archive S3 upload and simplify its error
 tracking

In s3_utils.py, group the third-party `result` import with the other
third-party imports. In fs_compression_task.py, inline
upload_single_file_archive_to_s3 at its only call site and replace the
s3_write_failed/s3_error_msg pair with a single s3_error variable that
stays None until an upload fails.
---
 .../clp-py-utils/clp_py_utils/s3_utils.py    |  2 +-
 .../executor/compress/fs_compression_task.py | 34 ++++++-------------
 2 files changed, 11 insertions(+), 25 deletions(-)

diff --git a/components/clp-py-utils/clp_py_utils/s3_utils.py b/components/clp-py-utils/clp_py_utils/s3_utils.py
index ed77d9a4a..cb98d24ab 100644
--- a/components/clp-py-utils/clp_py_utils/s3_utils.py
+++ b/components/clp-py-utils/clp_py_utils/s3_utils.py
@@ -3,9 +3,9 @@
 import boto3
 from botocore.config import Config
 from botocore.exceptions import ClientError
+from result import Err, Ok, Result
 
 from clp_py_utils.clp_config import S3Config
-from result import Ok, Err, Result
 
 
 def s3_put(
diff --git a/components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py b/components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py
index 0678d05ea..f2b4a2268 100644
--- a/components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py
+++ b/components/job-orchestration/job_orchestration/executor/compress/fs_compression_task.py
@@ -78,21 +78,6 @@ def update_job_metadata_and_tags(db_cursor, job_id, table_prefix, tag_ids, archi
     )
 
 
-def upload_single_file_archive_to_s3(
-    archive_id: str,
-    src_archive_file: pathlib.Path,
-    s3_config: S3Config,
-) -> Result[bool, str]:
-    logger.info(f"Uploading archive {archive_id} to S3...")
-    result = s3_put(s3_config, src_archive_file, archive_id)
-    if result.is_err():
-        logger.error(f"Failed to upload archive {archive_id}: {result.err_value}")
-        return result
-
-    logger.info(f"Finished uploading archive {archive_id} to S3.")
-    return result
-
-
 def make_clp_command(
     clp_home: pathlib.Path,
     archive_output_dir: pathlib.Path,
@@ -257,11 +242,8 @@ def run_clp(
     total_uncompressed_size = 0
     total_compressed_size = 0
 
-    # Variables to track s3 write status.
-    s3_error_msg = None
-    s3_write_failed = False
-
     # Handle job metadata update and s3 write if enabled
+    s3_error = None
     while not last_line_decoded:
         line = proc.stdout.readline()
         stats: Optional[Dict[str, Any]] = None
@@ -277,11 +259,15 @@ def run_clp(
                 archive_id = last_archive_stats["id"]
                 src_archive_file = archive_output_dir / archive_id
 
-                result = upload_single_file_archive_to_s3(archive_id, src_archive_file, s3_config)
+                logger.info(f"Uploading archive {archive_id} to S3...")
+                result = s3_put(s3_config, src_archive_file, archive_id)
+
                 if result.is_err():
-                    s3_write_failed = True
-                    s3_error_msg = result.err_value
+                    logger.error(f"Failed to upload archive {archive_id}: {result.err_value}")
+                    s3_error = result.err_value
                     break
+
+                logger.info(f"Finished uploading archive {archive_id} to S3.")
                 src_archive_file.unlink()
 
             # We've started a new archive so add the previous archive's last
@@ -324,8 +310,8 @@ def run_clp(
         }
 
     # TODO: think about how to deal with error messages
-    if s3_write_failed:
-        worker_output["error_message"] = s3_error_msg
+    if s3_error is not None:
+        worker_output["error_message"] = s3_error
         return CompressionTaskStatus.FAILED, worker_output
     if compression_successful:
         return CompressionTaskStatus.SUCCEEDED, worker_output
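
Editor's notes (sketches under stated assumptions; not part of the patch):

The s3_utils.py hunk regroups imports into the conventional isort order
(standard library, then third-party, then first-party). `result` is a
third-party package, so it moves above the blank line that opens the
first-party group:

    import boto3                                   # third-party
    from botocore.config import Config             # third-party
    from botocore.exceptions import ClientError    # third-party
    from result import Err, Ok, Result             # third-party

    from clp_py_utils.clp_config import S3Config   # first-party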
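
The compression task now calls s3_put() directly and relies on it
returning a Result from the `result` package instead of raising. Below
is a minimal illustrative sketch of such a helper, built on boto3; the
name s3_put_sketch and the explicit bucket/key parameters are
assumptions for demonstration only, since the real helper in
clp_py_utils/s3_utils.py takes an S3Config and may differ:

    import pathlib

    import boto3
    from botocore.exceptions import ClientError
    from result import Err, Ok, Result


    def s3_put_sketch(
        bucket: str, src_file: pathlib.Path, dest_key: str
    ) -> Result[bool, str]:
        # Upload src_file to s3://{bucket}/{dest_key}, reporting failure
        # as a value (Err) rather than an exception, so callers can
        # branch or `break` without a try/except.
        s3_client = boto3.client("s3")
        try:
            with open(src_file, "rb") as f:
                s3_client.put_object(Bucket=bucket, Key=dest_key, Body=f)
        except ClientError as e:
            return Err(str(e))
        return Ok(True)

Returning Err rather than raising is what lets the inlined call site
record the message in s3_error and break out of the stats loop cleanly.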
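
The error-tracking change in run_clp() collapses the boolean flag plus
message pair into one variable: s3_error stays None until an upload
fails, after which it holds the message, so `s3_error is not None`
doubles as the failure test. A self-contained sketch of the pattern,
using a hypothetical fake_upload() stand-in for the real s3_put() call:

    from typing import Optional

    from result import Err, Ok, Result


    def fake_upload(name: str) -> Result[bool, str]:
        # Hypothetical stand-in for the real upload call.
        return Err(f"upload of {name} failed") if name == "bad" else Ok(True)


    s3_error: Optional[str] = None
    for name in ["a", "bad", "c"]:
        result = fake_upload(name)
        if result.is_err():
            s3_error = result.err_value  # record the message and stop early
            break

    if s3_error is not None:
        print(f"error_message: {s3_error}")  # mirrors the worker_output path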