From b511541c17a4d36e148f32e474d929c6b38af9a8 Mon Sep 17 00:00:00 2001
From: Mike Cantelon
Date: Tue, 19 Mar 2024 12:54:52 -0700
Subject: [PATCH] Refactor fetch CLI tool logic (#243)

Refactor fetch CLI tool, sharing more functionality with the fetch Celery task.
---
 AIPscan/Aggregator/task_helpers.py            |  10 ++
 AIPscan/Aggregator/tasks.py                   | 118 ++++++++++--------
 AIPscan/Aggregator/tests/test_task_helpers.py |  20 +++
 AIPscan/Aggregator/tests/test_tasks.py        |  79 +++++++++++-
 tools/fetch_aips                              |  24 ++--
 tools/helpers/fetch.py                        |  49 +-------
 6 files changed, 187 insertions(+), 113 deletions(-)

diff --git a/AIPscan/Aggregator/task_helpers.py b/AIPscan/Aggregator/task_helpers.py
index f0a6db14..3f72bc23 100644
--- a/AIPscan/Aggregator/task_helpers.py
+++ b/AIPscan/Aggregator/task_helpers.py
@@ -158,3 +158,13 @@ def write_mets(http_response, package_uuid, subdir):
     with open(download_file, "wb") as file:
         file.write(http_response.content)
     return download_file
+
+
+def summarize_fetch_job_results(fetch_job):
+    return "aips: '{}'; sips: '{}'; dips: '{}'; deleted: '{}'; replicated: '{}'".format(
+        fetch_job.total_aips,
+        fetch_job.total_sips,
+        fetch_job.total_dips,
+        fetch_job.total_deleted_aips,
+        fetch_job.total_replicas,
+    )
diff --git a/AIPscan/Aggregator/tasks.py b/AIPscan/Aggregator/tasks.py
index ff46ada2..e3d6897b 100644
--- a/AIPscan/Aggregator/tasks.py
+++ b/AIPscan/Aggregator/tasks.py
@@ -19,6 +19,7 @@
     format_api_url_with_limit_offset,
     parse_package_list_file,
     process_package_object,
+    summarize_fetch_job_results,
 )
 from AIPscan.extensions import celery
 from AIPscan.helpers import file_sha256_hash
@@ -99,44 +100,6 @@ def start_mets_task(
     get_mets.apply(args=args)
 
 
-def parse_packages_and_load_mets(
-    package_list, timestamp, package_list_no, storage_service_id, fetch_job_id
-):
-    """Parse packages documents from the storage service and initiate
-    the load mets functions of AIPscan. Results are written to the
-    database.
-    """
-    OBJECTS = "objects"
-    packages = []
-
-    for package_obj in package_list.get(OBJECTS, []):
-        package = process_package_object(package_obj)
-
-        packages.append(package)
-        handle_deletion(package)
-
-        if not package.is_undeleted_aip():
-            continue
-
-        start_mets_task(
-            package.uuid,
-            package.size,
-            package.get_relative_path(),
-            package.current_location,
-            package.origin_pipeline,
-            timestamp,
-            package_list_no,
-            storage_service_id,
-            fetch_job_id,
-        )
-    return packages
-
-
-def handle_deletion(package):
-    if package.is_deleted():
-        delete_aip(package.uuid)
-
-
 def delete_aip(uuid):
     logger.warning("Package deleted from SS: '%s'", uuid)
 
@@ -180,11 +143,12 @@ def workflow_coordinator(
         )
         # Process packages and create a new worker to download and parse
         # each METS separately.
-        package_list = parse_package_list_file(json_file_path, logger, True)
+        packages = parse_package_list_file(json_file_path, logger, True)
 
-        packages = parse_packages_and_load_mets(
-            package_list, timestamp, package_list_no, storage_service_id, fetch_job_id
+        packages = process_packages(
+            packages, storage_service_id, timestamp, package_list_no, fetch_job_id, True
         )
+
         all_packages = all_packages + packages
 
         total_packages_count = package_lists_task.info["totalPackages"]
@@ -193,14 +157,7 @@
         fetch_job_id, all_packages, total_packages_count
     )
 
-    summary = "aips: '{}'; sips: '{}'; dips: '{}'; deleted: '{}'; replicated: '{}'".format(
-        obj.total_aips,
-        obj.total_sips,
-        obj.total_dips,
-        obj.total_deleted_aips,
-        obj.total_replicas,
-    )
-    logger.info("%s", summary)
+    logger.info("%s", summarize_fetch_job_results(obj))
 
 
 def make_request(request_url, request_url_without_api_key):
@@ -410,3 +367,66 @@ def delete_storage_service(storage_service_id):
 
     db.session.delete(storage_service)
     db.session.commit()
+
+
+def handle_deletion(package):
+    if package.is_deleted():
+        delete_aip(package.uuid)
+
+
+def process_packages(
+    packages,
+    storage_service_id,
+    timestamp_str,
+    package_list_no,
+    fetch_job_id,
+    run_as_task=False,
+    logger=None,
+    start_item=None,
+    end_item=None,
+):
+    """Parse packages documents from the storage service and initiate
+    the load mets functions of AIPscan. Results are written to the
+    database.
+    """
+    processed_packages = []
+
+    package_count = 0
+    for package_obj in packages.get("objects", []):
+        package_count += 1
+
+        package = process_package_object(package_obj)
+
+        # Only process packages within paging window, if specified
+        if start_item is None or (
+            package_count >= start_item and package_count <= end_item
+        ):
+            # Calculate current item being processed
+            if start_item is not None:
+                current_item = start_item + len(processed_packages)
+
+                if logger:
+                    logger.info(
+                        f"Processing {package.uuid} ({current_item} of {end_item})"
+                    )
+
+            processed_packages.append(package)
+            handle_deletion(package)
+
+            if not package.is_undeleted_aip():
+                continue
+
+            start_mets_task(
+                package.uuid,
+                package.size,
+                package.get_relative_path(),
+                package.current_location,
+                package.origin_pipeline,
+                timestamp_str,
+                package_list_no,
+                storage_service_id,
+                fetch_job_id,
+                run_as_task,
+            )
+
+    return processed_packages
diff --git a/AIPscan/Aggregator/tests/test_task_helpers.py b/AIPscan/Aggregator/tests/test_task_helpers.py
index f85fe864..fe0ed781 100644
--- a/AIPscan/Aggregator/tests/test_task_helpers.py
+++ b/AIPscan/Aggregator/tests/test_task_helpers.py
@@ -305,3 +305,23 @@ def test_process_package_object(packages, idx, storage_service_package):
     """
     package_obj = task_helpers.process_package_object(packages[idx])
     assert package_obj == storage_service_package, idx
+
+
+def test_summarize_fetch_job_results():
+    fetch_job = models.FetchJob(
+        total_packages=15,
+        total_aips=1,
+        total_deleted_aips=4,
+        download_start=None,
+        download_end=None,
+        download_directory=None,
+        storage_service_id=None,
+    )
+    fetch_job.total_sips = 2
+    fetch_job.total_dips = 3
+    fetch_job.total_replicas = 5
+
+    assert (
+        "aips: '1'; sips: '2'; dips: '3'; deleted: '4'; replicated: '5'"
+        == task_helpers.summarize_fetch_job_results(fetch_job)
+    )
diff --git a/AIPscan/Aggregator/tests/test_tasks.py b/AIPscan/Aggregator/tests/test_tasks.py
index b737aeba..6afd6b76 100644
--- a/AIPscan/Aggregator/tests/test_tasks.py
+++ b/AIPscan/Aggregator/tests/test_tasks.py
@@ -14,9 +14,10 @@
     delete_fetch_job,
     delete_storage_service,
     get_mets,
+    handle_deletion,
     make_request,
     parse_package_list_file,
-    parse_packages_and_load_mets,
+    process_packages,
 )
 from AIPscan.Aggregator.tests import (
     INVALID_JSON,
@@ -26,6 +27,7 @@
     VALID_JSON,
     MockResponse,
 )
+from AIPscan.Aggregator.types import StorageServicePackage
 from AIPscan.models import AIP, Agent, FetchJob, StorageService
 
 SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
@@ -248,7 +250,7 @@ def test_parse_package_list_file(tmpdir):
     assert len(package_list) == 0
 
 
-def test_parse_packages_and_load_mets(app_instance, tmpdir, mocker):
+def test_process_packages_json_file_deletion(app_instance, tmpdir, mocker):
     """Test that JSON package lists are deleted after being parsed."""
     json_file_path = tmpdir.join("packages.json")
     json_file_path.write(json.dumps({"objects": []}))
@@ -257,21 +259,92 @@
 
     package_list = parse_package_list_file(json_file_path, None, True)
 
-    parse_packages_and_load_mets(package_list, str(datetime.now()), 1, 1, 1)
+    processed_packages = process_packages(
+        package_list, 1, str(datetime.now()), 1, 1, True
+    )
 
     delete_package_json.assert_called_with(json_file_path)
 
 
+def test_process_packages(app_instance, tmpdir, mocker):
+    """Test processing of a package list, including paging and logging."""
+    package_uuid = str(uuid.uuid4())
+    package_data = {
+        "uuid": package_uuid,
+        "package_type": "AIP",
+        "current_path": str(tmpdir),
+    }
+
+    # Write test AIP to JSON file (from which to generate a list of packages)
+    json_file_path = tmpdir.join("packages.json")
+    json_file_path.write(json.dumps({"objects": [package_data]}))
+
+    # Get test package list
+    package_list = parse_package_list_file(json_file_path, None, True)
+
+    # Process test package list
+    mocker.patch(
+        "AIPscan.Aggregator.database_helpers.create_or_update_storage_location"
+    )
+    mocker.patch("AIPscan.Aggregator.database_helpers.create_or_update_pipeline")
+
+    # Set up custom logger and add handler to capture output
+    customlogger = logging.getLogger(__name__)
+    customlogger.setLevel(logging.DEBUG)
+
+    log_string = StringIO()
+    handler = logging.StreamHandler(log_string)
+    customlogger.addHandler(handler)
+
+
+    processed_packages = process_packages(
+        package_list, 1, str(datetime.now()), 1, 1, False, customlogger, 0, 1
+    )
+
+    # Test that custom logger was used
+    assert (
+        log_string.getvalue()
+        == f"Processing {package_uuid} (0 of 1)\n"
+    )
+
+    # Make sure only one package was processed and that it was an AIP
+    assert len(processed_packages) == 1
+    assert processed_packages[0].aip is True
+
+
+def test_handle_deletion(app_instance, mocker):
+    """Test that delete handler handles deletion correctly."""
+    PACKAGE_UUID = str(uuid.uuid4())
+
+    # Make sure package deleted on storage service gets deleted locally
+    package = StorageServicePackage(uuid=PACKAGE_UUID, deleted=True)
+    mock_delete_aip = mocker.patch("AIPscan.Aggregator.tasks.delete_aip")
+
+    handle_deletion(package)
+
+    mock_delete_aip.assert_called_with(PACKAGE_UUID)
+
+    # Make sure package not deleted on storage service doesn't get deleted
+    package = StorageServicePackage(uuid=PACKAGE_UUID, deleted=False)
+    mock_delete_aip = mocker.patch("AIPscan.Aggregator.tasks.delete_aip")
+
+    handle_deletion(package)
+
+    mock_delete_aip.assert_not_called()
+
+
 def test_delete_aip(app_instance):
     """Test that SS deleted AIPs gets deleted in aipscan.db."""
     PACKAGE_UUID = str(uuid.uuid4())
     test_helpers.create_test_aip(uuid=PACKAGE_UUID)
 
+    # Make sure test AIP exists before deletion
     deleted_aip = AIP.query.filter_by(uuid=PACKAGE_UUID).first()
     assert deleted_aip is not None
 
     delete_aip(PACKAGE_UUID)
 
+    # Make sure test AIP doesn't exist after deletion
     deleted_aip = AIP.query.filter_by(uuid=PACKAGE_UUID).first()
     assert deleted_aip is None
 
diff --git a/tools/fetch_aips b/tools/fetch_aips
index cf18299b..3bd8f1b2 100755
--- a/tools/fetch_aips
+++ b/tools/fetch_aips
@@ -11,7 +11,11 @@
 from helpers import fetch
 
 from AIPscan import db
 from AIPscan.Aggregator import database_helpers
-from AIPscan.Aggregator.task_helpers import create_numbered_subdirs
+from AIPscan.Aggregator.task_helpers import (
+    create_numbered_subdirs,
+    process_packages,
+    summarize_fetch_job_results,
+)
 from AIPscan.models import StorageService
 from config import CONFIGS
@@ -126,33 +130,27 @@ def fetch_aips(logger, ss_id, session_id, page, packages_per_page, logfile):
     )
 
     # Import packages
-    processed_packages = fetch.import_packages(
+    processed_packages = process_packages(
         packages,
-        start_item,
-        end_item,
         ss_id,
         session_id,
         package_list_no,
         fetch_job_id,
-        packages_per_page,
+        False,
         logger,
+        start_item,
+        end_item,
     )
 
     fetch_job = database_helpers.update_fetch_job(
         fetch_job_id, processed_packages, total_packages
     )
 
-    summary = "aips: '{}'; sips: '{}'; dips: '{}'; deleted: '{}'; replicated: '{}'".format(
-        fetch_job.total_aips,
-        fetch_job.total_sips,
-        fetch_job.total_dips,
-        fetch_job.total_deleted_aips,
-        fetch_job.total_replicas,
-    )
-    logger.info("%s", summary)
     logger.info(
         f"Updated fetch job record {fetch_job_id} with package type counts."
     )
+
+    logger.info("%s", summarize_fetch_job_results(fetch_job))
 
     logger.info("Processing complete.")
 
diff --git a/tools/helpers/fetch.py b/tools/helpers/fetch.py
index 8cd11dbb..9dd8fb0f 100644
--- a/tools/helpers/fetch.py
+++ b/tools/helpers/fetch.py
@@ -5,9 +5,8 @@
     format_api_url_with_limit_offset,
     get_packages_directory,
     parse_package_list_file,
-    process_package_object,
 )
-from AIPscan.Aggregator.tasks import handle_deletion, make_request, start_mets_task
+from AIPscan.Aggregator.tasks import make_request
 
 
 def determine_start_and_end_item(page, packages_per_page, total_packages):
@@ -61,49 +60,3 @@ def get_packages(storage_service, packages_dir):
     packages = fetch_and_write_packages(storage_service, package_filepath)
 
     return packages
-
-
-def import_packages(
-    packages,
-    start_item,
-    end_item,
-    storage_service_id,
-    timestamp_str,
-    package_list_no,
-    fetch_job_id,
-    packages_per_page,
-    logger,
-):
-    processed_packages = []
-
-    package_count = 0
-    for package_obj in packages["objects"]:
-        package_count += 1
-
-        package = process_package_object(package_obj)
-
-        if package_count >= start_item and package_count <= end_item:
-            # Calculate current item being processed
-            current_item = start_item + len(processed_packages)
-            logger.info(f"Processing {package.uuid} ({current_item} of {end_item})")
-
-            processed_packages.append(package)
-            handle_deletion(package)
-
-            if not package.is_undeleted_aip():
-                continue
-
-            start_mets_task(
-                package.uuid,
-                package.size,
-                package.get_relative_path(),
-                package.current_location,
-                package.origin_pipeline,
-                timestamp_str,
-                package_list_no,
-                storage_service_id,
-                fetch_job_id,
-                False,
-            )
-
-    return processed_packages
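
Editor's note (illustrative, not part of the patch): after this change the Celery
task and the fetch CLI tool both call the same helper,
AIPscan.Aggregator.tasks.process_packages(). A minimal sketch of the two call
styles, using an empty parsed package list and placeholder IDs (storage service 1,
fetch job 1) purely for illustration:

    import logging
    from datetime import datetime

    from AIPscan.Aggregator.tasks import process_packages

    # A parsed package-list document as returned by parse_package_list_file();
    # empty here so the sketch runs without touching the database.
    packages = {"objects": []}

    # Celery-style call: run_as_task=True, no paging, no custom logger.
    process_packages(packages, 1, str(datetime.now()), 1, 1, True)

    # CLI-style call (tools/fetch_aips): synchronous, logs progress, and only
    # processes items 1-20 of the list (start_item/end_item are placeholders).
    process_packages(
        packages, 1, str(datetime.now()), 1, 1, False,
        logging.getLogger(__name__), 1, 20
    )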