Code cleanup/refactoring (#243)
* Replaced use of api_url dict with StorageService instance or ID
* Did some refactoring to unify the AIP fetching code between the main
  app logic and the CLI tool
mcantelon committed Dec 13, 2023
1 parent aa95805 commit 3ce55cc
Showing 12 changed files with 205 additions and 210 deletions.
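
The first bullet is a calling-convention change that runs through every file below: helpers that previously took a loosely typed `api_url` dict (keys `baseUrl`, `userName`, `apiKey`, `limit`, `offset`) plus a separate `storage_service_id` now take a `StorageService` model instance and read connection details from its attributes. A minimal before/after sketch, using the key and attribute names exactly as they appear in the diff (the example values are invented):

# Before: connection details travelled as an untyped dict, and every
# helper re-read the same keys.
api_url = {
    "baseUrl": "http://ss.example.com:8000",  # invented example value
    "userName": "demo",
    "apiKey": "secret",
    "limit": "20",
    "offset": "0",
}
base_url = api_url.get("baseUrl", "").rstrip("/")

# After: helpers receive the model instance. Celery tasks still accept the
# integer storage_service_id, since task arguments are serialized, and
# re-fetch the instance themselves.
storage_service = StorageService.query.get(storage_service_id)
base_url = storage_service.url.rstrip("/")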
12 changes: 7 additions & 5 deletions AIPscan/Aggregator/database_helpers.py
@@ -203,19 +203,20 @@ def create_storage_location_object(current_location, description, storage_servic
     return storage_location
 
 
-def create_or_update_storage_location(current_location, api_url, storage_service_id):
+def create_or_update_storage_location(current_location, storage_service):
     """Create or update Storage Location and return it."""
     storage_location = StorageLocation.query.filter_by(
         current_location=current_location
     ).first()
 
     request_url, request_url_without_api_key = get_storage_service_api_url(
-        api_url, current_location
+        storage_service, current_location
     )
     response = tasks.make_request(request_url, request_url_without_api_key)
     description = response.get("description")
     if not storage_location:
         return create_storage_location_object(
-            current_location, description, storage_service_id
+            current_location, description, storage_service.id
         )
 
     if storage_location.description != description:
@@ -233,11 +234,12 @@ def create_pipeline_object(origin_pipeline, dashboard_url):
     return pipeline
 
 
-def create_or_update_pipeline(origin_pipeline, api_url):
+def create_or_update_pipeline(origin_pipeline, storage_service):
     """Create or update Storage Location and return it."""
     pipeline = Pipeline.query.filter_by(origin_pipeline=origin_pipeline).first()
 
     request_url, request_url_without_api_key = get_storage_service_api_url(
-        api_url, origin_pipeline
+        storage_service, origin_pipeline
     )
     response = tasks.make_request(request_url, request_url_without_api_key)
     dashboard_url = response.get("remote_name")
6 changes: 3 additions & 3 deletions AIPscan/Aggregator/mets_parse_helpers.py
@@ -58,7 +58,7 @@ def get_aip_original_name(mets):
     # ignore those.
     TRANSFER_DIR_PREFIX = "%transferDirectory%"
 
-    NAMESPACES = {u"premis": u"http://www.loc.gov/premis/v3"}
+    NAMESPACES = {"premis": "http://www.loc.gov/premis/v3"}
     ELEM_ORIGINAL_NAME_PATTERN = ".//premis:originalName"
 
     original_name = ""
@@ -85,13 +85,13 @@ def get_aip_original_name(mets):
 
 
 def download_mets(
-    api_url, package_uuid, relative_path_to_mets, timestamp, package_list_no
+    storage_service, package_uuid, relative_path_to_mets, timestamp, package_list_no
 ):
     """Download METS from the storage service."""
 
     # Request the METS file.
     mets_response = requests.get(
-        get_mets_url(api_url, package_uuid, relative_path_to_mets)
+        get_mets_url(storage_service, package_uuid, relative_path_to_mets)
     )
 
     # Create a directory to download the METS to.
48 changes: 27 additions & 21 deletions AIPscan/Aggregator/task_helpers.py
@@ -3,6 +3,7 @@
 """Collects a number of reusable components of tasks.py. Also ensures
 the module remains clean and easy to refactor over time.
 """
+import json
 import os
 from datetime import datetime
 
@@ -11,20 +12,17 @@
 from AIPscan.Aggregator.types import StorageServicePackage
 
 
-def format_api_url_with_limit_offset(api_url):
+def format_api_url_with_limit_offset(storage_service):
     """Format the API URL here to make sure it is as correct as
     possible.
     """
-    base_url = api_url.get("baseUrl", "").rstrip("/")
-    limit = int(api_url.get("limit", ""))
-    offset = api_url.get("offset", "")
-    user_name = api_url.get("userName")
-    api_key = api_url.get("apiKey", "")
+    base_url = storage_service.url.rstrip("/")
 
     request_url_without_api_key = "{}/api/v2/file/?limit={}&offset={}".format(
-        base_url, limit, offset
+        base_url, storage_service.download_limit, storage_service.download_offset
     )
     request_url = "{}&username={}&api_key={}".format(
-        request_url_without_api_key, user_name, api_key
+        request_url_without_api_key, storage_service.user_name, storage_service.api_key
    )
     return base_url, request_url_without_api_key, request_url
 
@@ -36,6 +34,19 @@ def get_packages_directory(timestamp):
     return os.path.join("AIPscan", "Aggregator", "downloads", timestamp, "packages")
 
 
+def parse_package_list_file(filepath, logger=None, remove_after_parsing=False):
+    with open(filepath, "r") as packages_json:
+        package_list = json.load(packages_json)
+    try:
+        if remove_after_parsing:
+            os.remove(filepath)
+    except OSError as err:
+        if logger:
+            logger.warning("Unable to delete package JSON file: {}".format(err))
+
+    return package_list
+
+
 def process_package_object(package_obj):
     """Process a package object as retrieve from the storage service
     and return a StorageServicePackage type to the caller for further
@@ -95,32 +106,27 @@ def _tz_neutral_date(date):
     return date
 
 
-def get_mets_url(api_url, package_uuid, path_to_mets):
+def get_mets_url(storage_service, package_uuid, path_to_mets):
     """Construct a URL from which we can download the METS files that
     we are interested in.
     """
-    am_url = "baseUrl"
-    user_name = "userName"
-    api_key = "apiKey"
-
     mets_url = "{}/api/v2/file/{}/extract_file/?relative_path_to_file={}&username={}&api_key={}".format(
-        api_url[am_url].rstrip("/"),
+        storage_service.url.rstrip("/"),
        package_uuid,
         path_to_mets,
-        api_url[user_name],
-        api_url[api_key],
+        storage_service.user_name,
+        storage_service.api_key,
     )
     return mets_url
 
 
-def get_storage_service_api_url(api_url, api_path):
+def get_storage_service_api_url(storage_service, api_path):
     """Return URL to fetch location infofrom Storage Service."""
-    base_url = api_url.get("baseUrl", "").rstrip("/")
+    base_url = storage_service.url.rstrip("/")
     request_url_without_api_key = "{}{}".format(base_url, api_path).rstrip("/")
-    user_name = api_url.get("userName")
-    api_key = api_url.get("apiKey", "")
-
     request_url = "{}?username={}&api_key={}".format(
-        request_url_without_api_key, user_name, api_key
+        request_url_without_api_key, storage_service.user_name, storage_service.api_key
     )
     return request_url, request_url_without_api_key
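
The new `parse_package_list_file` helper above is one piece of the unification the commit message describes: both the Celery pipeline and, presumably, the CLI tool can now load a downloaded package-list JSON document through one function, with deletion of the file after parsing made optional. A usage sketch, assuming a hypothetical file path and the `objects` envelope the storage service returns:

import logging

from AIPscan.Aggregator.task_helpers import parse_package_list_file

logger = logging.getLogger(__name__)

# Worker-style call: parse the list, then delete the JSON file.
package_list = parse_package_list_file("/tmp/packages1.json", logger, True)  # hypothetical path

# CLI-style call: keep the file on disk for later inspection.
package_list = parse_package_list_file("/tmp/packages1.json", logger=logger)

for package_obj in package_list.get("objects", []):
    print(package_obj.get("uuid"))  # each entry describes one package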
105 changes: 56 additions & 49 deletions AIPscan/Aggregator/tasks.py
@@ -17,6 +17,7 @@
 )
 from AIPscan.Aggregator.task_helpers import (
     format_api_url_with_limit_offset,
+    parse_package_list_file,
     process_package_object,
 )
 from AIPscan.extensions import celery
@@ -52,83 +53,75 @@ def start_mets_task(
     relative_path_to_mets,
     current_location,
     origin_pipeline,
-    api_url,
     timestamp_str,
     package_list_no,
     storage_service_id,
     fetch_job_id,
+    immediate=False,
 ):
     """Initiate a get_mets task worker and record the event in the
     celery database.
     """
+    storage_service = StorageService.query.get(storage_service_id)
     storage_location = database_helpers.create_or_update_storage_location(
-        current_location, api_url, storage_service_id
+        current_location, storage_service
     )
 
-    pipeline = database_helpers.create_or_update_pipeline(origin_pipeline, api_url)
+    pipeline = database_helpers.create_or_update_pipeline(
+        origin_pipeline, storage_service
+    )
 
-    # Call worker to download and parse METS File.
-    get_mets_task = get_mets.delay(
+    args = [
         package_uuid,
         aip_size,
         relative_path_to_mets,
-        api_url,
         timestamp_str,
         package_list_no,
         storage_service_id,
         storage_location.id,
         pipeline.id,
         fetch_job_id,
-    )
-    mets_task = get_mets_tasks(
-        get_mets_task_id=get_mets_task.id,
-        workflow_coordinator_id=workflow_coordinator.request.id,
-        package_uuid=package_uuid,
-        status=None,
-    )
-    db.session.add(mets_task)
-    db.session.commit()
+    ]
+
+    if immediate:
+        # Execute immediately
+        get_mets_task = get_mets.apply(args=args)
+    else:
+        # Call worker to download and parse METS File.
+        get_mets_task = get_mets.delay(*args)
+    mets_task = get_mets_tasks(
+        get_mets_task_id=get_mets_task.id,
+        workflow_coordinator_id=workflow_coordinator.request.id,
+        package_uuid=package_uuid,
+        status=None,
+    )
+    db.session.add(mets_task)
+    db.session.commit()
 
 
 def parse_packages_and_load_mets(
-    json_file_path,
-    api_url,
-    timestamp,
-    package_list_no,
-    storage_service_id,
-    fetch_job_id,
+    package_list, timestamp, package_list_no, storage_service_id, fetch_job_id
 ):
     """Parse packages documents from the storage service and initiate
     the load mets functions of AIPscan. Results are written to the
     database.
     """
     OBJECTS = "objects"
     packages = []
-    with open(json_file_path, "r") as packages_json:
-        package_list = json.load(packages_json)
-
-    try:
-        os.remove(json_file_path)
-    except OSError as err:
-        logger.warning("Unable to delete package JSON file: {}".format(err))
 
     for package_obj in package_list.get(OBJECTS, []):
         package = process_package_object(package_obj)
         packages.append(package)
 
-        if package.is_deleted():
-            delete_aip(package.uuid)
+        if not check_if_package_is_aip_and_handle_deletion(package):
             continue
 
-        if not package.is_aip():
-            continue
-
         start_mets_task(
             package.uuid,
             package.size,
             package.get_relative_path(),
             package.current_location,
             package.origin_pipeline,
-            api_url,
             timestamp,
             package_list_no,
             storage_service_id,
@@ -137,6 +130,14 @@ def parse_packages_and_load_mets(
     return packages
 
 
+def check_if_package_is_aip_and_handle_deletion(package):
+    if package.is_deleted():
+        delete_aip(package.uuid)
+        return False
+
+    return package.is_aip()
+
+
 def delete_aip(uuid):
     logger.warning("Package deleted from SS: '%s'", uuid)
 
@@ -149,14 +150,13 @@ def delete_aip(uuid):
 
 @celery.task(bind=True)
 def workflow_coordinator(
-    self, api_url, timestamp, storage_service_id, fetch_job_id, packages_directory
+    self, timestamp, storage_service_id, fetch_job_id, packages_directory
 ):
 
     logger.info("Packages directory is: %s", packages_directory)
 
     # Send package list request to a worker.
     package_lists_task = package_lists_request.delay(
-        api_url, timestamp, packages_directory
+        storage_service_id, timestamp, packages_directory
     )
 
     write_celery_update(package_lists_task, workflow_coordinator)
@@ -181,13 +181,10 @@ def workflow_coordinator(
         )
         # Process packages and create a new worker to download and parse
         # each METS separately.
+        package_list = parse_package_list_file(json_file_path, logger, True)
+
         packages = parse_packages_and_load_mets(
-            json_file_path,
-            api_url,
-            timestamp,
-            package_list_no,
-            storage_service_id,
-            fetch_job_id,
+            package_list, timestamp, package_list_no, storage_service_id, fetch_job_id
         )
         all_packages = all_packages + packages
 
@@ -228,29 +225,34 @@ def make_request(request_url, request_url_without_api_key):
 
 
 @celery.task(bind=True)
-def package_lists_request(self, apiUrl, timestamp, packages_directory):
+def package_lists_request(self, storage_service_id, timestamp, packages_directory):
     """Request package lists from the storage service. Package lists
     will contain details of the AIPs that we want to download.
     """
     META = "meta"
     NEXT = "next"
-    LIMIT = "limit"
     COUNT = "total_count"
     IN_PROGRESS = "IN PROGRESS"
 
+    storage_service = StorageService.query.get(storage_service_id)
+
     (
         base_url,
         request_url_without_api_key,
         request_url,
-    ) = format_api_url_with_limit_offset(apiUrl)
+    ) = format_api_url_with_limit_offset(storage_service)
 
     # First packages request.
     packages = make_request(request_url, request_url_without_api_key)
     packages_count = 1
 
     # Calculate how many package list files will be downloaded based on
     # total number of packages and the download limit
     total_packages = int(packages.get(META, {}).get(COUNT, 0))
-    total_package_lists = int(total_packages / int(apiUrl.get(LIMIT))) + (
-        total_packages % int(apiUrl.get(LIMIT)) > 0
+    total_package_lists = int(total_packages / int(storage_service.download_limit)) + (
+        total_packages % int(storage_service.download_limit) > 0
     )
 
     # There may be more packages to download to let's access those here.
     # TODO: `request_url_without_api_key` at this point will not be as
     # accurate. If we have more time, modify `format_api_url_with_limit_offset(...)`
@@ -272,6 +274,7 @@ def package_lists_request(self, apiUrl, timestamp, packages_directory):
             )
         },
     )
+
     return {
         "totalPackageLists": total_package_lists,
        "totalPackages": total_packages,
@@ -284,7 +287,6 @@ def get_mets(
     package_uuid,
     aip_size,
     relative_path_to_mets,
-    api_url,
     timestamp_str,
     package_list_no,
     storage_service_id,
@@ -313,8 +315,13 @@ def get_mets(
         tasklogger = customlogger
 
     # Download METS file
+    storage_service = StorageService.query.get(storage_service_id)
     download_file = download_mets(
-        api_url, package_uuid, relative_path_to_mets, timestamp_str, package_list_no
+        storage_service,
+        package_uuid,
+        relative_path_to_mets,
+        timestamp_str,
+        package_list_no,
     )
     mets_name = os.path.basename(download_file)
     mets_hash = file_sha256_hash(download_file)
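
The `immediate` flag added to `start_mets_task` appears to support the unified CLI path the commit message mentions: Celery's `Task.apply()` runs the task body synchronously in the calling process, while `Task.delay()` enqueues it for a worker. A sketch of the difference, reusing the `get_mets` task and an illustrative `args` list as in the diff:

# Asynchronous (the web app's usual path): returns an AsyncResult at once;
# a worker picks the task up from the broker.
result = get_mets.delay(*args)

# Synchronous (immediate=True, e.g. running without a worker): executes
# in-process and returns an EagerResult once the task body has finished.
result = get_mets.apply(args=args)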
(Diffs for the remaining 8 changed files are not shown.)
