From 1c650b27103e434a895d2ac69f513a39bf0c12f3 Mon Sep 17 00:00:00 2001 From: Josh Borrow Date: Tue, 17 Dec 2024 10:40:59 -0500 Subject: [PATCH] Add Corruption Fixer (#124) * First sketches of whole system * Error handling * Added full initial version of background task * Fixes to rationalize database schema * Added integratoin test for corruption endpoints * Add info on rollback * Respect soft timeout * Add corruption fixer settings and docs --- ...c988372_add_librarian_transfer_toggling.py | 10 + docs/source/Background.rst | 43 +++ hera_librarian/models/corrupt.py | 24 ++ librarian_background/__init__.py | 1 + librarian_background/corruption_fixer.py | 284 ++++++++++++++++++ librarian_background/recieve_clone.py | 15 +- librarian_background/send_clone.py | 159 +++++----- librarian_background/settings.py | 16 + librarian_server/__init__.py | 2 + librarian_server/api/__init__.py | 1 + librarian_server/api/corrupt.py | 259 ++++++++++++++++ librarian_server/orm/file.py | 26 +- librarian_server/orm/instance.py | 4 +- tests/conftest.py | 39 ++- .../test_corruption_endpoints.py | 213 +++++++++++++ tests/integration_test/test_send_queue.py | 37 ++- tests/server.py | 26 +- 17 files changed, 1054 insertions(+), 105 deletions(-) create mode 100644 hera_librarian/models/corrupt.py create mode 100644 librarian_background/corruption_fixer.py create mode 100644 librarian_server/api/corrupt.py create mode 100644 tests/integration_test/test_corruption_endpoints.py diff --git a/alembic/versions/1def8c988372_add_librarian_transfer_toggling.py b/alembic/versions/1def8c988372_add_librarian_transfer_toggling.py index 74e1a6c..3b54f06 100644 --- a/alembic/versions/1def8c988372_add_librarian_transfer_toggling.py +++ b/alembic/versions/1def8c988372_add_librarian_transfer_toggling.py @@ -29,15 +29,25 @@ def upgrade(): "corrupt_files", sa.Column("id", sa.Integer(), nullable=False), sa.Column("file_name", sa.String(), nullable=False), + sa.Column("file_source", sa.String(), nullable=False), 
sa.Column("instance_id", sa.Integer(), nullable=False), + sa.Column("instance_path", sa.String(), nullable=False), sa.Column("corrupt_time", sa.DateTime(), nullable=False), sa.Column("size", sa.BigInteger(), nullable=False), sa.Column("checksum", sa.String(), nullable=False), sa.Column("count", sa.Integer(), nullable=False), + sa.Column("replacement_requested", sa.Boolean(), nullable=False), + sa.Column("incoming_transfer_id", sa.Integer(), nullable=True), sa.PrimaryKeyConstraint("id"), ) + with op.batch_alter_table("outgoing_transfers") as batch_op: + batch_op.alter_column("file_name", nullable=True) + def downgrade(): op.drop_column("librarians", "transfers_enabled") op.drop_table("corrupt_files") + + with op.batch_alter_table("outgoing_transfers") as batch_op: + batch_op.alter_column("file_name", nullable=False) diff --git a/docs/source/Background.rst b/docs/source/Background.rst index e476299..6878179 100644 --- a/docs/source/Background.rst +++ b/docs/source/Background.rst @@ -122,6 +122,26 @@ The following background tasks are available: This task is configured with the following additional parameters: * ``age_in_days``: The number of days back to check for files to transfer (integer). +- ``duplicate_remote_instance_hypervisor``: A hypervisor that looks for duplicate remote + instance rows int he table and removes one. This ensures database integrity and that + the count of remote instances per librarian and store corresponds to the number of files + on that librarian. +- ``rolling_deletion``: A task to delete data that was ingested into the librarian + more than ``age_in_days`` ago. 
Note that this does not delete them from the entire network, + and has specific tools to ensure other copies exist in the network elsewhere: + + * ``store_name``: The store to delete instances from + * ``age_in_days``: The number of days old data needs to be to be considered for deletion + * ``number_of_remote_copies``: The number of copies in the rest of the network (which are + validated using checksumming) that must be kept before deleting a local instance. + * ``verifiy_downstream_checksums``: Whether to make sure all downstream checksums that were + computed on request match the underlying data before deletion (True). + * ``mark_unavailable``: Whether to mark instances as unavailable (True) or actually remove the + rows in the table (False). Default True. + * ``force_deletion``: Whether to ignore the legacy DeletionPolicy parameter (True). +- ``corruption_fixer``: A task that reaches out to upstream librarians to ask for new copies of + corrupt files in the table. These corrupt files can be found by the ``check_integrity`` task + or when upstreams validate files during the deletion process. Background Task Configuration Examples @@ -198,6 +218,22 @@ store. The destination librarian is called ``destination``. "every": "01:00:00", "age_in_days": 2 } + ], + "duplicate_remote_instance_hypervisor": [ + { + "task_name": "Duplicate RI hypervisor", + "soft_timeout": "00:30:00", + "every": "24:00:00" + } + ], + "rolling_deletion": [ + { + "task_name": "Storage Recovery", + "soft_timeout": "00:30:00", + "every": "24:00:00", + "store_name": "store", + "number_of_remote_copies": 2 + } ] } @@ -229,5 +265,12 @@ deleted from the store. 
"every": "01:00:00", "age_in_days": 2 } + ], + "corruption_fixer": [ + { + "task_name": "Corruption fixer", + "soft_timeout": "00:30:00", + "every": "24:00:00" + } ] } diff --git a/hera_librarian/models/corrupt.py b/hera_librarian/models/corrupt.py new file mode 100644 index 0000000..45e598e --- /dev/null +++ b/hera_librarian/models/corrupt.py @@ -0,0 +1,24 @@ +""" +Models for the corruption fixing endpoints. +""" + +from pydantic import BaseModel + + +class CorruptionPreparationRequest(BaseModel): + file_name: str + librarian_name: str + + +class CorruptionPreparationResponse(BaseModel): + ready: bool + + +class CorruptionResendRequest(BaseModel): + librarian_name: str + file_name: str + + +class CorruptionResendResponse(BaseModel): + success: bool + destination_transfer_id: int diff --git a/librarian_background/__init__.py b/librarian_background/__init__.py index 13ad94e..7c29d68 100644 --- a/librarian_background/__init__.py +++ b/librarian_background/__init__.py @@ -29,6 +29,7 @@ def background(run_once: bool = False): + background_settings.incoming_transfer_hypervisor + background_settings.duplicate_remote_instance_hypervisor + background_settings.rolling_deletion + + background_settings.corruption_fixer ) for task in all_tasks: diff --git a/librarian_background/corruption_fixer.py b/librarian_background/corruption_fixer.py new file mode 100644 index 0000000..148af20 --- /dev/null +++ b/librarian_background/corruption_fixer.py @@ -0,0 +1,284 @@ +""" +A background task that queries the corrupt files table and remedies them. 
+""" + +from datetime import datetime, timedelta, timezone +from time import perf_counter + +from loguru import logger +from sqlalchemy import select +from sqlalchemy.orm import Session + +from hera_librarian.exceptions import LibrarianError, LibrarianHTTPError +from hera_librarian.models.corrupt import ( + CorruptionPreparationRequest, + CorruptionPreparationResponse, + CorruptionResendRequest, + CorruptionResendResponse, +) +from hera_librarian.transfer import TransferStatus +from hera_librarian.utils import compare_checksums, get_hash_function_from_hash +from librarian_server.database import get_session +from librarian_server.orm.file import CorruptFile, File +from librarian_server.orm.instance import Instance +from librarian_server.orm.librarian import Librarian +from librarian_server.orm.transfer import IncomingTransfer + +from .task import Task + + +class CorruptionFixer(Task): + """ + Checks in on corrupt files in the corrupt files table and remedies them. + """ + + def on_call(self): + with get_session() as session: + return self.core(session=session) + + def core(self, session: Session) -> bool: + start_time = datetime.now(timezone.utc) + end_time = start_time + self.soft_timeout + + query_start = perf_counter() + + stmt = ( + select(CorruptFile) + .filter(CorruptFile.replacement_requested != True) + .with_for_update(skip_locked=True) + ) + + results = session.execute(stmt).scalars().all() + + query_end = perf_counter() + + logger.info( + "Took {} s to query for {} corrupt files", + query_end - query_start, + len(results), + ) + + for corrupt in results: + if datetime.now(timezone.utc) > end_time: + logger.warning( + "Soft timeout reached for CorruptionFixer; stopping at {time}", + time=datetime.now(timezone.utc), + ) + return False + + logger.info( + "Attempting to fix {id} ({name})", id=corrupt.id, name=corrupt.file_name + ) + + # First: query the file table to see if we still have the file. 
We do not store + # a foreign key in the corrupt table because we may have deleted the file and + # failed to contact the upstream. + stmt = select(File).filter_by(name=corrupt.file_name) + potential_file = session.execute(stmt).scalar_one_or_none() + + stmt = select(Instance).filter_by(id=corrupt.instance_id) + potential_instance = session.execute(stmt).scalar_one_or_none() + + # Step A: Check that the file is actually corrupt + try: + hash_function = get_hash_function_from_hash(potential_file.checksum) + store = potential_instance.store + path_info = store.store_manager.path_info( + potential_instance.path, hash_function=hash_function + ) + + if compare_checksums(potential_file.checksum, path_info.checksum): + logger.info( + "CorruptFile {id} stated that file {name} was corrupt in instance {inst_id} " + "but we just checked the checksums: {chk_a}=={chk_b} and the file is fine " + "or was fixed behind our back; removing CorruptFile row", + id=corrupt.id, + name=corrupt.file_name, + inst_id=corrupt.instance_id, + chk_a=potential_file.checksum, + chk_b=path_info.checksum, + ) + session.delete(corrupt) + session.commit() + continue + + # Remedy A: We have another local copy of the file! + # TODO: Implement this; it is not relevant for SO. + if len(potential_file.instances) > 1: + # Uhhh there is more than one instance here, we don't know what to do. 
+ logger.error( + "File {name} has a corrupt instance {id} but there is {n} > 1 " + "instances of the file on this librarian; entered block that was " + "never completed and need manual remedy", + name=corrupt.file_name, + id=corrupt.instance_id, + n=len(potential_file.instances), + ) + continue + except (FileNotFoundError, AttributeError): + logger.error( + "Instance {} is missing, but we will continue with recovery (File: {})", + corrupt.instance_id, + corrupt.file_name, + ) + + # Ok, so the file _really is corrupt_ or it is missing and we only have one instance + + # Remedy B: the origin of this file is another librarian. Ask for a new copy. + stmt = select(Librarian).filter_by(name=corrupt.file_source) + result: Librarian | None = session.execute(stmt).scalar_one_or_none() + + if result is None: + logger.error( + "File {name} has one and only one corrupt instance {id} but there is no " + "valid librarian matching {lib} in the librarians table so cannot " + "request a new valid copy of the file", + name=corrupt.file_name, + id=corrupt.instance_id, + lib=corrupt.file_source, + ) + continue + + # Use the librarian to ask for a new copy. 
+ client = result.client() + + try: + client.ping() + except (LibrarianError, LibrarianHTTPError): + logger.error( + "Librarian {lib} is unreachable at the moment, cannot restore file {name}", + lib=result.name, + name=corrupt.file_name, + ) + continue + + prepare_request = CorruptionPreparationRequest( + file_name=corrupt.file_name, librarian_name=result.name + ) + + try: + prepare_response: CorruptionPreparationResponse = client.post( + endpoint="corrupt/prepare", + request=prepare_request, + response=CorruptionPreparationResponse, + ) + + if not prepare_response.ready: + raise ValueError("Preparation endpoint returned False") + except (LibrarianError, LibrarianHTTPError, ValueError) as e: + logger.error( + "Librarian {lib} contact during preparation for corruption fix to restore " + "{name} did not succeed: {e}", + lib=result.name, + name=corrupt.file_name, + e=e, + ) + continue + + # This also deletes remote instances which will need to be repaired. However + # it is unlikely that we will be in that situation. Unfortunately we _must_ commit + # this as the files table must be accessed from a different table. 
+            # This also deletes remote instances which will need to be repaired. However
+            # it is unlikely that we will be in that situation. Unfortunately we _must_ commit
+            # this as the files table must be accessed from a different table.
+            corrupt.file.delete(session=session, commit=True, force=True)
+
+            resend_request = CorruptionResendRequest(
+                file_name=corrupt.file_name,
+                librarian_name=result.name,
+            )
+
+            try:
+                resend_response: CorruptionResendResponse = client.post(
+                    "corrupt/resend",
+                    request=resend_request,
+                    response=CorruptionResendResponse,
+                )
+
+                if not resend_response.success:
+                    raise ValueError("Failure during resend")
+            # NOTE(fix): must catch the ValueError raised just above, and must bind the
+            # exception (`as e`) — the log call below references `e`, which previously
+            # raised NameError inside the error path.
+            except (LibrarianError, LibrarianHTTPError, ValueError) as e:
+                logger.error(
+                    "Failed during the resend request flow for librarian {lib}, "
+                    "corrupt {id} for file {name} with {e}; we have deleted data and rows",
+                    lib=result.name,
+                    id=corrupt.id,
+                    name=corrupt.file_name,
+                    e=e,
+                )
+                # Can't rollback anything here so there's no point
+                continue
+
+            corrupt.incoming_transfer_id = resend_response.destination_transfer_id
+            corrupt.replacement_requested = True
+            session.commit()
+
+        # Now check in on files that we already requested new copies of.
+        query_start = perf_counter()
+
+        stmt = (
+            select(CorruptFile)
+            .filter(CorruptFile.replacement_requested == True)
+            .with_for_update(skip_locked=True)
+        )
+
+        results = session.execute(stmt).scalars().all()
+
+        query_end = perf_counter()
+
+        logger.info(
+            "Took {} s to query for {} corrupt files already in progress",
+            query_end - query_start,
+            len(results),
+        )
+
+        for result in results:
+            stmt = select(IncomingTransfer).filter_by(id=result.incoming_transfer_id)
+            transfer = session.execute(stmt).scalar_one_or_none()
+
+            file_is_fixed = False
+
+            if transfer.status in [TransferStatus.FAILED, TransferStatus.CANCELLED]:
+                logger.warning(
+                    "Transfer for corrupt file {id} ({name}) is in status {status}",
+                    id=result.id,
+                    name=result.file_name,
+                    status=transfer.status,
+                )
+                # That's no good. We should check to see if we got the file anyway:
+                stmt = select(File).filter_by(name=result.file_name)
+                file = session.execute(stmt).scalar_one_or_none()
+
+                if file is not None:
+                    # Oh, we're good. Phew, we successfully ingested it.
+ logger.info( + "Though transfer is in status {status}, file {name} was successfully " + "ingested anyway", + status=transfer.status, + name=result.file_name, + ) + file_is_fixed = True + else: + # We actually need to re-download it. + logger.warning( + "Re-setting corrupt file {id} ({name}) to not having a replacement requested " + "as the transfer failed. It will be re-downloaded at the next invocation ", + id=result.id, + name=result.file_name, + ) + result.replacement_requested = False + elif transfer.status in [TransferStatus.COMPLETED]: + # That's good, we got the file! + file_is_fixed = True + else: + file_is_fixed = False + + if file_is_fixed: + logger.info( + "Confirmed that corrupt file {id} ({name}) has been replaced with a new copy; " + "deleting the CorruptFile row", + id=result.id, + name=result.file_name, + ) + session.delete(result) + + session.commit() + + return diff --git a/librarian_background/recieve_clone.py b/librarian_background/recieve_clone.py index ac3967a..bae22aa 100644 --- a/librarian_background/recieve_clone.py +++ b/librarian_background/recieve_clone.py @@ -5,12 +5,10 @@ """ import datetime -import logging import time -import traceback -from pathlib import Path -from typing import TYPE_CHECKING, Optional +from typing import Optional +import cryptography from loguru import logger from sqlalchemy.orm import Session @@ -19,9 +17,7 @@ from hera_librarian.models.clone import CloneCompleteRequest, CloneCompleteResponse from librarian_server.database import get_session from librarian_server.orm import ( - File, IncomingTransfer, - Instance, Librarian, StoreMetadata, TransferStatus, @@ -157,16 +153,17 @@ def core(self, session: Session): logger.debug(f"Request to send: {request}") - downstream_client = librarian.client() - try: + downstream_client = librarian.client() + logger.info("Sending clone complete request") response: CloneCompleteResponse = downstream_client.post( endpoint="clone/complete", request=request, 
response=CloneCompleteResponse, ) - except LibrarianHTTPError as e: + except (LibrarianHTTPError, cryptography.fernet.InvalidToken) as e: + # Cryptography error sometimes happens when we mix up configs in testing logger.error( "Failed to call back to librarian {name} with exception {e}", name=librarian.name, diff --git a/librarian_background/send_clone.py b/librarian_background/send_clone.py index 08ecbf6..2fdf5d0 100644 --- a/librarian_background/send_clone.py +++ b/librarian_background/send_clone.py @@ -449,6 +449,92 @@ def handle_existing_file( ) +def send_file_batch( + files: list[File], + librarian: Librarian, + session: Session, + store_preference: str | None = None, +): + client = librarian.client() + + outgoing_transfers, outgoing_information = process_batch( + files=files, + destination=librarian.name, + store_preference=store_preference, + ) + + session.add_all(outgoing_transfers) + session.commit() + + response = use_batch_to_call_librarian( + outgoing_transfers=outgoing_transfers, + outgoing_information=outgoing_information, + client=client, + librarian=librarian, + session=session, + ) + + # We were unable to speak to the librarian, and have had our + # transfers cancelled for us. Time to move on to the next + # batch and hope for the best. + + # Tested outside of the main loop. + if not response: # pragma: no cover + return False + + # Ok, they got out stuff. Need to do two things now: + # - Create the queue send item + # - Update the transfers with their information. + + send, transfer_provider, transfer_map = create_send_queue_item( + response=response, + outgoing_transfers=outgoing_transfers, + librarian=librarian, + session=session, + ) + + # Send is falsey if there was a problem in creating the send + # queue item. In that, case we've failed everything, and should break + # and come back later. + + # Tested outside of the main loop. + if not send: # pragma: no cover + return False + + # Now update the outgoing transfers with their information. 
+ for transfer in outgoing_transfers: + remote_transfer_info: CloneBatchInitiationRequestFileItem = transfer_map.get( + transfer.id, None + ) + + if remote_transfer_info is None: # pragma: no cover + # This is an unreachable state; we already purged these + # scenarios. + logger.error( + "Trying to set parameters of a transfer that should not exist; " + "this should be an unreachable state." + ) + # In this case, the best thing that we can do is fail this individual + # transfer and pick it up later. + transfer.fail_transfer(session=session, commit=False) + + transfer.remote_transfer_id = remote_transfer_info.destination_transfer_id + transfer.transfer_data = transfer_provider + transfer.send_queue = send + transfer.send_queue_id = send.id + transfer.source_path = str(transfer.instance.path) + transfer.dest_path = str(remote_transfer_info.staging_location) + + session.commit() + + # Finally, call up the destination again and tell them everything is on its + # way. + + call_destination_and_state_ongoing(send=send, session=session) + + return list(transfer_map.values()) + + class SendClone(Task): """ Launches clones of files to a remote librarian. @@ -592,81 +678,14 @@ def core(self, session: Session): files_tried += this_batch_size - outgoing_transfers, outgoing_information = process_batch( + success = send_file_batch( files=files_to_try, - destination=self.destination_librarian, - store_preference=self.store_preference, - ) - - session.add_all(outgoing_transfers) - session.commit() - - response = use_batch_to_call_librarian( - outgoing_transfers=outgoing_transfers, - outgoing_information=outgoing_information, - client=client, - librarian=librarian, - session=session, - ) - - # We were unable to speak to the librarian, and have had our - # transfers cancelled for us. Time to move on to the next - # batch and hope for the best. - - # Tested outside of the main loop. - if not response: # pragma: no cover - continue - - # Ok, they got out stuff. 
Need to do two things now: - # - Create the queue send item - # - Update the transfers with their information. - - send, transfer_provider, transfer_map = create_send_queue_item( - response=response, - outgoing_transfers=outgoing_transfers, librarian=librarian, session=session, + store_preference=self.store_preference, ) - # Send is falsey if there was a problem in creating the send - # queue item. In that, case we've failed everything, and should break - # and come back later. - - # Tested outside of the main loop. - if not send: # pragma: no cover + if not success: break - # Now update the outgoing transfers with their information. - for transfer in outgoing_transfers: - remote_transfer_info: CloneBatchInitiationRequestFileItem = ( - transfer_map.get(transfer.id, None) - ) - - if remote_transfer_info is None: # pragma: no cover - # This is an unreachable state; we already purged these - # scenarios. - logger.error( - "Trying to set parameters of a transfer that should not exist; " - "this should be an unreachable state." - ) - # In this case, the best thing that we can do is fail this individual - # transfer and pick it up later. - transfer.fail_transfer(session=session, commit=False) - - transfer.remote_transfer_id = ( - remote_transfer_info.destination_transfer_id - ) - transfer.transfer_data = transfer_provider - transfer.send_queue = send - transfer.send_queue_id = send.id - transfer.source_path = str(transfer.instance.path) - transfer.dest_path = str(remote_transfer_info.staging_location) - - session.commit() - - # Finally, call up the destination again and tell them everything is on its - # way. 
- - call_destination_and_state_ongoing(send=send, session=session) - return diff --git a/librarian_background/settings.py b/librarian_background/settings.py index 3ee9e10..7a3b782 100644 --- a/librarian_background/settings.py +++ b/librarian_background/settings.py @@ -20,6 +20,7 @@ from librarian_background.rolling_deletion import RollingDeletion from .check_integrity import CheckIntegrity +from .corruption_fixer import CorruptionFixer from .create_clone import CreateLocalClone from .queues import CheckConsumedQueue, ConsumeQueue, TransferStatus from .recieve_clone import RecieveClone @@ -255,6 +256,19 @@ def task(self) -> RollingDeletion: ) +class CorruptionFixerSettings(BackgroundTaskSettings): + """ + Settings for the corruption fixer task. + """ + + @property + def task(self) -> CorruptionFixer: + return CorruptionFixer( + name=self.task_name, + soft_timeout=self.soft_timeout, + ) + + class BackgroundSettings(BaseSettings): """ Background task settings, configurable. @@ -288,6 +302,8 @@ class BackgroundSettings(BaseSettings): rolling_deletion: list[RollingDeletionSettings] = [] + corruption_fixer: list[CorruptionFixerSettings] = [] + # Global settings: max_rsync_retries: int = 8 diff --git a/librarian_server/__init__.py b/librarian_server/__init__.py index d6bad1b..67686fa 100644 --- a/librarian_server/__init__.py +++ b/librarian_server/__init__.py @@ -44,6 +44,7 @@ def main() -> FastAPI: admin_router, checkin_router, clone_router, + corrupt_router, error_router, ping_router, search_router, @@ -61,5 +62,6 @@ def main() -> FastAPI: app.include_router(admin_router) app.include_router(checkin_router) app.include_router(validate_router) + app.include_router(corrupt_router) return app diff --git a/librarian_server/api/__init__.py b/librarian_server/api/__init__.py index 36a9d68..e7f50b1 100644 --- a/librarian_server/api/__init__.py +++ b/librarian_server/api/__init__.py @@ -8,6 +8,7 @@ from .admin import router as admin_router from .checkin import router as 
checkin_router from .clone import router as clone_router +from .corrupt import router as corrupt_router from .errors import router as error_router from .ping import router as ping_router from .search import router as search_router diff --git a/librarian_server/api/corrupt.py b/librarian_server/api/corrupt.py new file mode 100644 index 0000000..46dd188 --- /dev/null +++ b/librarian_server/api/corrupt.py @@ -0,0 +1,259 @@ +""" +API Endpoints for the upstream half of the corrupt files workflow. +""" + +from fastapi import APIRouter, Depends, HTTPException, status +from sqlalchemy import select +from sqlalchemy.orm import Session + +from hera_librarian.exceptions import LibrarianError, LibrarianHTTPError +from hera_librarian.models.corrupt import ( + CorruptionPreparationRequest, + CorruptionPreparationResponse, + CorruptionResendRequest, + CorruptionResendResponse, +) +from hera_librarian.utils import compare_checksums, get_hash_function_from_hash +from librarian_server.orm.file import File +from librarian_server.orm.instance import Instance, RemoteInstance +from librarian_server.orm.librarian import Librarian + +router = APIRouter(prefix="/api/v2/corrupt") + +from loguru import logger + +from ..database import yield_session +from .auth import CallbackUserDependency, User + + +def user_and_librarian_validation_flow( + user: User, librarian_name: str, file_name: str, session: Session +) -> tuple[Librarian, File, Instance, list[RemoteInstance]]: + """ + Figure out if this user is a librarian and that we can make file transfers + to that librarian for this file. Also validates the file on our librarian to make + sure it is not corrupt and is present. 
+ """ + user_is_librarian = user.username == librarian_name + + stmt = select(Librarian).filter_by(name=librarian_name) + librarian = session.execute(stmt).scalars().one_or_none() + + librarian_exists = librarian is not None + + stmt = select(RemoteInstance).filter_by( + file_name=file_name, librarian_id=librarian.id + ) + remote_instances = session.execute(stmt).scalars().all() + + remote_instance_registered_at_destination = bool(remote_instances) + + if not ( + remote_instance_registered_at_destination + and user_is_librarian + and librarian_exists + ): + logger.debug( + "Problem authenticating remedy request, Remote instance: {}, User is librarian: {}, Librarian exists: {}", + remote_instance_registered_at_destination, + user_is_librarian, + librarian_exists, + ) + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail=dict( + reason="Unauthorized", + suggested_remedy="", + ), + ) + + # So at this point we know: + # Downstream is the one asking for the new copy + # We sent them a copy that we confirmed + + # Check our own instance of the file to make sure it's not corrupted. 
+ stmt = select(File).filter_by(name=file_name) + file = session.execute(stmt).scalars().one_or_none() + + try: + best_instance = [x for x in file.instances if x.available][0] + except IndexError: + raise HTTPException( + status_code=status.HTTP_409_BAD_REQUEST, + detail=dict( + reason="We do not have a copy of the file you are requesting", + suggested_remedy="Check your database; you likely did not get the file from us", + ), + ) + + hash_function = get_hash_function_from_hash(file.checksum) + path_info = best_instance.store.store_manager.path_info( + best_instance.path, hash_function=hash_function + ) + + if not compare_checksums(file.checksum, path_info.checksum): + logger.error( + "Our copy of the file {} is corrupt, we cannot send it to {}", + file_name, + librarian_name, + ) + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail=dict( + reason="Our copy of the file is also corrupt", + suggested_remedy="Wait a while, we will attempt to fix this copy", + ), + ) + # Brother not this shit again + # Add to corrupt files table? + # Extremely unlikely + + # We know we have a valid copy of the file ready to go. + + # Do we have login details for your librarian? 
+ login_success = True + try: + librarian.client().ping(require_login=True) + except (LibrarianError, LibrarianHTTPError): + login_success = False + + from librarian_background import background_settings + + if not ( + background_settings.consume_queue + and background_settings.check_consumed_queue + and librarian.transfers_enabled + and login_success + ): + logger.warning( + "Unable to transfer files to downstream librarian {}: " + "Consume queue: {}, check consume queue: {}, transfers enabled: {}, login success: {}", + librarian.name, + bool(background_settings.consume_queue), + bool(background_settings.check_consumed_queue), + librarian.transfers_enabled, + login_success, + ) + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail=dict( + reason="We are not able to send you files", + suggested_remedy="Check every pre-condition for file transfers is met", + ), + ) + + return librarian, file, best_instance, remote_instances + + +@router.post("/prepare") +def prepare( + request: CorruptionPreparationRequest, + user: CallbackUserDependency, + session: Session = Depends(yield_session), +) -> CorruptionPreparationResponse: + """ + Prepare for a request to re-instate a downstream file. This checks: + + a) We can contact the downstream + b) We have a valid copy of the file + c) We have a send queue background task that will actually send the file. + + Possible response codes: + + 409 - We do not have a valid copy of the file either! + -> You are out of luck. Maybe try again later as we might restore from + a librarian above us in the chain? + 401 - You are asking about a file that was not sent to your librarian + -> Leave me alone! + 200 - Ready to send + -> Success! 
+ """ + + logger.info( + "Recieved corruption remedy request for {} from {}", + request.file_name, + user.username, + ) + + user_and_librarian_validation_flow( + user, + librarian_name=request.librarian_name, + file_name=request.file_name, + session=session, + ) + + logger.info( + "Prepared to send a new copy of {} to {}", + request.file_name, + request.librarian_name, + ) + + return CorruptionPreparationResponse(ready=True) + + +@router.post("/resend") +def resend( + request: CorruptionResendRequest, + user: CallbackUserDependency, + session: Session = Depends(yield_session), +) -> CorruptionResendResponse: + """ + Actually send a new copy of a file that we know you already have! We assume that + you deleted it before you called this endpoint, and that you called the prepare + endpoint to make sure we're all good to go first. We will: + + a) Delete our RemoteInstance(s) for this file on your librarian + b) Create an OutgoingTransfer and SendQueue + + This transfer will then take place asynchronously through your usual mechanisms. + You _must_ have a recieve clone task running on your librarian otherwise you won't + have the new file ingested. + + Possible response codes: + + 409 - We don't have a valid copy of the file. + 201 - We created the transfer + -> Success! 
+ """ + + logger.info( + "Recieved corruption resend request for {} from {}", + request.file_name, + user.username, + ) + + librarian, file, instance, remote_instances = user_and_librarian_validation_flow( + user, + librarian_name=request.librarian_name, + file_name=request.file_name, + session=session, + ) + + from librarian_background.send_clone import send_file_batch + + success = send_file_batch(files=[file], librarian=librarian, session=session) + + if success: + logger.info( + "Successfully created send queue item to remedy corrupt data in {}", + request.file_name, + ) + for ri in remote_instances: + session.delete(ri) + session.commit() + return CorruptionResendResponse( + success=bool(success), + destination_transfer_id=success[0].destination_transfer_id, + ) + else: + logger.info( + "Error creating send queue item to remedy corrupt data in {}", + request.file_name, + ) + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail=dict( + reason="Error creating send queue item", + suggested_remedy="Check the logs for more information", + ), + ) diff --git a/librarian_server/orm/file.py b/librarian_server/orm/file.py index a7d04e5..9e8a93c 100644 --- a/librarian_server/orm/file.py +++ b/librarian_server/orm/file.py @@ -150,7 +150,10 @@ def delete( class CorruptFile(db.Base): """ An ORM object for a file that has been marked as (potentially) corrupt - during a check. This will need to be verified and fixed. + during a check. This will need to be verified and fixed. We do not store + references to the files and instances table here because those may be + deleted as part of the recovery process. As such, we need to store + copies of that data so that we can ask upstream for file recovery. """ __tablename__ = "corrupt_files" @@ -158,16 +161,14 @@ class CorruptFile(db.Base): id: int = db.Column(db.Integer, primary_key=True) "The ID of the corrupt file." 
file_name: str = db.Column( - db.String(256), db.ForeignKey("files.name"), nullable=False + db.String(256), ) "The name of the file." - file = db.relationship("File", primaryjoin="CorruptFile.file_name == File.name") - "The file object associated with this." - instance_id: int = db.Column(db.Integer, db.ForeignKey("instances.id")) + file_source: str = db.Column(db.String(256)) + "The source of the file." + instance_id: int = db.Column(db.Integer) "The instance ID of the corrupt file." - instance = db.relationship( - "Instance", primaryjoin="CorruptFile.instance_id == Instance.id" - ) + instance_path: str = db.Column(db.String(256)) "The instance object associated with this." corrupt_time: datetime = db.Column(db.DateTime) "The time at which the file was marked as corrupt." @@ -178,6 +179,11 @@ class CorruptFile(db.Base): count: int = db.Column(db.Integer) "The number of times this file has been marked as corrupt." + replacement_requested: bool = db.Column(db.Boolean, default=False) + "Whether or not a replacement has been requested for this file." + incoming_transfer_id: int = db.Column(db.Integer) + "The incoming transfer associated with the replacement" + @classmethod def new_corrupt_file( cls, instance: Instance, size: int, checksum: str @@ -202,9 +208,9 @@ def new_corrupt_file( return CorruptFile( file_name=instance.file.name, - file=instance.file, + file_source=instance.file.source, instance_id=instance.id, - instance=instance, + instance_path=instance.path, corrupt_time=datetime.now(timezone.utc), size=size, checksum=checksum, diff --git a/librarian_server/orm/instance.py b/librarian_server/orm/instance.py index 8ee0240..18638b5 100644 --- a/librarian_server/orm/instance.py +++ b/librarian_server/orm/instance.py @@ -135,8 +135,8 @@ class RemoteInstance(db.Base): id = db.Column(db.Integer, primary_key=True, autoincrement=True, unique=True) "The unique ID of this instance." 
-    file_name = db.Column(db.String(256), db.ForeignKey("files.name"), nullable=False)
-    "Name of the file this instance references."
+    file_name = db.Column(db.String(256), db.ForeignKey("files.name"), nullable=True)
+    "Name of the file this instance references; nullable so rows survive file deletion"
     file = db.relationship(
         "File",
         back_populates="remote_instances",
diff --git a/tests/conftest.py b/tests/conftest.py
index c4377a9..fc99e06 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -91,12 +91,7 @@ def test_server(tmp_path_factory):
         os.environ[env_var] = env_vars[env_var]
 
 
-@pytest.fixture(scope="package")
-def test_client(test_server):
-    """
-    Returns a test client for the server.
-    """
-
+def create_test_client(test_server, username="admin", password="password"):
     app, session, setup = test_server
 
     from fastapi.testclient import TestClient
@@ -109,7 +104,7 @@ def client_post_with_auth(endpoint: str, content=None, **kwargs):
         """
 
         if "auth" not in kwargs:
