Skip to content

Commit

Permalink
Refactor fetch CLI tool logic (#243)
Browse files Browse the repository at this point in the history
Refactor fetch CLI tool, sharing more functionality with the fetch
Celery task.
  • Loading branch information
mcantelon committed Mar 20, 2024
1 parent f3313fd commit b511541
Show file tree
Hide file tree
Showing 6 changed files with 187 additions and 113 deletions.
10 changes: 10 additions & 0 deletions AIPscan/Aggregator/task_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,3 +158,13 @@ def write_mets(http_response, package_uuid, subdir):
with open(download_file, "wb") as file:
file.write(http_response.content)
return download_file


def summarize_fetch_job_results(fetch_job):
return "aips: '{}'; sips: '{}'; dips: '{}'; deleted: '{}'; replicated: '{}'".format(
fetch_job.total_aips,
fetch_job.total_sips,
fetch_job.total_dips,
fetch_job.total_deleted_aips,
fetch_job.total_replicas,
)
118 changes: 69 additions & 49 deletions AIPscan/Aggregator/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
format_api_url_with_limit_offset,
parse_package_list_file,
process_package_object,
summarize_fetch_job_results,
)
from AIPscan.extensions import celery
from AIPscan.helpers import file_sha256_hash
Expand Down Expand Up @@ -99,44 +100,6 @@ def start_mets_task(
get_mets.apply(args=args)


def parse_packages_and_load_mets(
package_list, timestamp, package_list_no, storage_service_id, fetch_job_id
):
"""Parse packages documents from the storage service and initiate
the load mets functions of AIPscan. Results are written to the
database.
"""
OBJECTS = "objects"
packages = []

for package_obj in package_list.get(OBJECTS, []):
package = process_package_object(package_obj)

packages.append(package)
handle_deletion(package)

if not package.is_undeleted_aip():
continue

start_mets_task(
package.uuid,
package.size,
package.get_relative_path(),
package.current_location,
package.origin_pipeline,
timestamp,
package_list_no,
storage_service_id,
fetch_job_id,
)
return packages


def handle_deletion(package):
if package.is_deleted():
delete_aip(package.uuid)


def delete_aip(uuid):
logger.warning("Package deleted from SS: '%s'", uuid)

Expand Down Expand Up @@ -180,11 +143,12 @@ def workflow_coordinator(
)
# Process packages and create a new worker to download and parse
# each METS separately.
package_list = parse_package_list_file(json_file_path, logger, True)
packages = parse_package_list_file(json_file_path, logger, True)

Check warning on line 146 in AIPscan/Aggregator/tasks.py

View check run for this annotation

Codecov / codecov/patch

AIPscan/Aggregator/tasks.py#L146

Added line #L146 was not covered by tests

packages = parse_packages_and_load_mets(
package_list, timestamp, package_list_no, storage_service_id, fetch_job_id
packages = process_packages(

Check warning on line 148 in AIPscan/Aggregator/tasks.py

View check run for this annotation

Codecov / codecov/patch

AIPscan/Aggregator/tasks.py#L148

Added line #L148 was not covered by tests
packages, storage_service_id, timestamp, package_list_no, fetch_job_id, True
)

all_packages = all_packages + packages

total_packages_count = package_lists_task.info["totalPackages"]
Expand All @@ -193,14 +157,7 @@ def workflow_coordinator(
fetch_job_id, all_packages, total_packages_count
)

summary = "aips: '{}'; sips: '{}'; dips: '{}'; deleted: '{}'; replicated: '{}'".format(
obj.total_aips,
obj.total_sips,
obj.total_dips,
obj.total_deleted_aips,
obj.total_replicas,
)
logger.info("%s", summary)
logger.info("%s", summarize_fetch_job_results(obj))

Check warning on line 160 in AIPscan/Aggregator/tasks.py

View check run for this annotation

Codecov / codecov/patch

AIPscan/Aggregator/tasks.py#L160

Added line #L160 was not covered by tests


def make_request(request_url, request_url_without_api_key):
Expand Down Expand Up @@ -410,3 +367,66 @@ def delete_storage_service(storage_service_id):

db.session.delete(storage_service)
db.session.commit()


def handle_deletion(package):
if package.is_deleted():
delete_aip(package.uuid)


def process_packages(
packages,
storage_service_id,
timestamp_str,
package_list_no,
fetch_job_id,
run_as_task=False,
logger=None,
start_item=None,
end_item=None,
):
"""Parse packages documents from the storage service and initiate
the load mets functions of AIPscan. Results are written to the
database.
"""
processed_packages = []

package_count = 0
for package_obj in packages.get("objects", []):
package_count += 1

package = process_package_object(package_obj)

# Only process packages within paging window, if specified
if start_item is None or (
package_count >= start_item and package_count <= end_item
):
# Calculate current item being processed
if start_item is not None:
current_item = start_item + len(processed_packages)

if logger:
logger.info(
f"Processing {package.uuid} ({current_item} of {end_item})"
)

processed_packages.append(package)
handle_deletion(package)

if not package.is_undeleted_aip():
continue

Check warning on line 417 in AIPscan/Aggregator/tasks.py

View check run for this annotation

Codecov / codecov/patch

AIPscan/Aggregator/tasks.py#L417

Added line #L417 was not covered by tests

start_mets_task(
package.uuid,
package.size,
package.get_relative_path(),
package.current_location,
package.origin_pipeline,
timestamp_str,
package_list_no,
storage_service_id,
fetch_job_id,
run_as_task,
)

return processed_packages
20 changes: 20 additions & 0 deletions AIPscan/Aggregator/tests/test_task_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,3 +305,23 @@ def test_process_package_object(packages, idx, storage_service_package):
"""
package_obj = task_helpers.process_package_object(packages[idx])
assert package_obj == storage_service_package, idx


def test_summarize_fetch_job_results():
fetch_job = models.FetchJob(
total_packages=15,
total_aips=1,
total_deleted_aips=4,
download_start=None,
download_end=None,
download_directory=None,
storage_service_id=None,
)
fetch_job.total_sips = 2
fetch_job.total_dips = 3
fetch_job.total_replicas = 5

assert (
"aips: '1'; sips: '2'; dips: '3'; deleted: '4'; replicated: '5'"
== task_helpers.summarize_fetch_job_results(fetch_job)
)
79 changes: 76 additions & 3 deletions AIPscan/Aggregator/tests/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@
delete_fetch_job,
delete_storage_service,
get_mets,
handle_deletion,
make_request,
parse_package_list_file,
parse_packages_and_load_mets,
process_packages,
)
from AIPscan.Aggregator.tests import (
INVALID_JSON,
Expand All @@ -26,6 +27,7 @@
VALID_JSON,
MockResponse,
)
from AIPscan.Aggregator.types import StorageServicePackage
from AIPscan.models import AIP, Agent, FetchJob, StorageService

SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
Expand Down Expand Up @@ -248,7 +250,7 @@ def test_parse_package_list_file(tmpdir):
assert len(package_list) == 0


def test_parse_packages_and_load_mets(app_instance, tmpdir, mocker):
def test_process_packages_json_file_deletion(app_instance, tmpdir, mocker):
"""Test that JSON package lists are deleted after being parsed."""
json_file_path = tmpdir.join("packages.json")
json_file_path.write(json.dumps({"objects": []}))
Expand All @@ -257,21 +259,92 @@ def test_parse_packages_and_load_mets(app_instance, tmpdir, mocker):

package_list = parse_package_list_file(json_file_path, None, True)

parse_packages_and_load_mets(package_list, str(datetime.now()), 1, 1, 1)
processed_packages = process_packages(
package_list, 1, str(datetime.now()), 1, 1, True
)

delete_package_json.assert_called_with(json_file_path)


def test_process_packages(app_instance, tmpdir, mocker):
"""Test that JSON package lists are deleted after being parsed."""
package_uuid = str(uuid.uuid4())
package_data = {
"uuid": package_uuid,
"package_type": "AIP",
"current_path": str(tmpdir),
}

# Write test AIP to JSON file (from which to general a list of packages)
json_file_path = tmpdir.join("packages.json")
json_file_path.write(json.dumps({"objects": [package_data]}))

# Get test package list
package_list = parse_package_list_file(json_file_path, None, True)

# Process test package list
mocker.patch(
"AIPscan.Aggregator.database_helpers.create_or_update_storage_location"
)
mocker.patch("AIPscan.Aggregator.database_helpers.create_or_update_pipeline")

# Set up custom logger and add handler to capture output
customlogger = logging.getLogger(__name__)
customlogger.setLevel(logging.DEBUG)

log_string = StringIO()
handler = logging.StreamHandler(log_string)
customlogger.addHandler(handler)


processed_packages = process_packages(
package_list, 1, str(datetime.now()), 1, 1, False, customlogger, 0, 1
)

# Test that custom logger was used
assert (
log_string.getvalue()
== f"Processing {package_uuid} (0 of 1)\n"
)

# Make sure only one package was processed and that is was an AIP
assert len(processed_packages) == 1
assert processed_packages[0].aip is True


def test_handle_deletion(app_instance, mocker):
"""Test that delete handler handles deletion correctly."""
PACKAGE_UUID = str(uuid.uuid4())

# Make sure package deleted on storage service gets deleted locally
package = StorageServicePackage(uuid=PACKAGE_UUID, deleted=True)
mock_delete_aip = mocker.patch("AIPscan.Aggregator.tasks.delete_aip")

handle_deletion(package)

mock_delete_aip.assert_called_with(PACKAGE_UUID)

# Make sure package not deleted on storage service doesn't get deleted
package = StorageServicePackage(uuid=PACKAGE_UUID, deleted=False)
mock_delete_aip = mocker.patch("AIPscan.Aggregator.tasks.delete_aip")

handle_deletion(package)

mock_delete_aip.assert_not_called()


def test_delete_aip(app_instance):
"""Test that SS deleted AIPs gets deleted in aipscan.db."""
PACKAGE_UUID = str(uuid.uuid4())

test_helpers.create_test_aip(uuid=PACKAGE_UUID)

# Make sure test AIP exists before deletion
deleted_aip = AIP.query.filter_by(uuid=PACKAGE_UUID).first()
assert deleted_aip is not None

delete_aip(PACKAGE_UUID)

# Make sure test AIP doesn't exist after deletion
deleted_aip = AIP.query.filter_by(uuid=PACKAGE_UUID).first()
assert deleted_aip is None
24 changes: 11 additions & 13 deletions tools/fetch_aips
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@ from helpers import fetch

from AIPscan import db
from AIPscan.Aggregator import database_helpers
from AIPscan.Aggregator.task_helpers import create_numbered_subdirs
from AIPscan.Aggregator.task_helpers import (
create_numbered_subdirs,
process_packages,
summarize_fetch_job_results,
)
from AIPscan.models import StorageService
from config import CONFIGS

Expand Down Expand Up @@ -126,33 +130,27 @@ def fetch_aips(logger, ss_id, session_id, page, packages_per_page, logfile):
)

# Import packages
processed_packages = fetch.import_packages(
processed_packages = process_packages(
packages,
start_item,
end_item,
ss_id,
session_id,
package_list_no,
fetch_job_id,
packages_per_page,
False,
logger,
start_item,
end_item,
)

fetch_job = database_helpers.update_fetch_job(
fetch_job_id, processed_packages, total_packages
)

summary = "aips: '{}'; sips: '{}'; dips: '{}'; deleted: '{}'; replicated: '{}'".format(
fetch_job.total_aips,
fetch_job.total_sips,
fetch_job.total_dips,
fetch_job.total_deleted_aips,
fetch_job.total_replicas,
)
logger.info("%s", summary)
logger.info(
f"Updated fetch job record {fetch_job_id} with package type counts."
)

logger.info("%s", summarize_fetch_job_results(fetch_job))
logger.info("Processing complete.")


Expand Down
Loading

0 comments on commit b511541

Please sign in to comment.