Skip to content

Commit

Permalink
Refactor fetch CLI tool logic (#243)
Browse files Browse the repository at this point in the history
Refactor fetch CLI tool, sharing more functionality with the fetch
Celery task.
  • Loading branch information
mcantelon committed May 1, 2024
1 parent 64bd9ef commit a9dc39d
Show file tree
Hide file tree
Showing 7 changed files with 202 additions and 122 deletions.
10 changes: 10 additions & 0 deletions AIPscan/Aggregator/task_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,3 +158,13 @@ def write_mets(http_response, package_uuid, subdir):
with open(download_file, "wb") as file:
file.write(http_response.content)
return download_file


def summarize_fetch_job_results(fetch_job):
return "aips: '{}'; sips: '{}'; dips: '{}'; deleted: '{}'; replicated: '{}'".format(
fetch_job.total_aips,
fetch_job.total_sips,
fetch_job.total_dips,
fetch_job.total_deleted_aips,
fetch_job.total_replicas,
)
120 changes: 69 additions & 51 deletions AIPscan/Aggregator/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
format_api_url_with_limit_offset,
parse_package_list_file,
process_package_object,
summarize_fetch_job_results,
)
from AIPscan.extensions import celery
from AIPscan.helpers import file_sha256_hash
Expand Down Expand Up @@ -107,44 +108,6 @@ def start_mets_task(
get_mets.apply(args=args)


def parse_packages_and_load_mets(
package_list, timestamp, package_list_no, storage_service_id, fetch_job_id
):
"""Parse packages documents from the storage service and initiate
the load mets functions of AIPscan. Results are written to the
database.
"""
OBJECTS = "objects"
packages = []

for package_obj in package_list.get(OBJECTS, []):
package = process_package_object(package_obj)

packages.append(package)
handle_deletion(package)

if not package.is_undeleted_aip():
continue

start_mets_task(
package.uuid,
package.size,
package.get_relative_path(),
package.current_location,
package.origin_pipeline,
timestamp,
package_list_no,
storage_service_id,
fetch_job_id,
)
return packages


def handle_deletion(package):
if package.is_deleted():
delete_aip(package.uuid)


def delete_aip(uuid):
logger.warning("Package deleted from SS: '%s'", uuid)

Expand Down Expand Up @@ -188,11 +151,12 @@ def workflow_coordinator(
)
# Process packages and create a new worker to download and parse
# each METS separately.
package_list = parse_package_list_file(json_file_path, logger, True)
packages = parse_package_list_file(json_file_path, logger, True)

Check warning on line 154 in AIPscan/Aggregator/tasks.py

View check run for this annotation

Codecov / codecov/patch

AIPscan/Aggregator/tasks.py#L154

Added line #L154 was not covered by tests

packages = parse_packages_and_load_mets(
package_list, timestamp, package_list_no, storage_service_id, fetch_job_id
packages = process_packages(

Check warning on line 156 in AIPscan/Aggregator/tasks.py

View check run for this annotation

Codecov / codecov/patch

AIPscan/Aggregator/tasks.py#L156

Added line #L156 was not covered by tests
packages, storage_service_id, timestamp, package_list_no, fetch_job_id, True
)

all_packages = all_packages + packages

total_packages_count = package_lists_task.info["totalPackages"]
Expand All @@ -201,16 +165,7 @@ def workflow_coordinator(
fetch_job_id, all_packages, total_packages_count
)

summary = (
"aips: '{}'; sips: '{}'; dips: '{}'; deleted: '{}'; replicated: '{}'".format(
obj.total_aips,
obj.total_sips,
obj.total_dips,
obj.total_deleted_aips,
obj.total_replicas,
)
)
logger.info("%s", summary)
logger.info("%s", summarize_fetch_job_results(obj))

Check warning on line 168 in AIPscan/Aggregator/tasks.py

View check run for this annotation

Codecov / codecov/patch

AIPscan/Aggregator/tasks.py#L168

Added line #L168 was not covered by tests


def make_request(request_url, request_url_without_api_key):
Expand Down Expand Up @@ -453,3 +408,66 @@ def delete_storage_service(storage_service_id):

db.session.delete(storage_service)
db.session.commit()


def handle_deletion(package):
if package.is_deleted():
delete_aip(package.uuid)


def process_packages(
packages,
storage_service_id,
timestamp_str,
package_list_no,
fetch_job_id,
run_as_task=False,
logger=None,
start_item=None,
end_item=None,
):
"""Parse packages documents from the storage service and initiate
the load mets functions of AIPscan. Results are written to the
database.
"""
processed_packages = []

package_count = 0
for package_obj in packages.get("objects", []):
package_count += 1

package = process_package_object(package_obj)

# Only process packages within paging window, if specified
if start_item is None or (
package_count >= start_item and package_count <= end_item
):
# Calculate current item being processed
if start_item is not None:
current_item = start_item + len(processed_packages)

if logger:
logger.info(
f"Processing {package.uuid} ({current_item} of {end_item})"
)

processed_packages.append(package)
handle_deletion(package)

if not package.is_undeleted_aip():
continue

Check warning on line 458 in AIPscan/Aggregator/tasks.py

View check run for this annotation

Codecov / codecov/patch

AIPscan/Aggregator/tasks.py#L458

Added line #L458 was not covered by tests

start_mets_task(
package.uuid,
package.size,
package.get_relative_path(),
package.current_location,
package.origin_pipeline,
timestamp_str,
package_list_no,
storage_service_id,
fetch_job_id,
run_as_task,
)

return processed_packages
20 changes: 20 additions & 0 deletions AIPscan/Aggregator/tests/test_task_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,3 +305,23 @@ def test_process_package_object(packages, idx, storage_service_package):
"""
package_obj = task_helpers.process_package_object(packages[idx])
assert package_obj == storage_service_package, idx


def test_summarize_fetch_job_results():
fetch_job = models.FetchJob(
total_packages=15,
total_aips=1,
total_deleted_aips=4,
download_start=None,
download_end=None,
download_directory=None,
storage_service_id=None,
)
fetch_job.total_sips = 2
fetch_job.total_dips = 3
fetch_job.total_replicas = 5

assert (
"aips: '1'; sips: '2'; dips: '3'; deleted: '4'; replicated: '5'"
== task_helpers.summarize_fetch_job_results(fetch_job)
)
88 changes: 78 additions & 10 deletions AIPscan/Aggregator/tests/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import os
import uuid
from datetime import datetime
from io import StringIO

import celery
import pytest
Expand All @@ -15,10 +14,11 @@
delete_fetch_job,
delete_storage_service,
get_mets,
handle_deletion,
index_task,
make_request,
parse_package_list_file,
parse_packages_and_load_mets,
process_packages,
start_index_task,
)
from AIPscan.Aggregator.tests import (
Expand All @@ -29,6 +29,7 @@
VALID_JSON,
MockResponse,
)
from AIPscan.Aggregator.types import StorageServicePackage
from AIPscan.models import AIP, Agent, FetchJob, StorageService, index_tasks

SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
Expand Down Expand Up @@ -88,11 +89,7 @@ def mock_download_mets(

# Set up custom logger and add handler to capture output
customlogger = logging.getLogger(__name__)
customlogger.setLevel(logging.DEBUG)

log_string = StringIO()
handler = logging.StreamHandler(log_string)
customlogger.addHandler(handler)
log_stream = test_helpers.add_logger_streamer(customlogger)

# Create AIP and verify record.
fetch_job1 = test_helpers.create_test_fetch_job(
Expand Down Expand Up @@ -176,7 +173,7 @@ def mock_download_mets(

# Test that custom logger was used
assert (
log_string.getvalue()
log_stream.getvalue()
== f"Processing METS file {os.path.basename(fixture_path)}\n"
)

Expand Down Expand Up @@ -260,7 +257,7 @@ def test_parse_package_list_file(tmpdir):
assert len(package_list) == 0


def test_parse_packages_and_load_mets(app_instance, tmpdir, mocker):
def test_process_packages_json_file_deletion(app_instance, tmpdir, mocker):
"""Test that JSON package lists are deleted after being parsed."""
json_file_path = tmpdir.join("packages.json")
json_file_path.write(json.dumps({"objects": []}))
Expand All @@ -269,22 +266,93 @@ def test_parse_packages_and_load_mets(app_instance, tmpdir, mocker):

package_list = parse_package_list_file(json_file_path, None, True)

parse_packages_and_load_mets(package_list, str(datetime.now()), 1, 1, 1)
process_packages(package_list, 1, str(datetime.now()), 1, 1, True)

delete_package_json.assert_called_with(json_file_path)


def test_process_packages(app_instance, tmpdir, mocker):
"""Test that JSON package lists are deleted after being parsed."""
aip_package_uuid = str(uuid.uuid4())
aip_package_data = {
"uuid": aip_package_uuid,
"package_type": "AIP",
"current_path": str(tmpdir),
}

deleted_sip_package_uuid = str(uuid.uuid4())
deleted_sip_package_data = {
"uuid": deleted_sip_package_uuid,
"package_type": "SIP",
"current_path": str(tmpdir),
"deleted": True,
}

# Write test AIP to JSON file (from which to general a list of packages)
json_file_path = tmpdir.join("packages.json")
json_file_path.write(
json.dumps({"objects": [aip_package_data, deleted_sip_package_data]})
)

# Get test package list
package_list = parse_package_list_file(json_file_path, None, True)

# Process test package list
mocker.patch(
"AIPscan.Aggregator.database_helpers.create_or_update_storage_location"
)
mocker.patch("AIPscan.Aggregator.database_helpers.create_or_update_pipeline")

# Set up custom logger and add handler to capture output
customlogger = logging.getLogger(__name__)
log_stream = test_helpers.add_logger_streamer(customlogger)

processed_packages = process_packages(
package_list, 1, str(datetime.now()), 1, 1, False, customlogger, 1, 1
)

# Test that custom logger was used
assert log_stream.getvalue() == f"Processing {aip_package_uuid} (1 of 1)\n"

# Make sure only one package was processed and that is was the non-deleted AIP
assert len(processed_packages) == 1
assert processed_packages[0].aip is True


def test_handle_deletion(app_instance, mocker):
"""Test that delete handler handles deletion correctly."""
PACKAGE_UUID = str(uuid.uuid4())

# Make sure package deleted on storage service gets deleted locally
package = StorageServicePackage(uuid=PACKAGE_UUID, deleted=True)
mock_delete_aip = mocker.patch("AIPscan.Aggregator.tasks.delete_aip")

handle_deletion(package)

mock_delete_aip.assert_called_with(PACKAGE_UUID)

# Make sure package not deleted on storage service doesn't get deleted
package = StorageServicePackage(uuid=PACKAGE_UUID, deleted=False)
mock_delete_aip = mocker.patch("AIPscan.Aggregator.tasks.delete_aip")

handle_deletion(package)

mock_delete_aip.assert_not_called()


def test_delete_aip(app_instance):
"""Test that SS deleted AIPs gets deleted in aipscan.db."""
PACKAGE_UUID = str(uuid.uuid4())

test_helpers.create_test_aip(uuid=PACKAGE_UUID)

# Make sure test AIP exists before deletion
deleted_aip = AIP.query.filter_by(uuid=PACKAGE_UUID).first()
assert deleted_aip is not None

delete_aip(PACKAGE_UUID)

# Make sure test AIP doesn't exist after deletion
deleted_aip = AIP.query.filter_by(uuid=PACKAGE_UUID).first()
assert deleted_aip is None

Expand Down
13 changes: 13 additions & 0 deletions AIPscan/test_helpers.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import logging
import os
import time
import uuid
from datetime import datetime
from io import StringIO

import tzlocal

Expand Down Expand Up @@ -177,3 +179,14 @@ def set_timezone_and_return_current_timezone(new_tz):
set_timezone(new_tz)

return local_timezone


def add_logger_streamer(logger):
"""Add stream handler to logger and return it."""
logger.setLevel(logging.DEBUG)

log_streamer = StringIO()
handler = logging.StreamHandler(log_streamer)
logger.addHandler(handler)

return log_streamer
Loading

0 comments on commit a9dc39d

Please sign in to comment.