-            kwargs["auth"] = ("admin", "password")
+            kwargs["auth"] = (username, password)
 
         if "headers" not in kwargs:
             kwargs["headers"] = {"Content-Type": "application/json"}
@@ -122,13 +117,23 @@ def client_post_with_auth(endpoint: str, content=None, **kwargs):
 
     client.post_with_auth = client_post_with_auth
 
+    return client
+
+
+@pytest.fixture(scope="package")
+def test_client(test_server):
+    """
+    Returns a test client for the server.
+    """
+
+    client = create_test_client(test_server)
+
     yield client
 
     del client
 
 
-@pytest.fixture(scope="package")
-def mocked_admin_client(test_client):
+def make_mocked_admin_client(test_client, username="admin", password="password"):
     """
     Returns an instance of AdminClient that is actually mocked
     to use the test client.
@@ -137,8 +142,8 @@ def mocked_admin_client(test_client):
     client = AdminClient(
         host=str(test_client.base_url),
         port=80,
-        user="admin",
-        password="password",
+        user=username,
+        password=password,
     )
 
     # Now need to replace post
@@ -169,6 +174,18 @@ def new_post(endpoint: str, request, response=None):
 
     client.post = new_post
 
+    return client
+
+
+@pytest.fixture(scope="package")
+def mocked_admin_client(test_client):
+    """
+    Returns an instance of AdminClient that is actually mocked
+    to use the test client.
+    """
+
+    client = make_mocked_admin_client(test_client)
+
     yield client
 
     del client
diff --git a/tests/integration_test/test_corruption_endpoints.py b/tests/integration_test/test_corruption_endpoints.py
new file mode 100644
index 0000000..c5be195
--- /dev/null
+++ b/tests/integration_test/test_corruption_endpoints.py
@@ -0,0 +1,213 @@
+"""
+Tests for the corruption endpoints. We need to use the integration test here because these
+endpoints require a server that is contactable and that we can use async transfer managers for.
+"""
+
+import random
+from pathlib import Path
+
+from hera_librarian.authlevel import AuthLevel
+
+
+def test_fix_missing_file(
+    test_server_with_many_files_and_errors,
+    test_orm,
+    mocked_admin_client,
+    server,
+    admin_client,
+    librarian_database_session_maker,
+    tmp_path,
+):
+    # Create the accounts for the opposing servers
+    mocked_admin_client.create_user(
+        username="live_server",
+        password="password",
+        auth_level=AuthLevel.CALLBACK,
+    )
+
+    admin_client.create_user(
+        username="test_server",
+        password="password",
+        auth_level=AuthLevel.READAPPEND,
+    )
+
+    # Register the librarians with each other
+    assert mocked_admin_client.add_librarian(
+        name="live_server",
+        url="http://localhost",
+        authenticator="test_server:password",
+        port=server.id,
+    )
+
+    assert admin_client.add_librarian(
+        name="test_server",
+        url="http://localhost",
+        authenticator="live_server:password",  # This is the default authenticator.
+ port=test_server_with_many_files_and_errors[2].id, + check_connection=False, + ) + + # Need to add a bunch of files to the source server + + file_cores = [f"repair_test_item_{x}.txt" for x in range(2)] + file_names = [f"repair_test/{file}" for file in file_cores] + + for file in file_cores: + with open(tmp_path / file, "w") as handle: + handle.write(str(random.randbytes(1024))) + + mocked_admin_client.upload(tmp_path / file, Path(f"repair_test/{file}")) + + # Mock up a couple of remote instances for these files on the destination, we must have these + # to use the repair workflow! + with test_server_with_many_files_and_errors[1]() as session: + for file_name in file_names: + file = ( + session.query(test_orm.File) + .filter(test_orm.File.name == file_name) + .one() + ) + librarian = ( + session.query(test_orm.Librarian) + .filter(test_orm.Librarian.name == "live_server") + .one() + ) + instance = test_orm.RemoteInstance.new_instance( + file=file, + store_id=1, + librarian=librarian, + ) + session.add(instance) + session.commit() + + # Now use the repair tasks to ask for copies of those files! + from hera_librarian.client import AdminClient + from hera_librarian.models.corrupt import ( + CorruptionPreparationRequest, + CorruptionPreparationResponse, + CorruptionResendRequest, + CorruptionResendResponse, + ) + + from ..conftest import create_test_client, make_mocked_admin_client + + # Ok, so we need to re-roll this process. 
+ repair_request_client = make_mocked_admin_client( + create_test_client( + test_server_with_many_files_and_errors, + username="live_server", + password="password", + ), + username="live_server", + password="password", + ) + + for file_name in file_names: + assert repair_request_client.post( + "corrupt/prepare", + request=CorruptionPreparationRequest( + file_name=file_name, librarian_name="live_server" + ), + response=CorruptionPreparationResponse, + ).ready + + transfer_ids = [] + + for file_name in file_names: + resp = repair_request_client.post( + "corrupt/resend", + request=CorruptionResendRequest( + file_name=file_name, librarian_name="live_server" + ), + response=CorruptionResendResponse, + ) + + assert resp.success + transfer_ids.append(resp.destination_transfer_id) + + # Check in on those transfer IDs to see if they've been created on the downstream + with librarian_database_session_maker() as session: + for transfer_id in transfer_ids: + assert ( + session.query(test_orm.IncomingTransfer).filter_by(id=transfer_id).one() + ) + + # Now run the send queue and checkin tasks to get the files to the destination + from librarian_background.queues import CheckConsumedQueue, ConsumeQueue + from librarian_background.recieve_clone import RecieveClone + + consume_task = ConsumeQueue(name="consume_queue") + consume_task.core(session_maker=test_server_with_many_files_and_errors[1]) + + checkin_task = CheckConsumedQueue(name="checkin_queue") + checkin_task.core(session_maker=test_server_with_many_files_and_errors[1]) + + # Now we should have the files on the destination server and they need to be ingested + with librarian_database_session_maker() as session: + recv_task = RecieveClone(name="recieve_clone_job") + recv_task.core(session=session) + + for file_name in file_names: + assert session.query(test_orm.File).filter_by(name=file_name).one() + + # Delete it all. 
+ with librarian_database_session_maker() as session: + for file_name in file_names: + file = session.query(test_orm.File).filter_by(name=file_name).one() + file.delete(session=session, commit=False, force=True) + + # Incoming and outgoing transfers + inc_transfers = ( + session.query(test_orm.IncomingTransfer) + .filter_by(upload_name=file_name) + .all() + ) + out_transfers = ( + session.query(test_orm.OutgoingTransfer) + .filter_by(file_name=file_name) + .all() + ) + + for transfer in inc_transfers: + session.delete(transfer) + + for transfer in out_transfers: + if transfer.send_queue: + session.delete(transfer.send_queue) + session.delete(transfer) + + session.commit() + + with test_server_with_many_files_and_errors[1]() as session: + for file_name in file_names: + file = session.query(test_orm.File).filter_by(name=file_name).one() + file.delete(session=session, commit=False, force=True) + + # Incoming and outgoing transfers + inc_transfers = ( + session.query(test_orm.IncomingTransfer) + .filter_by(upload_name=file_name) + .all() + ) + out_transfers = ( + session.query(test_orm.OutgoingTransfer) + .filter_by(file_name=file_name) + .all() + ) + + for transfer in inc_transfers: + session.delete(transfer) + + for transfer in out_transfers: + if transfer.send_queue: + session.delete(transfer.send_queue) + session.delete(transfer) + + session.commit() + + # Now delete accounts and librarians + mocked_admin_client.delete_user("live_server") + admin_client.delete_user("test_server") + + mocked_admin_client.remove_librarian("live_server") + admin_client.remove_librarian("test_server") diff --git a/tests/integration_test/test_send_queue.py b/tests/integration_test/test_send_queue.py index 88e31e8..db93e15 100644 --- a/tests/integration_test/test_send_queue.py +++ b/tests/integration_test/test_send_queue.py @@ -294,12 +294,12 @@ def test_send_from_existing_file_row( # Force downstream to execute their background tasks. 
from librarian_background.recieve_clone import RecieveClone - task = RecieveClone( + recv_task = RecieveClone( name="recv_clone", ) with librarian_database_session_maker() as session: - task.core(session=session) + recv_task.core(session=session) # Now check the downstream librarian that it got all those files! with source_session_maker() as session: @@ -402,6 +402,39 @@ def test_send_from_existing_file_row( for instance in file.instances: assert instance.available == False + # Now see what happens when we corrupt a file and run the appropriate background tasks. + # from librarian_background.check_integrity import CheckIntegrity + # from librarian_background.corruption_fixer import CorruptionFixer + + # # Break a file + # with librarian_database_session_maker() as session: + # instance = session.query(test_orm.Instance).limit(10).all()[-1] + # chosen_instance_id = instance.id + # purposefully_broken_path = instance.path + + # with open(purposefully_broken_path, "w") as f: + # f.write("hahaha i broke ur file") + + # CheckIntegrity( + # name="check_integrity", + # store_name="local_store", + # age_in_days=10 + # ).core(session=session) + + # # Check that we got a corrupt file + # with librarian_database_session_maker() as session: + # broken_file = session.query(test_orm.CorruptFile).filter_by(instance_id=chosen_instance_id).one_or_none() + # broken_file_name = broken_file.file_name + # assert broken_file is not None + + # # Now run the fix flow. + # task = CorruptionFixer( + # name="fix_me" + # ) + # with librarian_database_session_maker() as session: + # task.core(session) + # recv_task.core(session) + # Remove the librarians we added. 
assert mocked_admin_client.remove_librarian(name="live_server") diff --git a/tests/server.py b/tests/server.py index 59bc4de..550efdd 100644 --- a/tests/server.py +++ b/tests/server.py @@ -34,6 +34,8 @@ class Server(BaseModel): LIBRARIAN_BACKGROUND_CREATE_LOCAL_CLONE: str LIBRARIAN_BACKGROUND_SEND_CLONE: str LIBRARIAN_BACKGROUND_RECIEVE_CLONE: str + LIBRARIAN_BACKGROUND_CONSUME_QUEUE: str + LIBRARIAN_BACKGROUND_CHECK_CONSUMED_QUEUE: str process: str | None = None @property @@ -56,6 +58,8 @@ def env(self) -> dict[str, str]: "LIBRARIAN_BACKGROUND_CREATE_LOCAL_CLONE": self.LIBRARIAN_BACKGROUND_CREATE_LOCAL_CLONE, "LIBRARIAN_BACKGROUND_SEND_CLONE": self.LIBRARIAN_BACKGROUND_SEND_CLONE, "LIBRARIAN_BACKGROUND_RECIEVE_CLONE": self.LIBRARIAN_BACKGROUND_RECIEVE_CLONE, + "LIBRARIAN_BACKGROUND_CONSUME_QUEUE": self.LIBRARIAN_BACKGROUND_CONSUME_QUEUE, + "LIBRARIAN_BACKGROUND_CHECK_CONSUMED_QUEUE": self.LIBRARIAN_BACKGROUND_CHECK_CONSUMED_QUEUE, } @@ -215,6 +219,24 @@ def server_setup(tmp_path_factory, name="librarian_server") -> Server: ] ) + queue = json.dumps( + [ + { + "task_name": "queue", + "every": "00:01:00", + } + ] + ) + + check_queue = json.dumps( + [ + { + "task_name": "check_queue", + "every": "00:01:00", + } + ] + ) + return Server( id=server_id_and_port, base_path=tmp_path, @@ -223,7 +245,7 @@ def server_setup(tmp_path_factory, name="librarian_server") -> Server: database=database, LIBRARIAN_SERVER_NAME=name, LIBRARIAN_SERVER_DISPLAYED_SITE_NAME=name.replace("_", " ").title(), - LIBRARIAN_SERVER_ENCRYPTION_KEY=Fernet.generate_key().decode(), + LIBRARIAN_SERVER_ENCRYPTION_KEY=str(Fernet.generate_key().decode()), LIBRARIAN_SERVER_MAXIMAL_UPLOAD_SIZE_BYTES=1_000_000, # 1 MB for testing LIBRARIAN_CONFIG_PATH=librarian_config_path, LIBRARIAN_SERVER_DATABASE_DRIVER="sqlite", @@ -233,6 +255,8 @@ def server_setup(tmp_path_factory, name="librarian_server") -> Server: LIBRARIAN_BACKGROUND_CHECK_INTEGRITY=check_integrity, 
LIBRARIAN_BACKGROUND_CREATE_LOCAL_CLONE=create_local_clone, LIBRARIAN_BACKGROUND_RECIEVE_CLONE=recv_clone, + LIBRARIAN_BACKGROUND_CONSUME_QUEUE=queue, + LIBRARIAN_BACKGROUND_CHECK_CONSUMED_QUEUE=check_queue, LIBRARIAN_BACKGROUND_SEND_CLONE="[]", )