Skip to content

Commit

Permalink
failed-jobs/tar-inputs-for-debugging (#63)
Browse files Browse the repository at this point in the history
* added testcov

* tar inputs for debugging if job fails in caldp
  • Loading branch information
alphasentaurii authored Apr 19, 2021
1 parent d5cda43 commit 23dcfef
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 22 deletions.
4 changes: 2 additions & 2 deletions caldp/create_previews.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,8 @@ def main(ipppssoot, input_uri_prefix, output_uri_prefix):
preview_output = process.get_output_path("file:outputs", ipppssoot) + "/previews"
os.makedirs(preview_output, exist_ok=True)
copy_previews(previews, preview_output)
log.info("Uploading previews...")
file_ops.tar_outputs(ipppssoot, output_uri_prefix)
log.info("Preparing files for s3 upload...")
file_ops.tar_outputs(ipppssoot, input_uri_prefix, output_uri_prefix)
elif output_uri_prefix.startswith("file"):
preview_output = process.get_output_path(output_uri_prefix, ipppssoot) + "/previews"
os.makedirs(preview_output, exist_ok=True)
Expand Down
42 changes: 35 additions & 7 deletions caldp/file_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def get_input_path(input_uri, ipppssoot, make=False):
return input_path


def append_trailer(input_path, output_path, ipppssoot): # pragma: no cover
def append_trailer(output_path, ipppssoot): # pragma: no cover
"""Fetch process log and append to trailer file
Note: copies trailer file from inputs directory
and copies to outputs directory prior to appending log
Expand Down Expand Up @@ -58,13 +58,32 @@ def get_output_dir(output_uri):
return output_dir


def find_files(ipppssoot):
def get_input_dir(input_uri):
    """Resolve the local directory holding input files for *input_uri*.

    ``file:<path>`` URIs resolve to the embedded ``<path>``; any other
    scheme (s3, astroquery, ...) resolves to ``<cwd>/inputs``.
    """
    is_local = input_uri.startswith("file")
    return input_uri.split(":")[-1] if is_local else os.path.join(os.getcwd(), "inputs")


def find_output_files(ipppssoot):
    """Return the .fits and .tra products found under ``<ipppssoot>/``.

    Globs are relative to the current working directory; .fits matches are
    listed before .tra matches, as callers expect.
    """
    found = []
    for pattern in (f"{ipppssoot}/*.fits", f"{ipppssoot}/*.tra"):
        found.extend(glob.glob(pattern))
    return found


def find_previews(ipppssoot, output_files):
    """Append any preview files (``<ipppssoot>/previews/*``) to *output_files*.

    Mutates and returns the same list. The span in the scraped diff contained
    stale removed lines referencing undefined ``search_fits``/``search_tra``/
    ``file_list``; this is the post-commit version with only the previews glob.
    """
    search_prev = f"{ipppssoot}/previews/*"
    output_files.extend(list(glob.glob(search_prev)))
    return output_files


def find_input_files(ipppssoot):
    """If job fails (no outputs), tar the input files instead for debugging purposes.

    Globs everything directly under ``<ipppssoot>/`` relative to the
    current working directory.
    """
    return list(glob.glob(f"{ipppssoot}/*"))


Expand All @@ -75,6 +94,7 @@ def make_tar(file_list, ipppssoot):
os.remove(tar) # clean up from prev attempts
with tarfile.open(tar, "x:gz") as t:
for f in file_list:
print(os.path.basename(f))
t.add(f)
log.info("Tar successful: ", tar)
tar_dest = os.path.join(ipppssoot, tar)
Expand Down Expand Up @@ -128,12 +148,20 @@ def clean_up(file_list, ipppssoot, dirs=None):
print("Done.")


def tar_outputs(ipppssoot, output_uri):
def tar_outputs(ipppssoot, input_uri, output_uri):
working_dir = os.getcwd()
output_path = process.get_output_path(output_uri, ipppssoot)
output_dir = get_output_dir(output_uri)
os.chdir(output_dir) # create tarfile with ipst/*fits (ipst is parent dir)
file_list = find_files(ipppssoot)
output_files = find_output_files(ipppssoot)
if len(output_files) == 0:
log.info("No output files found. Tarring inputs for debugging.")
os.chdir(working_dir)
input_dir = get_input_dir(input_uri)
os.chdir(input_dir)
file_list = find_input_files(ipppssoot)
else:
file_list = find_previews(ipppssoot, output_files)
tar = make_tar(file_list, ipppssoot)
upload_tar(tar, output_path)
clean_up(file_list, ipppssoot, dirs=["previews", "env"])
Expand Down
81 changes: 68 additions & 13 deletions caldp/tests/test_all.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,10 +274,6 @@
""",
),
]
# 54720 j8f54obeq_spt.fits
# 54720 outputs/j8f54obeq/j8f54obeq_spt.fits
# 100800 ibc604b9q_spt.fits
# 100800 outputs/ibc604b9q/ibc604b9q_spt.fits

TARFILES = [
("j8cb010b0", "32586581 j8cb010b0.tar.gz"),
Expand Down Expand Up @@ -327,17 +323,28 @@
),
]

FAIL_OUTPUTS = [
(
"j8f54obeq",
"""
6445230 j8f54obeq.tar.gz
1136 preview.txt
113 preview_metrics.txt
4557 process.txt
112 process_metrics.txt
""",
),
]

SHORT_TEST_IPPPSSOOTS = [result[0] for result in RESULTS][:1]
LONG_TEST_IPPPSSOOTS = [result[0] for result in RESULTS][:-1] # [1:]
ENV_TEST_IPPPSSOOTS = [result[0] for result in RESULTS][-1:]

# LONG_TEST_IPPPSSOOTS += SHORT_TEST_IPPPSSOOTS # Include all for creating test cases.


# Leave S3 config undefined to skip S3 tests
CALDP_S3_TEST_OUTPUTS = os.environ.get("CALDP_S3_TEST_OUTPUTS")  # e.g. s3://caldp-output-test/pytest/outputs
CALDP_S3_TEST_INPUTS = os.environ.get("CALDP_S3_TEST_INPUTS")  # e.g. s3://caldp-output-test/inputs

# Output sizes must be within +- this fraction of truth value
CALDP_TEST_FILE_SIZE_THRESHOLD = float(os.environ.get("CALDP_TEST_FILE_SIZE_THRESHOLD", 0.4))
Expand Down Expand Up @@ -399,13 +406,15 @@ def coretst(temp_dir, ipppssoot, input_uri, output_uri):
check_messages(ipppssoot, output_uri, status="processed.trigger")
# tests whether file_ops gracefully handles an exception type
file_ops.clean_up([], ipppssoot, dirs=["dummy_dir"])

if input_uri.startswith("file"): # create tarfile if s3 access unavailable
actual_tarfiles = check_tarball_out(ipppssoot, input_uri, output_uri)
check_tarfiles(TARFILES, actual_tarfiles, ipppssoot, output_uri)
check_pathfinder(ipppssoot)
message_status_check(input_uri, output_uri, ipppssoot)

os.remove(tarball)
# check output tarball for failed jobs
check_failed_job_tarball(ipppssoot, input_uri, output_uri)
check_messages_cleanup(ipppssoot)
if input_uri.startswith("astroquery"):
check_IO_clean_up(ipppssoot)
Expand Down Expand Up @@ -451,7 +460,7 @@ def check_tarball_out(ipppssoot, input_uri, output_uri):
in and of itself is what really needs to be tested...
meaning it should be caldp, not in the test
"""
tar, file_list = file_ops.tar_outputs(ipppssoot, output_uri)
tar, file_list = file_ops.tar_outputs(ipppssoot, input_uri, output_uri)
assert len(file_list) > 0
tarpath = os.path.join("outputs", tar)
assert os.path.exists(tarpath)
Expand All @@ -463,6 +472,41 @@ def check_tarball_out(ipppssoot, input_uri, output_uri):
return actual_tarfiles


def check_failed_job_tarball(ipppssoot, input_uri, output_uri):
    """Verify that a failed job tars its *input* files for debugging.

    When caldp fails during processing, the .fits and .tra files never reach
    the /outputs folder, but the (partially) processed input files remain in
    /inputs — ``tar_outputs`` is expected to fall back to tarring those.

    NOTE(review): the guard below runs this only for j8f54obeq with
    astroquery inputs; an earlier note referenced iacs01t4q — confirm which
    dataset is the intended test case.
    """
    if ipppssoot == "j8f54obeq" and input_uri.startswith("astroquery"):
        working_dir = os.getcwd()
        # FAIL_OUTPUTS maps ipppssoot -> expected "size name" listing text
        fail_outputs = dict(FAIL_OUTPUTS)
        expected = {}
        for (name, size) in parse_results(fail_outputs[ipppssoot]):
            expected[name] = size
        # manually search and delete output files so it's forced to use the inputs
        output_dir = file_ops.get_output_dir(output_uri)
        os.chdir(output_dir)
        output_files = file_ops.find_output_files(ipppssoot)
        # assert len(output_files) == 6
        if len(output_files) > 0:
            print("Removing outputs for failed job test:")
            for f in output_files:
                print(f)
                os.remove(f)
        # with the outputs gone, the no-outputs fallback in tar_outputs must trigger
        empty_outputs = file_ops.find_output_files(ipppssoot)
        print("Files remaining in outputs dir: ", len(empty_outputs))
        assert len(empty_outputs) == 0
        os.chdir(working_dir)
        tar, file_list = file_ops.tar_outputs(ipppssoot, input_uri, output_uri)
        assert len(file_list) == 7  # expected input-file count for this dataset — TODO confirm
        assert os.path.exists(os.path.join("inputs", tar))  # tar was created in the inputs dir
        actual = list_inputs(ipppssoot, input_uri)
        log_path = os.path.join("outputs", ipppssoot, "logs")
        assert os.path.exists(log_path)
        actual.update(list_logs(log_path))
        check_outputs(output_uri, expected, actual)


def check_messages_cleanup(ipppssoot):
# logs/ipppssoot just ensures test coverage in messages.clean_up
dirs = ["messages", f"logs/{ipppssoot}"]
Expand All @@ -487,12 +531,15 @@ def check_messages_cleanup(ipppssoot):


def check_IO_clean_up(ipppssoot):
    """Test cleanup using Astroquery inputs and local outputs.

    NOTE: cleanup of inputs would normally only occur if using s3.
    Asserts that both the top-level inputs/outputs directories and the
    per-ipppssoot subdirectories are gone after ``messages.clean_up`` runs.
    (The scraped diff interleaved the removed ``path_outputs``/``path_inputs``
    lines with the added asserts; this is the post-commit version.)
    """
    messages.clean_up(ipppssoot, IO="outputs")
    assert not os.path.isdir(os.path.join(os.getcwd(), "outputs"))
    assert not os.path.isdir(os.path.join(os.getcwd(), "outputs", ipppssoot))
    messages.clean_up(ipppssoot, IO="inputs")
    assert not os.path.isdir(os.path.join(os.getcwd(), "inputs"))
    assert not os.path.isdir(os.path.join(os.getcwd(), "inputs", ipppssoot))


def list_files(startpath, ipppssoot):
Expand All @@ -504,6 +551,14 @@ def list_files(startpath, ipppssoot):
return file_dict


def list_logs(logpath):
    """Walk *logpath* and map each log filename to its size in bytes.

    Within each directory, files are visited smallest-first so the resulting
    dict insertion order matches the size-sorted listing.
    """
    sizes = {}
    for root, _, files in os.walk(logpath):
        def size_of(name):
            return os.path.getsize(root + os.sep + name)
        for name in sorted(files, key=size_of):
            sizes[name] = size_of(name)
    return sizes


def list_objects(path):
object_dict = {}
output = pipe(f"aws s3 ls --recursive {path}")
Expand Down

0 comments on commit 23dcfef

Please sign in to comment.