diff --git a/AIPscan/Aggregator/database_helpers.py b/AIPscan/Aggregator/database_helpers.py index 51545311..bee8a267 100644 --- a/AIPscan/Aggregator/database_helpers.py +++ b/AIPscan/Aggregator/database_helpers.py @@ -203,19 +203,20 @@ def create_storage_location_object(current_location, description, storage_servic return storage_location -def create_or_update_storage_location(current_location, api_url, storage_service_id): +def create_or_update_storage_location(current_location, storage_service): """Create or update Storage Location and return it.""" storage_location = StorageLocation.query.filter_by( current_location=current_location ).first() + request_url, request_url_without_api_key = get_storage_service_api_url( - api_url, current_location + storage_service, current_location ) response = tasks.make_request(request_url, request_url_without_api_key) description = response.get("description") if not storage_location: return create_storage_location_object( - current_location, description, storage_service_id + current_location, description, storage_service.id ) if storage_location.description != description: @@ -233,11 +234,12 @@ def create_pipeline_object(origin_pipeline, dashboard_url): return pipeline -def create_or_update_pipeline(origin_pipeline, api_url): +def create_or_update_pipeline(origin_pipeline, storage_service): """Create or update Storage Location and return it.""" pipeline = Pipeline.query.filter_by(origin_pipeline=origin_pipeline).first() + request_url, request_url_without_api_key = get_storage_service_api_url( - api_url, origin_pipeline + storage_service, origin_pipeline ) response = tasks.make_request(request_url, request_url_without_api_key) dashboard_url = response.get("remote_name") diff --git a/AIPscan/Aggregator/mets_parse_helpers.py b/AIPscan/Aggregator/mets_parse_helpers.py index 74219d7d..96651d79 100644 --- a/AIPscan/Aggregator/mets_parse_helpers.py +++ b/AIPscan/Aggregator/mets_parse_helpers.py @@ -58,7 +58,7 @@ def get_aip_original_name(mets): # ignore those. TRANSFER_DIR_PREFIX = "%transferDirectory%" - NAMESPACES = {u"premis": u"http://www.loc.gov/premis/v3"} + NAMESPACES = {"premis": "http://www.loc.gov/premis/v3"} ELEM_ORIGINAL_NAME_PATTERN = ".//premis:originalName" original_name = "" @@ -85,13 +85,13 @@ def get_aip_original_name(mets): def download_mets( - api_url, package_uuid, relative_path_to_mets, timestamp, package_list_no + storage_service, package_uuid, relative_path_to_mets, timestamp, package_list_no ): """Download METS from the storage service.""" # Request the METS file. mets_response = requests.get( - get_mets_url(api_url, package_uuid, relative_path_to_mets) + get_mets_url(storage_service, package_uuid, relative_path_to_mets) ) # Create a directory to download the METS to. diff --git a/AIPscan/Aggregator/task_helpers.py b/AIPscan/Aggregator/task_helpers.py index dcadb51a..f0a6db14 100644 --- a/AIPscan/Aggregator/task_helpers.py +++ b/AIPscan/Aggregator/task_helpers.py @@ -3,6 +3,7 @@ """Collects a number of reusable components of tasks.py. Also ensures the module remains clean and easy to refactor over time. """ +import json import os from datetime import datetime @@ -11,20 +12,17 @@ from AIPscan.Aggregator.types import StorageServicePackage -def format_api_url_with_limit_offset(api_url): +def format_api_url_with_limit_offset(storage_service): """Format the API URL here to make sure it is as correct as possible. 
""" - base_url = api_url.get("baseUrl", "").rstrip("/") - limit = int(api_url.get("limit", "")) - offset = api_url.get("offset", "") - user_name = api_url.get("userName") - api_key = api_url.get("apiKey", "") + base_url = storage_service.url.rstrip("/") + request_url_without_api_key = "{}/api/v2/file/?limit={}&offset={}".format( - base_url, limit, offset + base_url, storage_service.download_limit, storage_service.download_offset ) request_url = "{}&username={}&api_key={}".format( - request_url_without_api_key, user_name, api_key + request_url_without_api_key, storage_service.user_name, storage_service.api_key ) return base_url, request_url_without_api_key, request_url @@ -36,6 +34,19 @@ def get_packages_directory(timestamp): return os.path.join("AIPscan", "Aggregator", "downloads", timestamp, "packages") +def parse_package_list_file(filepath, logger=None, remove_after_parsing=False): + with open(filepath, "r") as packages_json: + package_list = json.load(packages_json) + try: + if remove_after_parsing: + os.remove(filepath) + except OSError as err: + if logger: + logger.warning("Unable to delete package JSON file: {}".format(err)) + + return package_list + + def process_package_object(package_obj): """Process a package object as retrieve from the storage service and return a StorageServicePackage type to the caller for further @@ -95,32 +106,27 @@ def _tz_neutral_date(date): return date -def get_mets_url(api_url, package_uuid, path_to_mets): +def get_mets_url(storage_service, package_uuid, path_to_mets): """Construct a URL from which we can download the METS files that we are interested in. """ - am_url = "baseUrl" - user_name = "userName" - api_key = "apiKey" - mets_url = "{}/api/v2/file/{}/extract_file/?relative_path_to_file={}&username={}&api_key={}".format( - api_url[am_url].rstrip("/"), + storage_service.url.rstrip("/"), package_uuid, path_to_mets, - api_url[user_name], - api_url[api_key], + storage_service.user_name, + storage_service.api_key, ) return mets_url -def get_storage_service_api_url(api_url, api_path): +def get_storage_service_api_url(storage_service, api_path): """Return URL to fetch location infofrom Storage Service.""" - base_url = api_url.get("baseUrl", "").rstrip("/") + base_url = storage_service.url.rstrip("/") request_url_without_api_key = "{}{}".format(base_url, api_path).rstrip("/") - user_name = api_url.get("userName") - api_key = api_url.get("apiKey", "") + request_url = "{}?username={}&api_key={}".format( - request_url_without_api_key, user_name, api_key + request_url_without_api_key, storage_service.user_name, storage_service.api_key ) return request_url, request_url_without_api_key diff --git a/AIPscan/Aggregator/tasks.py b/AIPscan/Aggregator/tasks.py index abfd316e..d18e0c20 100644 --- a/AIPscan/Aggregator/tasks.py +++ b/AIPscan/Aggregator/tasks.py @@ -17,6 +17,7 @@ ) from AIPscan.Aggregator.task_helpers import ( format_api_url_with_limit_offset, + parse_package_list_file, process_package_object, ) from AIPscan.extensions import celery @@ -52,51 +53,54 @@ def start_mets_task( relative_path_to_mets, current_location, origin_pipeline, - api_url, timestamp_str, package_list_no, storage_service_id, fetch_job_id, + immediate=False, ): """Initiate a get_mets task worker and record the event in the celery database. 
""" + storage_service = StorageService.query.get(storage_service_id) storage_location = database_helpers.create_or_update_storage_location( - current_location, api_url, storage_service_id + current_location, storage_service ) - pipeline = database_helpers.create_or_update_pipeline(origin_pipeline, api_url) + pipeline = database_helpers.create_or_update_pipeline( + origin_pipeline, storage_service + ) - # Call worker to download and parse METS File. - get_mets_task = get_mets.delay( + args = [ package_uuid, aip_size, relative_path_to_mets, - api_url, timestamp_str, package_list_no, storage_service_id, storage_location.id, pipeline.id, fetch_job_id, - ) - mets_task = get_mets_tasks( - get_mets_task_id=get_mets_task.id, - workflow_coordinator_id=workflow_coordinator.request.id, - package_uuid=package_uuid, - status=None, - ) - db.session.add(mets_task) - db.session.commit() + ] + + if immediate: + # Execute immediately. + get_mets.apply(args=args) + else: + # Call worker to download and parse METS File. + get_mets_task = get_mets.delay(*args) + mets_task = get_mets_tasks( + get_mets_task_id=get_mets_task.id, + workflow_coordinator_id=workflow_coordinator.request.id, + package_uuid=package_uuid, + status=None, + ) + db.session.add(mets_task) + db.session.commit() def parse_packages_and_load_mets( - json_file_path, - api_url, - timestamp, - package_list_no, - storage_service_id, - fetch_job_id, + package_list, timestamp, package_list_no, storage_service_id, fetch_job_id ): """Parse packages documents from the storage service and initiate the load mets functions of AIPscan. Results are written to the @@ -104,31 +108,22 @@ def parse_packages_and_load_mets( """ OBJECTS = "objects" packages = [] - with open(json_file_path, "r") as packages_json: - package_list = json.load(packages_json) - - try: - os.remove(json_file_path) - except OSError as err: - logger.warning("Unable to delete package JSON file: {}".format(err)) for package_obj in package_list.get(OBJECTS, []): package = process_package_object(package_obj) + packages.append(package) + handle_deletion(package) - if package.is_deleted(): - delete_aip(package.uuid) + if not package.is_undeleted_aip(): continue - if not package.is_aip(): - continue start_mets_task( package.uuid, package.size, package.get_relative_path(), package.current_location, package.origin_pipeline, - api_url, timestamp, package_list_no, storage_service_id, @@ -137,6 +132,11 @@ def parse_packages_and_load_mets( return packages +def handle_deletion(package): + if package.is_deleted(): + delete_aip(package.uuid) + + def delete_aip(uuid): logger.warning("Package deleted from SS: '%s'", uuid) @@ -149,14 +149,13 @@ def delete_aip(uuid): @celery.task(bind=True) def workflow_coordinator( - self, api_url, timestamp, storage_service_id, fetch_job_id, packages_directory + self, timestamp, storage_service_id, fetch_job_id, packages_directory ): - logger.info("Packages directory is: %s", packages_directory) # Send package list request to a worker. package_lists_task = package_lists_request.delay( - api_url, timestamp, packages_directory + storage_service_id, timestamp, packages_directory ) write_celery_update(package_lists_task, workflow_coordinator) @@ -181,13 +180,10 @@ def workflow_coordinator( ) # Process packages and create a new worker to download and parse # each METS separately. 
+ package_list = parse_package_list_file(json_file_path, logger, True) + packages = parse_packages_and_load_mets( - json_file_path, - api_url, - timestamp, - package_list_no, - storage_service_id, - fetch_job_id, + package_list, timestamp, package_list_no, storage_service_id, fetch_job_id ) all_packages = all_packages + packages @@ -228,29 +224,34 @@ def make_request(request_url, request_url_without_api_key): @celery.task(bind=True) -def package_lists_request(self, apiUrl, timestamp, packages_directory): +def package_lists_request(self, storage_service_id, timestamp, packages_directory): """Request package lists from the storage service. Package lists will contain details of the AIPs that we want to download. """ META = "meta" NEXT = "next" - LIMIT = "limit" COUNT = "total_count" IN_PROGRESS = "IN PROGRESS" + + storage_service = StorageService.query.get(storage_service_id) + ( base_url, request_url_without_api_key, request_url, - ) = format_api_url_with_limit_offset(apiUrl) + ) = format_api_url_with_limit_offset(storage_service) + # First packages request. packages = make_request(request_url, request_url_without_api_key) packages_count = 1 + # Calculate how many package list files will be downloaded based on # total number of packages and the download limit total_packages = int(packages.get(META, {}).get(COUNT, 0)) - total_package_lists = int(total_packages / int(apiUrl.get(LIMIT))) + ( - total_packages % int(apiUrl.get(LIMIT)) > 0 + total_package_lists = int(total_packages / int(storage_service.download_limit)) + ( + total_packages % int(storage_service.download_limit) > 0 ) + # There may be more packages to download to let's access those here. # TODO: `request_url_without_api_key` at this point will not be as # accurate. If we have more time, modify `format_api_url_with_limit_offset(...)` @@ -272,6 +273,7 @@ def package_lists_request(self, apiUrl, timestamp, packages_directory): ) }, ) + return { "totalPackageLists": total_package_lists, "totalPackages": total_packages, @@ -284,7 +286,6 @@ def get_mets( package_uuid, aip_size, relative_path_to_mets, - api_url, timestamp_str, package_list_no, storage_service_id, @@ -313,8 +314,13 @@ def get_mets( tasklogger = customlogger # Download METS file + storage_service = StorageService.query.get(storage_service_id) download_file = download_mets( - api_url, package_uuid, relative_path_to_mets, timestamp_str, package_list_no + storage_service, + package_uuid, + relative_path_to_mets, + timestamp_str, + package_list_no, ) mets_name = os.path.basename(download_file) mets_hash = file_sha256_hash(download_file) diff --git a/AIPscan/Aggregator/tests/test_database_helpers.py b/AIPscan/Aggregator/tests/test_database_helpers.py index 06de425b..26e5aab7 100644 --- a/AIPscan/Aggregator/tests/test_database_helpers.py +++ b/AIPscan/Aggregator/tests/test_database_helpers.py @@ -16,6 +16,7 @@ FileType, Pipeline, StorageLocation, + StorageService, ) FIXTURES_DIR = "fixtures" @@ -289,14 +290,15 @@ def test_create_or_update_storage_location( """Test that Storage Locations are created or updated as expected.""" make_request = mocker.patch("AIPscan.Aggregator.tasks.make_request") make_request.return_value = {"description": new_description} + create_storage_location_object = mocker.patch( "AIPscan.Aggregator.database_helpers.create_storage_location_object" ) + storage_service = StorageService.query.get(1) + storage_location = database_helpers.create_or_update_storage_location( - current_location=current_location, - api_url={}, - storage_service_id=storage_service_id, + 
current_location=current_location, storage_service=storage_service ) if location_created: @@ -320,12 +322,20 @@ def test_create_or_update_pipeline( """Test that Storage Locations are created or updated as expected.""" make_request = mocker.patch("AIPscan.Aggregator.tasks.make_request") make_request.return_value = {"remote_name": new_url} + create_pipeline_object = mocker.patch( "AIPscan.Aggregator.database_helpers.create_pipeline_object" ) + get_storage_service_api_url = mocker.patch( + "AIPscan.Aggregator.database_helpers.get_storage_service_api_url" + ) + get_storage_service_api_url.return_value = (None, None) + + storage_service = StorageService.query.get(1) + pipeline = database_helpers.create_or_update_pipeline( - origin_pipeline=origin_pipeline, api_url={} + origin_pipeline=origin_pipeline, storage_service=storage_service ) if pipeline_created: diff --git a/AIPscan/Aggregator/tests/test_task_helpers.py b/AIPscan/Aggregator/tests/test_task_helpers.py index 52c6dd3a..f85fe864 100644 --- a/AIPscan/Aggregator/tests/test_task_helpers.py +++ b/AIPscan/Aggregator/tests/test_task_helpers.py @@ -7,6 +7,7 @@ import pytest +from AIPscan import models from AIPscan.Aggregator import task_helpers from AIPscan.Aggregator.types import StorageServicePackage @@ -26,8 +27,7 @@ ], ) def test_tz_neutral_dates(input_date, output_date, now_year): - """Ensure datetime values are handled sensibly across regions. - """ + """Ensure datetime values are handled sensibly across regions.""" result_date = task_helpers._tz_neutral_date(input_date) if now_year is True: year = datetime.now().strftime("%Y-%m-%d") @@ -40,15 +40,17 @@ def test_tz_neutral_dates(input_date, output_date, now_year): @pytest.mark.parametrize( - "url_api_dict, base_url, url_without_api_key, url_with_api_key", + "ss_args, base_url, url_without_api_key, url_with_api_key", [ ( { - "baseUrl": "http://example.com:9000/", - "limit": "23", - "offset": "13", - "userName": "test", - "apiKey": "mykey", + "name": "Test", + "url": "http://example.com:9000/", + "user_name": "test", + "api_key": "mykey", + "download_limit": "23", + "download_offset": "13", + "default": False, }, "http://example.com:9000", "http://example.com:9000/api/v2/file/?limit=23&offset=13", @@ -56,11 +58,13 @@ def test_tz_neutral_dates(input_date, output_date, now_year): ), ( { - "baseUrl": "http://subdomain.example.com:8000/", - "limit": "10", - "offset": "99", - "userName": "anothertest", - "apiKey": "myotherkey", + "name": "Test", + "url": "http://subdomain.example.com:8000/", + "user_name": "anothertest", + "api_key": "myotherkey", + "download_limit": "10", + "download_offset": "99", + "default": False, }, "http://subdomain.example.com:8000", "http://subdomain.example.com:8000/api/v2/file/?limit=10&offset=99", @@ -68,42 +72,67 @@ def test_tz_neutral_dates(input_date, output_date, now_year): ), ], ) -def test_format_api_url(url_api_dict, base_url, url_without_api_key, url_with_api_key): - res1, res2, res3 = task_helpers.format_api_url_with_limit_offset(url_api_dict) +def test_format_api_url(ss_args, base_url, url_without_api_key, url_with_api_key): + storage_service = models.StorageService(**ss_args) + res1, res2, res3 = task_helpers.format_api_url_with_limit_offset(storage_service) assert res1 == base_url assert res2 == url_without_api_key assert res3 == url_with_api_key @pytest.mark.parametrize( - "api_url, package_uuid, path_to_mets, result", + "ss_args, package_uuid, path_to_mets, result", [ ( - {"baseUrl": "http://example.com", "userName": "1234", "apiKey": "1234"}, + { + 
"name": "Test", + "url": "http://example.com", + "user_name": "1234", + "api_key": "1234", + "download_limit": 0, + "download_offset": 0, + "default": False, + }, "1234", "1234", "http://example.com/api/v2/file/1234/extract_file/?relative_path_to_file=1234&username=1234&api_key=1234", ), ( - {"baseUrl": "http://example.com/", "userName": "1234", "apiKey": "1234"}, + { + "name": "Test", + "url": "http://example.com", + "user_name": "1234", + "api_key": "1234", + "download_limit": 0, + "download_offset": 0, + "default": False, + }, "1234", "1234", "http://example.com/api/v2/file/1234/extract_file/?relative_path_to_file=1234&username=1234&api_key=1234", ), ], ) -def test_get_mets_url(api_url, package_uuid, path_to_mets, result): - """Ensure that the URL for retrieving METS is constructed properly. - """ - mets_url = task_helpers.get_mets_url(api_url, package_uuid, path_to_mets) +def test_get_mets_url(ss_args, package_uuid, path_to_mets, result): + """Ensure that the URL for retrieving METS is constructed properly.""" + ss = models.StorageService(**ss_args) + mets_url = task_helpers.get_mets_url(ss, package_uuid, path_to_mets) assert mets_url == result @pytest.mark.parametrize( - "api_url, current_location, expected_url, expected_url_without_api_key", + "ss_args, current_location, expected_url, expected_url_without_api_key", [ ( - {"baseUrl": "http://example.com", "userName": "1234", "apiKey": "12345"}, + { + "name": "Test", + "url": "http://example.com", + "user_name": "1234", + "api_key": "12345", + "download_limit": 0, + "download_offset": 0, + "default": False, + }, "/api/v2/location/{}".format(LOCATION_UUID), "http://example.com/api/v2/location/1b60c346-85a0-4a3c-a88b-0c1b3255e2ec?username=1234&api_key=12345", "http://example.com/api/v2/location/1b60c346-85a0-4a3c-a88b-0c1b3255e2ec", @@ -111,11 +140,13 @@ def test_get_mets_url(api_url, package_uuid, path_to_mets, result): ], ) def test_get_storage_service_api_url( - api_url, current_location, expected_url, expected_url_without_api_key + ss_args, current_location, expected_url, expected_url_without_api_key ): """Ensure construction of URL to fetch Resource information.""" + storage_service = models.StorageService(**ss_args) + url, url_without_secrets = task_helpers.get_storage_service_api_url( - api_url, current_location + storage_service, current_location ) assert url == expected_url assert url_without_secrets == expected_url_without_api_key diff --git a/AIPscan/Aggregator/tests/test_tasks.py b/AIPscan/Aggregator/tests/test_tasks.py index 8d0695f5..b737aeba 100644 --- a/AIPscan/Aggregator/tests/test_tasks.py +++ b/AIPscan/Aggregator/tests/test_tasks.py @@ -15,6 +15,7 @@ delete_storage_service, get_mets, make_request, + parse_package_list_file, parse_packages_and_load_mets, ) from AIPscan.Aggregator.tests import ( @@ -56,7 +57,11 @@ def test_get_mets_task(app_instance, tmpdir, mocker, fixture_path, package_uuid) mets_file = os.path.join(FIXTURES_DIR, fixture_path) def mock_download_mets( - api_url, package_uuid, relative_path_to_mets, timestamp_str, package_list_no + storage_service, + package_uuid, + relative_path_to_mets, + timestamp_str, + package_list_no, ): return mets_file @@ -78,8 +83,6 @@ def mock_download_mets( aips = _get_aips(storage_service.id) assert not aips - api_url = {"baseUrl": "http://test-url", "userName": "test", "apiKey": "test"} - # Set up custom logger and add handler to capture output customlogger = logging.getLogger(__name__) customlogger.setLevel(logging.DEBUG) @@ -96,7 +99,6 @@ def mock_download_mets( 
package_uuid=package_uuid, aip_size=1000, relative_path_to_mets="test", - api_url=api_url, timestamp_str=datetime.now() .replace(microsecond=0) .strftime("%Y-%m-%d-%H-%M-%S"), @@ -121,7 +123,6 @@ def mock_download_mets( package_uuid=package_uuid, aip_size=1000, relative_path_to_mets="test", - api_url=api_url, timestamp_str=datetime.now() .replace(microsecond=0) .strftime("%Y-%m-%d-%H-%M-%S"), @@ -147,7 +148,6 @@ def mock_download_mets( package_uuid=package_uuid, aip_size=1000, relative_path_to_mets="test", - api_url=api_url, timestamp_str=datetime.now() .replace(microsecond=0) .strftime("%Y-%m-%d-%H-%M-%S"), @@ -236,6 +236,18 @@ def test_make_request(mocker, response, raises_task_error): assert return_dict["key"] == RESPONSE_DICT["key"] +def test_parse_package_list_file(tmpdir): + """Test that JSON package list files are being parsed.""" + json_file_path = tmpdir.join("packages.json") + json_file_path.write(json.dumps({"objects": []})) + + package = parse_package_list_file(json_file_path, None, True) + assert "objects" in package + + package_list = package["objects"] + assert len(package_list) == 0 + + def test_parse_packages_and_load_mets(app_instance, tmpdir, mocker): """Test that JSON package lists are deleted after being parsed.""" json_file_path = tmpdir.join("packages.json") @@ -243,7 +255,9 @@ def test_parse_packages_and_load_mets(app_instance, tmpdir, mocker): delete_package_json = mocker.patch("AIPscan.Aggregator.tasks.os.remove") - parse_packages_and_load_mets(json_file_path, {}, str(datetime.now()), 1, 1, 1) + package_list = parse_package_list_file(json_file_path, None, True) + + parse_packages_and_load_mets(package_list, str(datetime.now()), 1, 1, 1) delete_package_json.assert_called_with(json_file_path) diff --git a/AIPscan/Aggregator/tests/test_types.py b/AIPscan/Aggregator/tests/test_types.py index 0071d259..a25cf188 100644 --- a/AIPscan/Aggregator/tests/test_types.py +++ b/AIPscan/Aggregator/tests/test_types.py @@ -44,32 +44,49 @@ def test_package_ness(): """ package = types.StorageServicePackage(aip=True) assert package.is_aip() + assert package.is_undeleted_aip() assert not package.is_dip() assert not package.is_sip() assert not package.is_deleted() assert not package.is_replica() + package = types.StorageServicePackage(dip=True) assert package.is_dip() assert not package.is_aip() + assert not package.is_undeleted_aip() assert not package.is_sip() assert not package.is_deleted() assert not package.is_replica() + package = types.StorageServicePackage(sip=True) assert package.is_sip() assert not package.is_dip() assert not package.is_aip() + assert not package.is_undeleted_aip() assert not package.is_deleted() assert not package.is_replica() + package = types.StorageServicePackage(deleted=True) assert package.is_deleted() assert not package.is_replica() assert not package.is_aip() + assert not package.is_undeleted_aip() + assert not package.is_dip() + assert not package.is_aip() + + package = types.StorageServicePackage(aip=True, deleted=True) + assert package.is_deleted() + assert not package.is_replica() + assert not package.is_aip() + assert not package.is_undeleted_aip() assert not package.is_dip() assert not package.is_aip() + package = types.StorageServicePackage(aip=True, replica=True) assert package.is_replica() assert not package.is_deleted() assert not package.is_aip() + assert not package.is_undeleted_aip() assert not package.is_dip() assert not package.is_aip() diff --git a/AIPscan/Aggregator/tests/test_views.py b/AIPscan/Aggregator/tests/test_views.py index 
b6edfaf0..bd44e4e0 100644 --- a/AIPscan/Aggregator/tests/test_views.py +++ b/AIPscan/Aggregator/tests/test_views.py @@ -3,14 +3,7 @@ from AIPscan.Aggregator.tasks import TaskError from AIPscan.Aggregator.views import _test_storage_service_connection - -API_URL = { - "baseUrl": "http://test-url", - "userName": "test", - "apiKey": "test", - "offset": "10", - "limit": "0", -} +from AIPscan.models import StorageService def test__test_storage_service_connection(mocker): @@ -19,7 +12,10 @@ def test__test_storage_service_connection(mocker): make_request.side_effect = TaskError("Bad response from server") with pytest.raises(ConnectionError): - _test_storage_service_connection(API_URL) + storage_service = StorageService( + "Test", "http://test-url", "test", "test", "0", "10", "" + ) + _test_storage_service_connection(storage_service) def test_new_fetch_job_bad_connection(app_with_populated_files, mocker): diff --git a/AIPscan/Aggregator/types.py b/AIPscan/Aggregator/types.py index d82320e1..34286839 100644 --- a/AIPscan/Aggregator/types.py +++ b/AIPscan/Aggregator/types.py @@ -91,6 +91,12 @@ def is_aip(self): return True return False + def is_undeleted_aip(self): + if self.is_deleted(): + return False + + return self.is_aip() + def is_dip(self): """Determine whether the package is a DIP""" if ( diff --git a/AIPscan/Aggregator/views.py b/AIPscan/Aggregator/views.py index 2f10b228..60ba7770 100644 --- a/AIPscan/Aggregator/views.py +++ b/AIPscan/Aggregator/views.py @@ -38,18 +38,15 @@ def _format_date(date_string): return formatted_date.strftime(DATE_FORMAT_PARTIAL) -def _test_storage_service_connection(api_url): +def _test_storage_service_connection(storage_service): """Test Storage Service credentials. - :param api_url: Storage Service credentials (dict) + :param storage_service: StorageService instance :raises ConnectionError: if credentials are invalid """ - # Make a new dict instead of altering our actual credentials. - test_credentials = dict(api_url) - test_credentials["limit"] = 10 _, request_url_without_api_key, request_url = format_api_url_with_limit_offset( - test_credentials + storage_service ) try: _ = tasks.make_request(request_url, request_url_without_api_key) @@ -162,25 +159,14 @@ def delete_storage_service(storage_service_id): def new_fetch_job(fetch_job_id): """Fetch and process AIP METS files from Storage Service.""" storage_service = StorageService.query.get(fetch_job_id) - api_url = { - "baseUrl": storage_service.url, - "userName": storage_service.user_name, - "apiKey": storage_service.api_key, - "offset": str(storage_service.download_offset), - "limit": str(storage_service.download_limit), - } # Check Storage Service credentials and return 400 if invalid prior to # creating the Fetch Job and kicking off the Celery task. 
try: - _test_storage_service_connection(api_url) + _test_storage_service_connection(storage_service) except ConnectionError: return jsonify({}), 400 - # create "downloads/" directory if it doesn't exist - if not os.path.exists("AIPscan/Aggregator/downloads/"): - os.makedirs("AIPscan/Aggregator/downloads/") - # create a subdirectory for the download job using a timestamp as its name datetime_obj_start = datetime.now().replace(microsecond=0) timestamp_str = datetime_obj_start.strftime("%Y-%m-%d-%H-%M-%S") @@ -197,7 +183,7 @@ def new_fetch_job(fetch_job_id): # send the METS fetch job to a background job that will coordinate other workers task = tasks.workflow_coordinator.delay( - api_url, timestamp_str, storage_service.id, fetch_job.id, packages_directory + timestamp_str, storage_service.id, fetch_job.id, packages_directory ) """ diff --git a/AIPscan/Reporter/views.py b/AIPscan/Reporter/views.py index 8f9321fa..10b1816c 100644 --- a/AIPscan/Reporter/views.py +++ b/AIPscan/Reporter/views.py @@ -329,15 +329,9 @@ def download_mets(aip_id): aip = AIP.query.get(aip_id) storage_service = StorageService.query.get(aip.storage_service_id) - api_url = { - "baseUrl": storage_service.url, - "userName": storage_service.user_name, - "apiKey": storage_service.api_key, - } - mets_response = requests.get( get_mets_url( - api_url, + storage_service, aip.uuid, f"{aip.transfer_name}-{aip.uuid}/data/METS.{aip.uuid}.xml", ) diff --git a/tools/fetch_aips b/tools/fetch_aips index 002106b8..1213b23a 100755 --- a/tools/fetch_aips +++ b/tools/fetch_aips @@ -50,7 +50,6 @@ def main(ss_id, session_id, page, packages_per_page, logfile): package_list_no = "batch" with app.app_context(): - # Get storage service configuration storage_service = StorageService.query.get(ss_id) @@ -77,19 +76,11 @@ def main(ss_id, session_id, page, packages_per_page, logfile): # Get package data packages = fetch.get_packages(storage_service, packages_dir) - # Determine start and end package + # Determine total packages and start and end package total_packages = len(packages["objects"]) - - if page is None: - start_item = 1 - end_item = total_packages - else: - start_item = ((page - 1) * packages_per_page) + 1 - end_item = start_item + packages_per_page - 1 - - # Describe start and end package - if total_packages < end_item: - end_item = total_packages + start_item, end_item = fetch.determine_start_and_end_item( + page, packages_per_page, total_packages + ) # Make sure paging window is valid and delete fetch job if not if start_item > total_packages: @@ -104,13 +95,10 @@ def main(ss_id, session_id, page, packages_per_page, logfile): ) # Import packages - api_url = fetch.assemble_api_url_dict(storage_service) - processed_packages = fetch.import_packages( packages, start_item, end_item, - api_url, ss_id, session_id, package_list_no, diff --git a/tools/helpers/fetch.py b/tools/helpers/fetch.py index e6763ad7..ff4e1732 100644 --- a/tools/helpers/fetch.py +++ b/tools/helpers/fetch.py @@ -1,34 +1,37 @@ import json import pathlib -from AIPscan.Aggregator import database_helpers from AIPscan.Aggregator.task_helpers import ( format_api_url_with_limit_offset, get_packages_directory, + parse_package_list_file, process_package_object, ) -from AIPscan.Aggregator.tasks import delete_aip, get_mets, make_request +from AIPscan.Aggregator.tasks import handle_deletion, make_request, start_mets_task -def assemble_api_url_dict(storage_service, offset=0, limit=1_000_000): - return { - "baseUrl": storage_service.url, - "userName": storage_service.user_name, - "apiKey": 
storage_service.api_key, - "offset": offset, - "limit": limit, - } +def determine_start_and_end_item(page, packages_per_page, total_packages): + if page is None: + start_item = 1 + end_item = total_packages + else: + start_item = ((page - 1) * packages_per_page) + 1 + end_item = start_item + packages_per_page - 1 + + # Describe start and end package + if total_packages < end_item: + end_item = total_packages + return start_item, end_item -def fetch_and_write_packages(storage_service, package_filename): - api_url = assemble_api_url_dict(storage_service) +def fetch_and_write_packages(storage_service, package_filepath): (_, request_url_without_api_key, request_url) = format_api_url_with_limit_offset( - api_url + storage_service ) packages = make_request(request_url, request_url_without_api_key) - with open(package_filename, "w", encoding="utf-8") as f: + with open(package_filepath, "w", encoding="utf-8") as f: json.dump(packages, f) return packages @@ -50,13 +53,12 @@ def create_mets_directory(timestamp_str): def get_packages(storage_service, packages_dir): - package_filename = pathlib.Path(packages_dir) / "packages.json" + package_filepath = pathlib.Path(packages_dir) / "packages.json" - if pathlib.Path(package_filename).is_file(): - with open(package_filename) as f: - packages = json.load(f) + if pathlib.Path(package_filepath).is_file(): + packages = parse_package_list_file(package_filepath, None, False) else: - packages = fetch_and_write_packages(storage_service, package_filename) + packages = fetch_and_write_packages(storage_service, package_filepath) return packages @@ -65,7 +67,6 @@ def import_packages( packages, start_item, end_item, - api_url, storage_service_id, timestamp_str, package_list_no, @@ -87,35 +88,22 @@ def import_packages( logger.info(f"Processing {package.uuid} ({current_item} of {end_item})") processed_packages.append(package) + handle_deletion(package) - if package.is_deleted(): - delete_aip(package.uuid) + if not package.is_undeleted_aip(): continue - if not package.is_aip(): - continue - - storage_location = database_helpers.create_or_update_storage_location( - package.current_location, api_url, storage_service_id - ) - - pipeline = database_helpers.create_or_update_pipeline( - package.origin_pipeline, api_url - ) - - args = [ + start_mets_task( package.uuid, package.size, package.get_relative_path(), - api_url, + package.current_location, + package.origin_pipeline, timestamp_str, package_list_no, storage_service_id, - storage_location.id, - pipeline.id, fetch_job_id, - logger, - ] - get_mets.apply(args=args) + True, + ) return processed_packages
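
Usage note (a reviewer sketch, not part of the patch): the URL helpers in `AIPscan/Aggregator/task_helpers.py` now take a `StorageService` model instead of the old `api_url` dict. A minimal sketch of calling them, mirroring how the updated tests construct the model with keyword arguments (the keyword names come from `test_task_helpers.py`; the values shown are placeholders):

```python
from AIPscan import models
from AIPscan.Aggregator import task_helpers

storage_service = models.StorageService(
    name="Test",
    url="http://example.com:9000/",
    user_name="test",
    api_key="mykey",
    download_limit="23",
    download_offset="13",
    default=False,
)

# Package-list URL: limit/offset now come from the model's download_* columns.
base_url, url_without_key, url_with_key = task_helpers.format_api_url_with_limit_offset(
    storage_service
)
# base_url        == "http://example.com:9000"
# url_without_key == "http://example.com:9000/api/v2/file/?limit=23&offset=13"
# url_with_key    == url_without_key + "&username=test&api_key=mykey"

# METS download URL for a single package.
mets_url = task_helpers.get_mets_url(storage_service, "1234", "1234")

# Resource URL (e.g. a location endpoint), with and without credentials.
url, url_without_secrets = task_helpers.get_storage_service_api_url(
    storage_service, "/api/v2/location/<uuid>"
)
```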
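
Similarly, a sketch of the reworked package-list flow in `AIPscan/Aggregator/tasks.py`: the JSON file is now read (and optionally removed) by `parse_package_list_file`, deletions go through the new `handle_deletion` helper, and non-AIP or deleted packages are skipped via `StorageServicePackage.is_undeleted_aip()`. The file path is a placeholder, and the deletion/METS steps assume an application context and a worker, so this is illustrative only:

```python
from AIPscan.Aggregator.task_helpers import (
    parse_package_list_file,
    process_package_object,
)
from AIPscan.Aggregator.tasks import handle_deletion

# Load a package list written by package_lists_request; pass
# remove_after_parsing=True to delete the JSON file once parsed.
package_list = parse_package_list_file(
    "AIPscan/Aggregator/downloads/<timestamp>/packages/packages1.json",
    logger=None,
    remove_after_parsing=False,
)

for package_obj in package_list.get("objects", []):
    package = process_package_object(package_obj)
    handle_deletion(package)            # removes the AIP record if the package was deleted in the SS
    if not package.is_undeleted_aip():  # skips DIPs, SIPs, replicas and deleted AIPs
        continue
    # start_mets_task(...) is called here; tools/helpers/fetch.py passes
    # immediate=True so get_mets runs via apply() instead of .delay().
```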
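
And one for the paging helper factored out of `tools/fetch_aips` into `tools/helpers/fetch.py`; the numbers are made-up examples, and the import assumes `tools.helpers` is importable as a package from the repository root:

```python
from tools.helpers import fetch

# No page requested: process the whole package list.
fetch.determine_start_and_end_item(page=None, packages_per_page=1000, total_packages=2500)
# -> (1, 2500)

# Page 3 at 1000 per page: the end item is clamped to the total package count.
fetch.determine_start_and_end_item(page=3, packages_per_page=1000, total_packages=2500)
# -> (2001, 2500)
```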