Skip to content

Commit

Permalink
fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
haiqi96 committed Dec 16, 2024
1 parent 4819f76 commit e5f43fb
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 25 deletions.
2 changes: 1 addition & 1 deletion components/clp-py-utils/clp_py_utils/s3_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
import boto3
from botocore.config import Config
from botocore.exceptions import ClientError
from result import Err, Ok, Result

from clp_py_utils.clp_config import S3Config
from result import Ok, Err, Result


def s3_put(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,21 +78,6 @@ def update_job_metadata_and_tags(db_cursor, job_id, table_prefix, tag_ids, archi
)


def upload_single_file_archive_to_s3(
archive_id: str,
src_archive_file: pathlib.Path,
s3_config: S3Config,
) -> Result[bool, str]:
logger.info(f"Uploading archive {archive_id} to S3...")
result = s3_put(s3_config, src_archive_file, archive_id)
if result.is_err():
logger.error(f"Failed to upload archive {archive_id}: {result.err_value}")
return result

logger.info(f"Finished uploading archive {archive_id} to S3.")
return result


def make_clp_command(
clp_home: pathlib.Path,
archive_output_dir: pathlib.Path,
Expand Down Expand Up @@ -257,11 +242,8 @@ def run_clp(
total_uncompressed_size = 0
total_compressed_size = 0

# Variables to track s3 write status.
s3_error_msg = None
s3_write_failed = False

# Handle job metadata update and s3 write if enabled
s3_error = None
while not last_line_decoded:
line = proc.stdout.readline()
stats: Optional[Dict[str, Any]] = None
Expand All @@ -277,11 +259,15 @@ def run_clp(
archive_id = last_archive_stats["id"]
src_archive_file = archive_output_dir / archive_id

result = upload_single_file_archive_to_s3(archive_id, src_archive_file, s3_config)
logger.info(f"Uploading archive {archive_id} to S3...")
result = s3_put(s3_config, src_archive_file, archive_id)

if result.is_err():
s3_write_failed = True
s3_error_msg = result.err_value
logger.error(f"Failed to upload archive {archive_id}: {result.err_value}")
s3_error = result.err_value
break

logger.info(f"Finished uploading archive {archive_id} to S3.")
src_archive_file.unlink()

# We've started a new archive so add the previous archive's last
Expand Down Expand Up @@ -324,8 +310,8 @@ def run_clp(
}

# TODO: think about how to deal with error messages
if s3_write_failed:
worker_output["error_message"] = s3_error_msg
if s3_error is not None:
worker_output["error_message"] = s3_error
return CompressionTaskStatus.FAILED, worker_output
if compression_successful:
return CompressionTaskStatus.SUCCEEDED, worker_output
Expand Down

0 comments on commit e5f43fb

Please sign in to comment.