From ce5ebf9a04998de5333fcf247eb975ddd6c6bfd3 Mon Sep 17 00:00:00 2001 From: Josh Borrow Date: Thu, 12 Dec 2024 16:44:16 -0500 Subject: [PATCH 1/8] First sketches of whole system --- ...c988372_add_librarian_transfer_toggling.py | 7 + librarian_background/corruption_fixer.py | 139 ++++++++++++ librarian_background/send_clone.py | 159 +++++++------- librarian_server/api/corrupt.py | 198 ++++++++++++++++++ librarian_server/orm/file.py | 2 + librarian_server/orm/instance.py | 4 +- 6 files changed, 437 insertions(+), 72 deletions(-) create mode 100644 librarian_background/corruption_fixer.py create mode 100644 librarian_server/api/corrupt.py diff --git a/alembic/versions/1def8c988372_add_librarian_transfer_toggling.py b/alembic/versions/1def8c988372_add_librarian_transfer_toggling.py index 74e1a6c..cf9291e 100644 --- a/alembic/versions/1def8c988372_add_librarian_transfer_toggling.py +++ b/alembic/versions/1def8c988372_add_librarian_transfer_toggling.py @@ -34,10 +34,17 @@ def upgrade(): sa.Column("size", sa.BigInteger(), nullable=False), sa.Column("checksum", sa.String(), nullable=False), sa.Column("count", sa.Integer(), nullable=False), + sa.Column("replacement_requested", sa.Boolean(), nullable=False), sa.PrimaryKeyConstraint("id"), ) + with op.batch_alter_table("outgoing_transfers") as batch_op: + batch_op.alter_column("file_name", nullable=True) + def downgrade(): op.drop_column("librarians", "transfers_enabled") op.drop_table("corrupt_files") + + with op.batch_alter_table("outgoing_transfers") as batch_op: + batch_op.alter_column("file_name", nullable=False) diff --git a/librarian_background/corruption_fixer.py b/librarian_background/corruption_fixer.py new file mode 100644 index 0000000..4db04dd --- /dev/null +++ b/librarian_background/corruption_fixer.py @@ -0,0 +1,139 @@ +""" +A background task that queries the corrupt files table and remedies them. +""" + +from time import perf_counter + +from loguru import logger +from sqlalchemy import select +from sqlalchemy.orm import Session + +from hera_librarian.errors import LibrarianError, LibrarianHTTPError +from hera_librarian.utils import compare_checksums, get_hash_function_from_hash +from librarian_server.database import get_session +from librarian_server.orm.file import CorruptFile, File +from librarian_server.orm.librarian import Librarian + +from .task import Task + + +class CorruptionFixer(Task): + """ + Checks in on corrupt files in the corrupt files table and remedies them. + """ + + def on_call(self): + with get_session() as session: + return self.core(session=session) + + def core(self, session: Session) -> bool: + query_start = perf_counter() + + stmt = ( + select(CorruptFile) + .filter(CorruptFile.replacement_requested != True) + .with_for_update(skip_locked=True) + ) + + results = session.execute(stmt).scalars().all() + + query_end = perf_counter() + + logger.info( + "Took {} s to query for {} corrupt files", + query_end - query_start, + len(results), + ) + + for corrupt in results: + logger.info( + "Attempting to fix {id} ({name})", id=corrupt.id, name=corrupt.file_name + ) + + # Step A: Check that the file is actually corrupt + try: + hash_function = get_hash_function_from_hash(corrupt.file.checksum) + instance = corrupt.instance + store = instance.store + path_info = store.store_manager.path_info( + instance.path, hash_function=hash_function + ) + + if compare_checksums(corrupt.file.checksum, path_info.checksum): + logger.info( + "CorruptFile {id} stated that file {name} was corrupt in instance {inst_id} " + "but we just checked the checksums: {chk_a}=={chk_b} and the file is fine " + "or was fixed behind our back; removing CorruptFile row", + id=corrupt.id, + name=corrupt.file_name, + inst_id=corrupt.instance_id, + chk_a=corrupt.file.checksum, + chk_b=path_info.checksum, + ) + session.delete(corrupt) + session.commit() + continue + except FileNotFoundError: + logger.error( + "Instance {} on store {} is missing, but we will continue with recovery (Instance: {})", + instance.path, + store.name, + instance.id, + ) + + # Ok, so the file _really is corrupt_ or it is missing + + # Remedy A: We have another local copy of the file! + # TODO: Implement this; it is not relevant for SO. + if len(corrupt.file.instances) > 1: + # Uhhh there is more than one instance here, we don't know what to do. + logger.error( + "File {name} has a corrupt instance {id} but there is {n} > 1 " + "instances of the file on this librarian; entered block that was " + "never completed and need manual remedy", + name=corrupt.file_name, + id=corrupt.instance_id, + n=len(corrupt.file.instances), + ) + continue + + # Remedy B: the origin of this file is another librarian. Ask for a new copy. + stmt = select(Librarian).filter_by(name=corrupt.file.source) + result = session.execute(stmt).scalars().one_or_none() + + if result is None: + logger.error( + "File {name} has one and only one corrupt instance {id} but there is no " + "valid librarian matching {lib} in the librarians table so cannot " + "request a new valid copy of the file", + name=corrupt.file_name, + id=corrupt.instance_id, + lib=corrupt.file.source, + ) + continue + + # Use the librarian to ask for a new copy. + result: Librarian + client = result.client() + + try: + client.ping() + except (LibrarianError, LibrarianHTTPError): + logger.error( + "Librarian {lib} is unreachable at the moment, cannot restore file {name}", + lib=result.name, + name=corrupt.file_name, + ) + continue + + # TODO: CALL PREPARE ENDPOINT + + # TODO: Deal with the fact that we would have broken remote instances..? + corrupt.file.delete(session=session, commit=False, force=True) + + # TODO: CALL RE-SEND ENDPOINT; DO NOT COMMIT UNTIL WE HEAR BACK; NOTE THAT WE WILL + # HAVE DELETED THE DATA EVEN IF WE FAIL (THAT IS NON-RECOVERABLE) BUT HAVING + # THE ROWS SIMPLIFIES THE LOGIC ABOVE. + + corrupt.replacement_requested = True + session.commit() diff --git a/librarian_background/send_clone.py b/librarian_background/send_clone.py index 08ecbf6..314d46a 100644 --- a/librarian_background/send_clone.py +++ b/librarian_background/send_clone.py @@ -449,6 +449,92 @@ def handle_existing_file( ) +def send_file_batch( + files: list[File], + librarian: Librarian, + session: Session, + store_preference: str | None = None, +): + client = librarian.client() + + outgoing_transfers, outgoing_information = process_batch( + files=files, + destination=librarian.name, + store_preference=store_preference, + ) + + session.add_all(outgoing_transfers) + session.commit() + + response = use_batch_to_call_librarian( + outgoing_transfers=outgoing_transfers, + outgoing_information=outgoing_information, + client=client, + librarian=librarian, + session=session, + ) + + # We were unable to speak to the librarian, and have had our + # transfers cancelled for us. Time to move on to the next + # batch and hope for the best. + + # Tested outside of the main loop. + if not response: # pragma: no cover + return False + + # Ok, they got out stuff. Need to do two things now: + # - Create the queue send item + # - Update the transfers with their information. + + send, transfer_provider, transfer_map = create_send_queue_item( + response=response, + outgoing_transfers=outgoing_transfers, + librarian=librarian, + session=session, + ) + + # Send is falsey if there was a problem in creating the send + # queue item. In that, case we've failed everything, and should break + # and come back later. + + # Tested outside of the main loop. + if not send: # pragma: no cover + return False + + # Now update the outgoing transfers with their information. + for transfer in outgoing_transfers: + remote_transfer_info: CloneBatchInitiationRequestFileItem = transfer_map.get( + transfer.id, None + ) + + if remote_transfer_info is None: # pragma: no cover + # This is an unreachable state; we already purged these + # scenarios. + logger.error( + "Trying to set parameters of a transfer that should not exist; " + "this should be an unreachable state." + ) + # In this case, the best thing that we can do is fail this individual + # transfer and pick it up later. + transfer.fail_transfer(session=session, commit=False) + + transfer.remote_transfer_id = remote_transfer_info.destination_transfer_id + transfer.transfer_data = transfer_provider + transfer.send_queue = send + transfer.send_queue_id = send.id + transfer.source_path = str(transfer.instance.path) + transfer.dest_path = str(remote_transfer_info.staging_location) + + session.commit() + + # Finally, call up the destination again and tell them everything is on its + # way. + + call_destination_and_state_ongoing(send=send, session=session) + + return True + + class SendClone(Task): """ Launches clones of files to a remote librarian. @@ -592,81 +678,14 @@ def core(self, session: Session): files_tried += this_batch_size - outgoing_transfers, outgoing_information = process_batch( + success = send_file_batch( files=files_to_try, - destination=self.destination_librarian, - store_preference=self.store_preference, - ) - - session.add_all(outgoing_transfers) - session.commit() - - response = use_batch_to_call_librarian( - outgoing_transfers=outgoing_transfers, - outgoing_information=outgoing_information, - client=client, - librarian=librarian, - session=session, - ) - - # We were unable to speak to the librarian, and have had our - # transfers cancelled for us. Time to move on to the next - # batch and hope for the best. - - # Tested outside of the main loop. - if not response: # pragma: no cover - continue - - # Ok, they got out stuff. Need to do two things now: - # - Create the queue send item - # - Update the transfers with their information. - - send, transfer_provider, transfer_map = create_send_queue_item( - response=response, - outgoing_transfers=outgoing_transfers, librarian=librarian, session=session, + store_preference=self.store_preference, ) - # Send is falsey if there was a problem in creating the send - # queue item. In that, case we've failed everything, and should break - # and come back later. - - # Tested outside of the main loop. - if not send: # pragma: no cover + if not success: break - # Now update the outgoing transfers with their information. - for transfer in outgoing_transfers: - remote_transfer_info: CloneBatchInitiationRequestFileItem = ( - transfer_map.get(transfer.id, None) - ) - - if remote_transfer_info is None: # pragma: no cover - # This is an unreachable state; we already purged these - # scenarios. - logger.error( - "Trying to set parameters of a transfer that should not exist; " - "this should be an unreachable state." - ) - # In this case, the best thing that we can do is fail this individual - # transfer and pick it up later. - transfer.fail_transfer(session=session, commit=False) - - transfer.remote_transfer_id = ( - remote_transfer_info.destination_transfer_id - ) - transfer.transfer_data = transfer_provider - transfer.send_queue = send - transfer.send_queue_id = send.id - transfer.source_path = str(transfer.instance.path) - transfer.dest_path = str(remote_transfer_info.staging_location) - - session.commit() - - # Finally, call up the destination again and tell them everything is on its - # way. - - call_destination_and_state_ongoing(send=send, session=session) - return diff --git a/librarian_server/api/corrupt.py b/librarian_server/api/corrupt.py new file mode 100644 index 0000000..25d5df0 --- /dev/null +++ b/librarian_server/api/corrupt.py @@ -0,0 +1,198 @@ +""" +API Endpoints for the upstream half of the corrupt files workflow. +""" + +from fastapi import APIRouter, Depends + +from hera_librarian.utils import get_hash_function_from_hash + +router = APIRouter(prefix="/api/v2/corrupt") + +from loguru import logger +from pydantic import BaseModel + +from ..database import yield_session +from .auth import CallbackUserDependency, ReadappendUserDependency + + +class CorruptionPreparationRequest(BaseModel): + file_name: str + librarian_name: str + + +class CorruptionPreparationResponse(BaseModel): + ready: bool + + +def user_and_librarian_validation_flow( + user, librarian_name, file_name +) -> tuple[Librarian, File, Instance, list[RemoteInstance]]: + user_is_librarian = user.username == librarian_name + + stmt = select(Librarian).filter_by(name=request.librarian_name) + librarian = session.execute(stmt).scalars().one_or_none() + + librarian_exists = librarian is not None + + stmt = select(RemoteInstance).filter_by( + file_name=request.file_name, librarian_id=librarian.id + ) + remote_instances = session.execute(stmt).scalars().all() + + remote_instance_registered_at_destination = bool(remote_instances) + + if not ( + remote_instance_registered_at_destination + and user_is_librarian + and librarian_exists + ): + # 401 + pass + + # So at this point we know: + # Downstream is the one asking for the new copy + # We sent them a copy that we confirmed + + # Check our own instance of the file to make sure it's not corrupted. + stmt = select(File).filter_by(file_name=request.file_name) + file = session.execute(stmt).scalars().one_or_none() + + try: + best_instance = [x for x in file.instances if x.available][0] + except IndexError: + # 400 + return + + hash_function = get_hash_function_from_hash(file.checksum) + path_info = best_instance.store.path_info( + best_instance.path, hash_function=hash_function + ) + + if not compare_checksums(file.checksum, path_info.checksum): + # Brother not this shit again + # 400 + # Add to corrupt files table + # Extremely unlikely + return + + # We know we have a valid copy of the file ready to go. + + from librarian_background import background_settings + + if not ( + background_settings.consume_queue + and background_settings.check_consumed_queue + and librarian.transfers_enabled + ): + # 400 we can't send anything! + return + + # Do we have login details for your librarian? + try: + librarian.client().ping(require_login=True) + except (LibrarianError, LibrarianHTTPError): + # Urrr we can't login no good + return + + return librarian, file, best_instance, remote_instances + + +@router.post("/prepare") +def prepare( + request: CorruptionPreparationRequest, + user: CallbackUserDependency, + session: Session = Depends(yield_session), +) -> CorruptionPreparationResponse: + """ + Prepare for a request to re-instate a downstream file. This checks: + + a) We can contact the downstream + b) We have a valid copy of the file + c) We have a send queue background task that will actually send the file. + + Possible response codes: + + 400 - We do not have a valid copy of the file either! + -> You are out of luck. Maybe try again later as we might restore from + a librarian above us in the chain? + 401 - You are asking about a file that was not sent to your librarian + -> Leave me alone! + 200 - Ready to send + -> Success! + """ + + logger.info( + "Recieved corruption remedy request for {} from {}", + request.file_name, + user.username, + ) + + user_and_librarian_validation_flow( + user, librarian_name=request.librarian_name, file_name=request.file_name + ) + + return CorruptionPreparationResponse(ready=True) + + +class CorruptionResendRequest(BaseModel): + librarian_name: str + file_name: str + + +class CorruptionResendResponse(BaseModel): + success: bool + + +@router.post("/resend") +def resend( + request: CorruptionResendRequest, + user: CallbackUserDependency, + session: Session, +) -> CorruptionResendResponse: + """ + Actually send a new copy of a file that we know you already have! We assume that + you deleted it before you called this endpoint, and that you called the prepare + endpoint to make sure we're all good to go first. We will: + + a) Delete our RemoteInstance(s) for this file on your librarian + b) Create an OutgoingTransfer and SendQueue + + This transfer will then take place asynchronously through your usual mechanisms. + You _must_ have a recieve clone task running on your librarian otherwise you won't + have the new file ingested. + + Possible response codes: + + 400 - We don't have a valid copy of the file. + 201 - We created the transfer + -> Success! + """ + + logger.info( + "Recieved corruption resend request for {} from {}", + request.file_name, + user.username, + ) + + librarian, file, instance, remote_instances = user_and_librarian_validation_flow( + user, librarian_name=request.librarian_name, file_name=request.file_name + ) + + from librarian_background.create_clone import send_file_batch + + success = send_file_batch(files=[file], librarian=librarian, session=session) + + if success: + logger.info( + "Successfully created send queue item to remedy corrupt data in {}", + request.file_name, + ) + session.delete(remote_instances) + session.commit() + else: + logger.info( + "Error creating send queue item to remedy corrupt data in {}", + request.file_name, + ) + + return CorruptionResendResponse(success=success) diff --git a/librarian_server/orm/file.py b/librarian_server/orm/file.py index a7d04e5..5f23aba 100644 --- a/librarian_server/orm/file.py +++ b/librarian_server/orm/file.py @@ -177,6 +177,8 @@ class CorruptFile(db.Base): "The checksum of the file that was re-computed and found to be incorrect." count: int = db.Column(db.Integer) "The number of times this file has been marked as corrupt." + replacement_requested: bool = db.Column(db.Boolean, default=False) + "Whether or not a replacement has been requested for this file." @classmethod def new_corrupt_file( diff --git a/librarian_server/orm/instance.py b/librarian_server/orm/instance.py index 8ee0240..18638b5 100644 --- a/librarian_server/orm/instance.py +++ b/librarian_server/orm/instance.py @@ -135,8 +135,8 @@ class RemoteInstance(db.Base): id = db.Column(db.Integer, primary_key=True, autoincrement=True, unique=True) "The unique ID of this instance." - file_name = db.Column(db.String(256), db.ForeignKey("files.name"), nullable=False) - "Name of the file this instance references." + file_name = db.Column(db.String(256), db.ForeignKey("files.name"), nullable=True) + "Name of the file this instance references; note this is NOT a foreign key" file = db.relationship( "File", back_populates="remote_instances", From e81819fbc0f33d46efb5c2c42e157713f5d2482c Mon Sep 17 00:00:00 2001 From: Josh Borrow Date: Fri, 13 Dec 2024 08:55:15 -0500 Subject: [PATCH 2/8] Error handling --- librarian_server/api/corrupt.py | 84 +++++++++++++++++++++++---------- 1 file changed, 58 insertions(+), 26 deletions(-) diff --git a/librarian_server/api/corrupt.py b/librarian_server/api/corrupt.py index 25d5df0..2107012 100644 --- a/librarian_server/api/corrupt.py +++ b/librarian_server/api/corrupt.py @@ -2,9 +2,14 @@ API Endpoints for the upstream half of the corrupt files workflow. """ -from fastapi import APIRouter, Depends +from fastapi import APIRouter, Depends, File, HTTPException, status +from sqlalchemy import select +from sqlalchemy.orm import Session -from hera_librarian.utils import get_hash_function_from_hash +from hera_librarian.exceptions import LibrarianError, LibrarianHTTPError +from hera_librarian.utils import compare_checksums, get_hash_function_from_hash +from librarian_server.orm.instance import Instance, RemoteInstance +from librarian_server.orm.librarian import Librarian router = APIRouter(prefix="/api/v2/corrupt") @@ -25,17 +30,17 @@ class CorruptionPreparationResponse(BaseModel): def user_and_librarian_validation_flow( - user, librarian_name, file_name + user, librarian_name, file_name, session ) -> tuple[Librarian, File, Instance, list[RemoteInstance]]: user_is_librarian = user.username == librarian_name - stmt = select(Librarian).filter_by(name=request.librarian_name) + stmt = select(Librarian).filter_by(name=librarian_name) librarian = session.execute(stmt).scalars().one_or_none() librarian_exists = librarian is not None stmt = select(RemoteInstance).filter_by( - file_name=request.file_name, librarian_id=librarian.id + file_name=file_name, librarian_id=librarian.id ) remote_instances = session.execute(stmt).scalars().all() @@ -46,22 +51,32 @@ def user_and_librarian_validation_flow( and user_is_librarian and librarian_exists ): - # 401 - pass + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail=dict( + reason="Unauthorized", + suggested_remedy="", + ), + ) # So at this point we know: # Downstream is the one asking for the new copy # We sent them a copy that we confirmed # Check our own instance of the file to make sure it's not corrupted. - stmt = select(File).filter_by(file_name=request.file_name) + stmt = select(File).filter_by(file_name=file_name) file = session.execute(stmt).scalars().one_or_none() try: best_instance = [x for x in file.instances if x.available][0] except IndexError: - # 400 - return + raise HTTPException( + status_code=status.HTTP_409_BAD_REQUEST, + detail=dict( + reason="We do not have a copy of the file you are requesting", + suggested_remedy="Check your database; you likely did not get the file from us", + ), + ) hash_function = get_hash_function_from_hash(file.checksum) path_info = best_instance.store.path_info( @@ -69,30 +84,41 @@ def user_and_librarian_validation_flow( ) if not compare_checksums(file.checksum, path_info.checksum): + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail=dict( + reason="Our copy of the file is also corrupt", + suggested_remedy="Wait a while, we will attempt to fix this copy", + ), + ) # Brother not this shit again - # 400 - # Add to corrupt files table + # Add to corrupt files table? # Extremely unlikely - return # We know we have a valid copy of the file ready to go. + # Do we have login details for your librarian? + login_success = True + try: + librarian.client().ping(require_login=True) + except (LibrarianError, LibrarianHTTPError): + login_success = False + from librarian_background import background_settings if not ( background_settings.consume_queue and background_settings.check_consumed_queue and librarian.transfers_enabled + and login_success ): - # 400 we can't send anything! - return - - # Do we have login details for your librarian? - try: - librarian.client().ping(require_login=True) - except (LibrarianError, LibrarianHTTPError): - # Urrr we can't login no good - return + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail=dict( + reason="We are not able to send you files", + suggested_remedy="Check every pre-condition for file transfers is met", + ), + ) return librarian, file, best_instance, remote_instances @@ -112,7 +138,7 @@ def prepare( Possible response codes: - 400 - We do not have a valid copy of the file either! + 409 - We do not have a valid copy of the file either! -> You are out of luck. Maybe try again later as we might restore from a librarian above us in the chain? 401 - You are asking about a file that was not sent to your librarian @@ -128,7 +154,10 @@ def prepare( ) user_and_librarian_validation_flow( - user, librarian_name=request.librarian_name, file_name=request.file_name + user, + librarian_name=request.librarian_name, + file_name=request.file_name, + session=session, ) return CorruptionPreparationResponse(ready=True) @@ -163,7 +192,7 @@ def resend( Possible response codes: - 400 - We don't have a valid copy of the file. + 409 - We don't have a valid copy of the file. 201 - We created the transfer -> Success! """ @@ -175,7 +204,10 @@ def resend( ) librarian, file, instance, remote_instances = user_and_librarian_validation_flow( - user, librarian_name=request.librarian_name, file_name=request.file_name + user, + librarian_name=request.librarian_name, + file_name=request.file_name, + session=session, ) from librarian_background.create_clone import send_file_batch From 2c002eb3fb771a6448707adf2c466937896c2c9f Mon Sep 17 00:00:00 2001 From: Josh Borrow Date: Fri, 13 Dec 2024 13:54:55 -0500 Subject: [PATCH 3/8] Added full initial version of background task --- hera_librarian/models/corrupt.py | 24 +++ librarian_background/corruption_fixer.py | 204 ++++++++++++++++++---- librarian_background/send_clone.py | 2 +- librarian_server/api/corrupt.py | 38 ++-- librarian_server/orm/file.py | 23 ++- tests/integration_test/test_send_queue.py | 4 + 6 files changed, 228 insertions(+), 67 deletions(-) create mode 100644 hera_librarian/models/corrupt.py diff --git a/hera_librarian/models/corrupt.py b/hera_librarian/models/corrupt.py new file mode 100644 index 0000000..45e598e --- /dev/null +++ b/hera_librarian/models/corrupt.py @@ -0,0 +1,24 @@ +""" +Models for the corruption fixing endpoints. +""" + +from pydantic import BaseModel + + +class CorruptionPreparationRequest(BaseModel): + file_name: str + librarian_name: str + + +class CorruptionPreparationResponse(BaseModel): + ready: bool + + +class CorruptionResendRequest(BaseModel): + librarian_name: str + file_name: str + + +class CorruptionResendResponse(BaseModel): + success: bool + destination_transfer_id: int diff --git a/librarian_background/corruption_fixer.py b/librarian_background/corruption_fixer.py index 4db04dd..09c62d4 100644 --- a/librarian_background/corruption_fixer.py +++ b/librarian_background/corruption_fixer.py @@ -9,10 +9,19 @@ from sqlalchemy.orm import Session from hera_librarian.errors import LibrarianError, LibrarianHTTPError +from hera_librarian.models.corrupt import ( + CorruptionPreparationRequest, + CorruptionPreparationResponse, + CorruptionResendRequest, + CorruptionResendResponse, +) +from hera_librarian.transfer import TransferStatus from hera_librarian.utils import compare_checksums, get_hash_function_from_hash from librarian_server.database import get_session from librarian_server.orm.file import CorruptFile, File +from librarian_server.orm.instance import Instance from librarian_server.orm.librarian import Librarian +from librarian_server.orm.transfer import IncomingTransfer from .task import Task @@ -50,16 +59,24 @@ def core(self, session: Session) -> bool: "Attempting to fix {id} ({name})", id=corrupt.id, name=corrupt.file_name ) + # First: query the file table to see if we still have the file. We do not store + # a foreign key in the corrupt table because we may have deleted the file and + # failed to contact the upstream. + stmt = select(File).filter_by(name=corrupt.file_name) + potential_file = session.execute(stmt).scalar_one_or_none() + + stmt = select(Instance).filter_by(id=corrupt.instance_id) + potential_instance = session.execute(stmt).scalar_one_or_none() + # Step A: Check that the file is actually corrupt try: - hash_function = get_hash_function_from_hash(corrupt.file.checksum) - instance = corrupt.instance - store = instance.store + hash_function = get_hash_function_from_hash(potential_file.checksum) + store = potential_instance.store path_info = store.store_manager.path_info( - instance.path, hash_function=hash_function + potential_instance.path, hash_function=hash_function ) - if compare_checksums(corrupt.file.checksum, path_info.checksum): + if compare_checksums(potential_file.checksum, path_info.checksum): logger.info( "CorruptFile {id} stated that file {name} was corrupt in instance {inst_id} " "but we just checked the checksums: {chk_a}=={chk_b} and the file is fine " @@ -67,39 +84,38 @@ def core(self, session: Session) -> bool: id=corrupt.id, name=corrupt.file_name, inst_id=corrupt.instance_id, - chk_a=corrupt.file.checksum, + chk_a=potential_file.checksum, chk_b=path_info.checksum, ) session.delete(corrupt) session.commit() continue - except FileNotFoundError: - logger.error( - "Instance {} on store {} is missing, but we will continue with recovery (Instance: {})", - instance.path, - store.name, - instance.id, - ) - # Ok, so the file _really is corrupt_ or it is missing - - # Remedy A: We have another local copy of the file! - # TODO: Implement this; it is not relevant for SO. - if len(corrupt.file.instances) > 1: - # Uhhh there is more than one instance here, we don't know what to do. + # Remedy A: We have another local copy of the file! + # TODO: Implement this; it is not relevant for SO. + if len(potential_file.instances) > 1: + # Uhhh there is more than one instance here, we don't know what to do. + logger.error( + "File {name} has a corrupt instance {id} but there is {n} > 1 " + "instances of the file on this librarian; entered block that was " + "never completed and need manual remedy", + name=corrupt.file_name, + id=corrupt.instance_id, + n=len(potential_file.instances), + ) + continue + except (FileNotFoundError, AttributeError): logger.error( - "File {name} has a corrupt instance {id} but there is {n} > 1 " - "instances of the file on this librarian; entered block that was " - "never completed and need manual remedy", - name=corrupt.file_name, - id=corrupt.instance_id, - n=len(corrupt.file.instances), + "Instance {} is missing, but we will continue with recovery (File: {})", + corrupt.instance_id, + corrupt.file_name, ) - continue + + # Ok, so the file _really is corrupt_ or it is missing and we only have one instance # Remedy B: the origin of this file is another librarian. Ask for a new copy. - stmt = select(Librarian).filter_by(name=corrupt.file.source) - result = session.execute(stmt).scalars().one_or_none() + stmt = select(Librarian).filter_by(name=corrupt.file_source) + result = session.execute(stmt).scalar_one_or_none() if result is None: logger.error( @@ -108,7 +124,7 @@ def core(self, session: Session) -> bool: "request a new valid copy of the file", name=corrupt.file_name, id=corrupt.instance_id, - lib=corrupt.file.source, + lib=corrupt.file_source, ) continue @@ -126,14 +142,132 @@ def core(self, session: Session) -> bool: ) continue - # TODO: CALL PREPARE ENDPOINT + prepare_request = CorruptionPreparationRequest( + file_name=corrupt.file_name, librarian_name=result.name + ) - # TODO: Deal with the fact that we would have broken remote instances..? - corrupt.file.delete(session=session, commit=False, force=True) + try: + prepare_response: CorruptionPreparationResponse = client.post( + endpoint="corrupt/prepare", + request=prepare_request, + response=CorruptionPreparationResponse, + ) - # TODO: CALL RE-SEND ENDPOINT; DO NOT COMMIT UNTIL WE HEAR BACK; NOTE THAT WE WILL - # HAVE DELETED THE DATA EVEN IF WE FAIL (THAT IS NON-RECOVERABLE) BUT HAVING - # THE ROWS SIMPLIFIES THE LOGIC ABOVE. + if not prepare_response.ready: + raise ValueError("Preparation endpoint returned False") + except (LibrarianError, LibrarianHTTPError, ValueError) as e: + logger.error( + "Librarian {lib} contact during preparation for corruption fix to restore " + "{name} did not succeed: {e}", + lib=result.name, + name=corrupt.file_name, + e=e, + ) + continue + + # This also deletes remote instances which will need to be repaired. However + # it is unlikely that we will be in that situation. Unfortunately we _must_ commit + # this as the files table must be accessed from a different table. + corrupt.file.delete(session=session, commit=True, force=True) + + resend_request = CorruptionResendRequest( + file_name=corrupt.file_name, + librarian_name=result.name, + ) + + try: + resend_response: CorruptionResendResponse = client.post( + "corrupt/resend", + request=resend_request, + response=CorruptionResendResponse, + ) + + if not resend_response.success: + raise ValueError("Failure during resend") + except (LibrarianError, LibrarianHTTPError): + logger.error( + "Failed during the resend request flow for librarian {lib}, " + "corrupt {id} for file {name} with {e}", + lib=result.name, + id=corrupt.id, + name=corrupt.file_name, + e=e, + ) + continue + corrupt.incoming_transfer_id = resend_response.destination_transfer_id corrupt.replacement_requested = True session.commit() + + # Now check in on files that we already requested new copies of. + query_start = perf_counter() + + stmt = ( + select(CorruptFile) + .filter(CorruptFile.replacement_requested == True) + .with_for_update(skip_locked=True) + ) + + results = session.execute(stmt).scalars().all() + + query_end = perf_counter() + + logger.info( + "Took {} s to query for {} corrupt files already in progress", + query_end - query_start, + len(results), + ) + + for result in results: + stmt = select(IncomingTransfer).filter_by(id=result.incoming_transfer_id) + transfer = session.execute(stmt).scalar_one_or_none() + + file_is_fixed = False + + if transfer.status in [TransferStatus.FAILED, TransferStatus.CANCELLED]: + logger.warning( + "Transfer for corrupt file {id} ({name}) is in status {status}", + id=result.id, + name=result.file_name, + status=transfer.status, + ) + # That's no good. We should check to see if we got the file anyway: + stmt = select(File).filter_by(name=result.file_name) + file = session.execute(stmt).scalar_one_or_none() + + if file is not None: + # Oh, we're good. Phew, we successfully ingested it. + logger.info( + "Though transfer is in status {status}, file {name} was successfully " + "ingested anyway", + status=transfer.status, + name=result.file_name, + ) + file_is_fixed = True + else: + # We actually need to re-download it. + logger.warning( + "Re-setting corrupt file {id} ({name}) to not having a replacement requested " + "as the transfer failed. It will be re-downloaded at the next invocation ", + id=result.id, + name=result.file_name, + ) + result.replacement_requested = False + elif transfer.status in [TransferStatus.COMPLETED]: + # That's good, we got the file! + file_is_fixed = True + else: + file_is_fixed = False + + if file_is_fixed: + logger.info( + "Confirmed that corrupt file {id} ({name}) has been replaced with a new copy; " + "deleting the CorruptFile row", + id=result.id, + name=result.file_name, + ) + session.delete(result) + + session.commit() + + return diff --git a/librarian_background/send_clone.py b/librarian_background/send_clone.py index 314d46a..2fdf5d0 100644 --- a/librarian_background/send_clone.py +++ b/librarian_background/send_clone.py @@ -532,7 +532,7 @@ def send_file_batch( call_destination_and_state_ongoing(send=send, session=session) - return True + return list(transfer_map.values()) class SendClone(Task): diff --git a/librarian_server/api/corrupt.py b/librarian_server/api/corrupt.py index 2107012..cea2100 100644 --- a/librarian_server/api/corrupt.py +++ b/librarian_server/api/corrupt.py @@ -7,6 +7,12 @@ from sqlalchemy.orm import Session from hera_librarian.exceptions import LibrarianError, LibrarianHTTPError +from hera_librarian.models.corrupt import ( + CorruptionPreparationRequest, + CorruptionPreparationResponse, + CorruptionResendRequest, + CorruptionResendResponse, +) from hera_librarian.utils import compare_checksums, get_hash_function_from_hash from librarian_server.orm.instance import Instance, RemoteInstance from librarian_server.orm.librarian import Librarian @@ -14,24 +20,19 @@ router = APIRouter(prefix="/api/v2/corrupt") from loguru import logger -from pydantic import BaseModel from ..database import yield_session -from .auth import CallbackUserDependency, ReadappendUserDependency - - -class CorruptionPreparationRequest(BaseModel): - file_name: str - librarian_name: str - - -class CorruptionPreparationResponse(BaseModel): - ready: bool +from .auth import CallbackUserDependency, User def user_and_librarian_validation_flow( - user, librarian_name, file_name, session + user: User, librarian_name: str, file_name: str, session: Session ) -> tuple[Librarian, File, Instance, list[RemoteInstance]]: + """ + Figure out if this user is a librarian and that we can make file transfers + to that librarian for this file. Also validates the file on our librarian to make + sure it is not corrupt and is present. + """ user_is_librarian = user.username == librarian_name stmt = select(Librarian).filter_by(name=librarian_name) @@ -163,15 +164,6 @@ def prepare( return CorruptionPreparationResponse(ready=True) -class CorruptionResendRequest(BaseModel): - librarian_name: str - file_name: str - - -class CorruptionResendResponse(BaseModel): - success: bool - - @router.post("/resend") def resend( request: CorruptionResendRequest, @@ -227,4 +219,6 @@ def resend( request.file_name, ) - return CorruptionResendResponse(success=success) + return CorruptionResendResponse( + success=bool(success), destination_transfer_id=success[0] + ) diff --git a/librarian_server/orm/file.py b/librarian_server/orm/file.py index 5f23aba..eb0d020 100644 --- a/librarian_server/orm/file.py +++ b/librarian_server/orm/file.py @@ -150,7 +150,10 @@ def delete( class CorruptFile(db.Base): """ An ORM object for a file that has been marked as (potentially) corrupt - during a check. This will need to be verified and fixed. + during a check. This will need to be verified and fixed. We do not store + references to the files and instances table here because those may be + deleted as part of the recovery process. As such, we need to store + copies of that data so that we can ask upstream for file recovery. """ __tablename__ = "corrupt_files" @@ -158,16 +161,14 @@ class CorruptFile(db.Base): id: int = db.Column(db.Integer, primary_key=True) "The ID of the corrupt file." file_name: str = db.Column( - db.String(256), db.ForeignKey("files.name"), nullable=False + db.String(256), ) "The name of the file." - file = db.relationship("File", primaryjoin="CorruptFile.file_name == File.name") - "The file object associated with this." - instance_id: int = db.Column(db.Integer, db.ForeignKey("instances.id")) + file_source: str = db.Column(db.String(256)) + "The source of the file." + instance_id: int = db.Column(db.Integer) "The instance ID of the corrupt file." - instance = db.relationship( - "Instance", primaryjoin="CorruptFile.instance_id == Instance.id" - ) + instance_path: str = db.Column(db.String(256)) "The instance object associated with this." corrupt_time: datetime = db.Column(db.DateTime) "The time at which the file was marked as corrupt." @@ -177,8 +178,11 @@ class CorruptFile(db.Base): "The checksum of the file that was re-computed and found to be incorrect." count: int = db.Column(db.Integer) "The number of times this file has been marked as corrupt." + replacement_requested: bool = db.Column(db.Boolean, default=False) "Whether or not a replacement has been requested for this file." + incoming_transfer_id: int = db.Column(db.Integer) + "The incoming transfer associated with the replacement" @classmethod def new_corrupt_file( @@ -204,8 +208,9 @@ def new_corrupt_file( return CorruptFile( file_name=instance.file.name, - file=instance.file, + file_source=instance.file.source, instance_id=instance.id, + instance_path=instance.path, instance=instance, corrupt_time=datetime.now(timezone.utc), size=size, diff --git a/tests/integration_test/test_send_queue.py b/tests/integration_test/test_send_queue.py index 88e31e8..8808a54 100644 --- a/tests/integration_test/test_send_queue.py +++ b/tests/integration_test/test_send_queue.py @@ -402,6 +402,10 @@ def test_send_from_existing_file_row( for instance in file.instances: assert instance.available == False + # Now see what happens when we corrupt a file and run the appropriate background tasks. + from librarian_background.check_integrity import CheckIntegrity + from librarian_background.corruption_fixer import CorruptionFixer + # Remove the librarians we added. assert mocked_admin_client.remove_librarian(name="live_server") From 23bcf925a0c5b165d50ae453a256d3440d0a3124 Mon Sep 17 00:00:00 2001 From: Josh Borrow Date: Fri, 13 Dec 2024 14:20:15 -0500 Subject: [PATCH 4/8] Fixes to rationalize database schema --- .../versions/1def8c988372_add_librarian_transfer_toggling.py | 3 +++ librarian_background/corruption_fixer.py | 2 +- librarian_server/orm/file.py | 1 - 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/alembic/versions/1def8c988372_add_librarian_transfer_toggling.py b/alembic/versions/1def8c988372_add_librarian_transfer_toggling.py index cf9291e..3b54f06 100644 --- a/alembic/versions/1def8c988372_add_librarian_transfer_toggling.py +++ b/alembic/versions/1def8c988372_add_librarian_transfer_toggling.py @@ -29,12 +29,15 @@ def upgrade(): "corrupt_files", sa.Column("id", sa.Integer(), nullable=False), sa.Column("file_name", sa.String(), nullable=False), + sa.Column("file_source", sa.String(), nullable=False), sa.Column("instance_id", sa.Integer(), nullable=False), + sa.Column("instance_path", sa.String(), nullable=False), sa.Column("corrupt_time", sa.DateTime(), nullable=False), sa.Column("size", sa.BigInteger(), nullable=False), sa.Column("checksum", sa.String(), nullable=False), sa.Column("count", sa.Integer(), nullable=False), sa.Column("replacement_requested", sa.Boolean(), nullable=False), + sa.Column("incoming_transfer_id", sa.Integer(), nullable=True), sa.PrimaryKeyConstraint("id"), ) diff --git a/librarian_background/corruption_fixer.py b/librarian_background/corruption_fixer.py index 09c62d4..1aad8b3 100644 --- a/librarian_background/corruption_fixer.py +++ b/librarian_background/corruption_fixer.py @@ -8,7 +8,7 @@ from sqlalchemy import select from sqlalchemy.orm import Session -from hera_librarian.errors import LibrarianError, LibrarianHTTPError +from hera_librarian.exceptions import LibrarianError, LibrarianHTTPError from hera_librarian.models.corrupt import ( CorruptionPreparationRequest, CorruptionPreparationResponse, diff --git a/librarian_server/orm/file.py b/librarian_server/orm/file.py index eb0d020..9e8a93c 100644 --- a/librarian_server/orm/file.py +++ b/librarian_server/orm/file.py @@ -211,7 +211,6 @@ def new_corrupt_file( file_source=instance.file.source, instance_id=instance.id, instance_path=instance.path, - instance=instance, corrupt_time=datetime.now(timezone.utc), size=size, checksum=checksum, From c7012ea8d10dcb0c6f6a36b3426b1199c4b8fcdd Mon Sep 17 00:00:00 2001 From: Josh Borrow Date: Mon, 16 Dec 2024 15:05:12 -0500 Subject: [PATCH 5/8] Added integratoin test for corruption endpoints --- librarian_background/recieve_clone.py | 15 +- librarian_server/__init__.py | 2 + librarian_server/api/__init__.py | 1 + librarian_server/api/corrupt.py | 55 ++++- tests/conftest.py | 39 +++- .../test_corruption_endpoints.py | 213 ++++++++++++++++++ tests/integration_test/test_send_queue.py | 37 ++- tests/server.py | 26 ++- 8 files changed, 353 insertions(+), 35 deletions(-) create mode 100644 tests/integration_test/test_corruption_endpoints.py diff --git a/librarian_background/recieve_clone.py b/librarian_background/recieve_clone.py index ac3967a..bae22aa 100644 --- a/librarian_background/recieve_clone.py +++ b/librarian_background/recieve_clone.py @@ -5,12 +5,10 @@ """ import datetime -import logging import time -import traceback -from pathlib import Path -from typing import TYPE_CHECKING, Optional +from typing import Optional +import cryptography from loguru import logger from sqlalchemy.orm import Session @@ -19,9 +17,7 @@ from hera_librarian.models.clone import CloneCompleteRequest, CloneCompleteResponse from librarian_server.database import get_session from librarian_server.orm import ( - File, IncomingTransfer, - Instance, Librarian, StoreMetadata, TransferStatus, @@ -157,16 +153,17 @@ def core(self, session: Session): logger.debug(f"Request to send: {request}") - downstream_client = librarian.client() - try: + downstream_client = librarian.client() + logger.info("Sending clone complete request") response: CloneCompleteResponse = downstream_client.post( endpoint="clone/complete", request=request, response=CloneCompleteResponse, ) - except LibrarianHTTPError as e: + except (LibrarianHTTPError, cryptography.fernet.InvalidToken) as e: + # Cryptography error sometimes happens when we mix up configs in testing logger.error( "Failed to call back to librarian {name} with exception {e}", name=librarian.name, diff --git a/librarian_server/__init__.py b/librarian_server/__init__.py index d6bad1b..67686fa 100644 --- a/librarian_server/__init__.py +++ b/librarian_server/__init__.py @@ -44,6 +44,7 @@ def main() -> FastAPI: admin_router, checkin_router, clone_router, + corrupt_router, error_router, ping_router, search_router, @@ -61,5 +62,6 @@ def main() -> FastAPI: app.include_router(admin_router) app.include_router(checkin_router) app.include_router(validate_router) + app.include_router(corrupt_router) return app diff --git a/librarian_server/api/__init__.py b/librarian_server/api/__init__.py index 36a9d68..e7f50b1 100644 --- a/librarian_server/api/__init__.py +++ b/librarian_server/api/__init__.py @@ -8,6 +8,7 @@ from .admin import router as admin_router from .checkin import router as checkin_router from .clone import router as clone_router +from .corrupt import router as corrupt_router from .errors import router as error_router from .ping import router as ping_router from .search import router as search_router diff --git a/librarian_server/api/corrupt.py b/librarian_server/api/corrupt.py index cea2100..46dd188 100644 --- a/librarian_server/api/corrupt.py +++ b/librarian_server/api/corrupt.py @@ -2,7 +2,7 @@ API Endpoints for the upstream half of the corrupt files workflow. """ -from fastapi import APIRouter, Depends, File, HTTPException, status +from fastapi import APIRouter, Depends, HTTPException, status from sqlalchemy import select from sqlalchemy.orm import Session @@ -14,6 +14,7 @@ CorruptionResendResponse, ) from hera_librarian.utils import compare_checksums, get_hash_function_from_hash +from librarian_server.orm.file import File from librarian_server.orm.instance import Instance, RemoteInstance from librarian_server.orm.librarian import Librarian @@ -52,6 +53,12 @@ def user_and_librarian_validation_flow( and user_is_librarian and librarian_exists ): + logger.debug( + "Problem authenticating remedy request, Remote instance: {}, User is librarian: {}, Librarian exists: {}", + remote_instance_registered_at_destination, + user_is_librarian, + librarian_exists, + ) raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail=dict( @@ -65,7 +72,7 @@ def user_and_librarian_validation_flow( # We sent them a copy that we confirmed # Check our own instance of the file to make sure it's not corrupted. - stmt = select(File).filter_by(file_name=file_name) + stmt = select(File).filter_by(name=file_name) file = session.execute(stmt).scalars().one_or_none() try: @@ -80,11 +87,16 @@ def user_and_librarian_validation_flow( ) hash_function = get_hash_function_from_hash(file.checksum) - path_info = best_instance.store.path_info( + path_info = best_instance.store.store_manager.path_info( best_instance.path, hash_function=hash_function ) if not compare_checksums(file.checksum, path_info.checksum): + logger.error( + "Our copy of the file {} is corrupt, we cannot send it to {}", + file_name, + librarian_name, + ) raise HTTPException( status_code=status.HTTP_409_CONFLICT, detail=dict( @@ -113,6 +125,15 @@ def user_and_librarian_validation_flow( and librarian.transfers_enabled and login_success ): + logger.warning( + "Unable to transfer files to downstream librarian {}: " + "Consume queue: {}, check consume queue: {}, transfers enabled: {}, login success: {}", + librarian.name, + bool(background_settings.consume_queue), + bool(background_settings.check_consumed_queue), + librarian.transfers_enabled, + login_success, + ) raise HTTPException( status_code=status.HTTP_409_CONFLICT, detail=dict( @@ -161,6 +182,12 @@ def prepare( session=session, ) + logger.info( + "Prepared to send a new copy of {} to {}", + request.file_name, + request.librarian_name, + ) + return CorruptionPreparationResponse(ready=True) @@ -168,7 +195,7 @@ def prepare( def resend( request: CorruptionResendRequest, user: CallbackUserDependency, - session: Session, + session: Session = Depends(yield_session), ) -> CorruptionResendResponse: """ Actually send a new copy of a file that we know you already have! We assume that @@ -202,7 +229,7 @@ def resend( session=session, ) - from librarian_background.create_clone import send_file_batch + from librarian_background.send_clone import send_file_batch success = send_file_batch(files=[file], librarian=librarian, session=session) @@ -211,14 +238,22 @@ def resend( "Successfully created send queue item to remedy corrupt data in {}", request.file_name, ) - session.delete(remote_instances) + for ri in remote_instances: + session.delete(ri) session.commit() + return CorruptionResendResponse( + success=bool(success), + destination_transfer_id=success[0].destination_transfer_id, + ) else: logger.info( "Error creating send queue item to remedy corrupt data in {}", request.file_name, ) - - return CorruptionResendResponse( - success=bool(success), destination_transfer_id=success[0] - ) + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail=dict( + reason="Error creating send queue item", + suggested_remedy="Check the logs for more information", + ), + ) diff --git a/tests/conftest.py b/tests/conftest.py index c4377a9..fc99e06 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -91,12 +91,7 @@ def test_server(tmp_path_factory): os.environ[env_var] = env_vars[env_var] -@pytest.fixture(scope="package") -def test_client(test_server): - """ - Returns a test client for the server. - """ - +def create_test_client(test_server, username="admin", password="password"): app, session, setup = test_server from fastapi.testclient import TestClient @@ -109,7 +104,7 @@ def client_post_with_auth(endpoint: str, content=None, **kwargs): """ if "auth" not in kwargs: - kwargs["auth"] = ("admin", "password") + kwargs["auth"] = (username, password) if "headers" not in kwargs: kwargs["headers"] = {"Content-Type": "application/json"} @@ -122,13 +117,23 @@ def client_post_with_auth(endpoint: str, content=None, **kwargs): client.post_with_auth = client_post_with_auth + return client + + +@pytest.fixture(scope="package") +def test_client(test_server): + """ + Returns a test client for the server. + """ + + client = create_test_client(test_server) + yield client del client -@pytest.fixture(scope="package") -def mocked_admin_client(test_client): +def make_mocked_admin_client(test_client, username="admin", password="password"): """ Returns an instance of AdminClient that is actually mocked to use the test client. @@ -137,8 +142,8 @@ def mocked_admin_client(test_client): client = AdminClient( host=str(test_client.base_url), port=80, - user="admin", - password="password", + user=username, + password=password, ) # Now need to replace post @@ -169,6 +174,18 @@ def new_post(endpoint: str, request, response=None): client.post = new_post + return client + + +@pytest.fixture(scope="package") +def mocked_admin_client(test_client): + """ + Returns an instance of AdminClient that is actually mocked + to use the test client. + """ + + client = make_mocked_admin_client(test_client) + yield client del client diff --git a/tests/integration_test/test_corruption_endpoints.py b/tests/integration_test/test_corruption_endpoints.py new file mode 100644 index 0000000..c5be195 --- /dev/null +++ b/tests/integration_test/test_corruption_endpoints.py @@ -0,0 +1,213 @@ +""" +Tests for the corruption endpoints. We need to use the integration test here because these +endpoints require a server that is contactable and that we can use async transfer managers for. +""" + +import random +from pathlib import Path + +from hera_librarian.authlevel import AuthLevel + + +def test_fix_missing_file( + test_server_with_many_files_and_errors, + test_orm, + mocked_admin_client, + server, + admin_client, + librarian_database_session_maker, + tmp_path, +): + # Create the accounts for the opposing servers + mocked_admin_client.create_user( + username="live_server", + password="password", + auth_level=AuthLevel.CALLBACK, + ) + + admin_client.create_user( + username="test_server", + password="password", + auth_level=AuthLevel.READAPPEND, + ) + + # Reigster the librarians with each other + assert mocked_admin_client.add_librarian( + name="live_server", + url="http://localhost", + authenticator="test_server:password", + port=server.id, + ) + + assert admin_client.add_librarian( + name="test_server", + url="http://localhost", + authenticator="live_server:password", # This is the default authenticator. + port=test_server_with_many_files_and_errors[2].id, + check_connection=False, + ) + + # Need to add a bunch of files to the source server + + file_cores = [f"repair_test_item_{x}.txt" for x in range(2)] + file_names = [f"repair_test/{file}" for file in file_cores] + + for file in file_cores: + with open(tmp_path / file, "w") as handle: + handle.write(str(random.randbytes(1024))) + + mocked_admin_client.upload(tmp_path / file, Path(f"repair_test/{file}")) + + # Mock up a couple of remote instances for these files on the destination, we must have these + # to use the repair workflow! + with test_server_with_many_files_and_errors[1]() as session: + for file_name in file_names: + file = ( + session.query(test_orm.File) + .filter(test_orm.File.name == file_name) + .one() + ) + librarian = ( + session.query(test_orm.Librarian) + .filter(test_orm.Librarian.name == "live_server") + .one() + ) + instance = test_orm.RemoteInstance.new_instance( + file=file, + store_id=1, + librarian=librarian, + ) + session.add(instance) + session.commit() + + # Now use the repair tasks to ask for copies of those files! + from hera_librarian.client import AdminClient + from hera_librarian.models.corrupt import ( + CorruptionPreparationRequest, + CorruptionPreparationResponse, + CorruptionResendRequest, + CorruptionResendResponse, + ) + + from ..conftest import create_test_client, make_mocked_admin_client + + # Ok, so we need to re-roll this process. + repair_request_client = make_mocked_admin_client( + create_test_client( + test_server_with_many_files_and_errors, + username="live_server", + password="password", + ), + username="live_server", + password="password", + ) + + for file_name in file_names: + assert repair_request_client.post( + "corrupt/prepare", + request=CorruptionPreparationRequest( + file_name=file_name, librarian_name="live_server" + ), + response=CorruptionPreparationResponse, + ).ready + + transfer_ids = [] + + for file_name in file_names: + resp = repair_request_client.post( + "corrupt/resend", + request=CorruptionResendRequest( + file_name=file_name, librarian_name="live_server" + ), + response=CorruptionResendResponse, + ) + + assert resp.success + transfer_ids.append(resp.destination_transfer_id) + + # Check in on those transfer IDs to see if they've been created on the downstream + with librarian_database_session_maker() as session: + for transfer_id in transfer_ids: + assert ( + session.query(test_orm.IncomingTransfer).filter_by(id=transfer_id).one() + ) + + # Now run the send queue and checkin tasks to get the files to the destination + from librarian_background.queues import CheckConsumedQueue, ConsumeQueue + from librarian_background.recieve_clone import RecieveClone + + consume_task = ConsumeQueue(name="consume_queue") + consume_task.core(session_maker=test_server_with_many_files_and_errors[1]) + + checkin_task = CheckConsumedQueue(name="checkin_queue") + checkin_task.core(session_maker=test_server_with_many_files_and_errors[1]) + + # Now we should have the files on the destination server and they need to be ingested + with librarian_database_session_maker() as session: + recv_task = RecieveClone(name="recieve_clone_job") + recv_task.core(session=session) + + for file_name in file_names: + assert session.query(test_orm.File).filter_by(name=file_name).one() + + # Delete it all. + with librarian_database_session_maker() as session: + for file_name in file_names: + file = session.query(test_orm.File).filter_by(name=file_name).one() + file.delete(session=session, commit=False, force=True) + + # Incoming and outgoing transfers + inc_transfers = ( + session.query(test_orm.IncomingTransfer) + .filter_by(upload_name=file_name) + .all() + ) + out_transfers = ( + session.query(test_orm.OutgoingTransfer) + .filter_by(file_name=file_name) + .all() + ) + + for transfer in inc_transfers: + session.delete(transfer) + + for transfer in out_transfers: + if transfer.send_queue: + session.delete(transfer.send_queue) + session.delete(transfer) + + session.commit() + + with test_server_with_many_files_and_errors[1]() as session: + for file_name in file_names: + file = session.query(test_orm.File).filter_by(name=file_name).one() + file.delete(session=session, commit=False, force=True) + + # Incoming and outgoing transfers + inc_transfers = ( + session.query(test_orm.IncomingTransfer) + .filter_by(upload_name=file_name) + .all() + ) + out_transfers = ( + session.query(test_orm.OutgoingTransfer) + .filter_by(file_name=file_name) + .all() + ) + + for transfer in inc_transfers: + session.delete(transfer) + + for transfer in out_transfers: + if transfer.send_queue: + session.delete(transfer.send_queue) + session.delete(transfer) + + session.commit() + + # Now delete accounts and librarians + mocked_admin_client.delete_user("live_server") + admin_client.delete_user("test_server") + + mocked_admin_client.remove_librarian("live_server") + admin_client.remove_librarian("test_server") diff --git a/tests/integration_test/test_send_queue.py b/tests/integration_test/test_send_queue.py index 8808a54..db93e15 100644 --- a/tests/integration_test/test_send_queue.py +++ b/tests/integration_test/test_send_queue.py @@ -294,12 +294,12 @@ def test_send_from_existing_file_row( # Force downstream to execute their background tasks. from librarian_background.recieve_clone import RecieveClone - task = RecieveClone( + recv_task = RecieveClone( name="recv_clone", ) with librarian_database_session_maker() as session: - task.core(session=session) + recv_task.core(session=session) # Now check the downstream librarian that it got all those files! with source_session_maker() as session: @@ -403,8 +403,37 @@ def test_send_from_existing_file_row( assert instance.available == False # Now see what happens when we corrupt a file and run the appropriate background tasks. - from librarian_background.check_integrity import CheckIntegrity - from librarian_background.corruption_fixer import CorruptionFixer + # from librarian_background.check_integrity import CheckIntegrity + # from librarian_background.corruption_fixer import CorruptionFixer + + # # Break a file + # with librarian_database_session_maker() as session: + # instance = session.query(test_orm.Instance).limit(10).all()[-1] + # chosen_instance_id = instance.id + # purposefully_broken_path = instance.path + + # with open(purposefully_broken_path, "w") as f: + # f.write("hahaha i broke ur file") + + # CheckIntegrity( + # name="check_integrity", + # store_name="local_store", + # age_in_days=10 + # ).core(session=session) + + # # Check that we got a corrupt file + # with librarian_database_session_maker() as session: + # broken_file = session.query(test_orm.CorruptFile).filter_by(instance_id=chosen_instance_id).one_or_none() + # broken_file_name = broken_file.file_name + # assert broken_file is not None + + # # Now run the fix flow. + # task = CorruptionFixer( + # name="fix_me" + # ) + # with librarian_database_session_maker() as session: + # task.core(session) + # recv_task.core(session) # Remove the librarians we added. assert mocked_admin_client.remove_librarian(name="live_server") diff --git a/tests/server.py b/tests/server.py index 59bc4de..550efdd 100644 --- a/tests/server.py +++ b/tests/server.py @@ -34,6 +34,8 @@ class Server(BaseModel): LIBRARIAN_BACKGROUND_CREATE_LOCAL_CLONE: str LIBRARIAN_BACKGROUND_SEND_CLONE: str LIBRARIAN_BACKGROUND_RECIEVE_CLONE: str + LIBRARIAN_BACKGROUND_CONSUME_QUEUE: str + LIBRARIAN_BACKGROUND_CHECK_CONSUMED_QUEUE: str process: str | None = None @property @@ -56,6 +58,8 @@ def env(self) -> dict[str, str]: "LIBRARIAN_BACKGROUND_CREATE_LOCAL_CLONE": self.LIBRARIAN_BACKGROUND_CREATE_LOCAL_CLONE, "LIBRARIAN_BACKGROUND_SEND_CLONE": self.LIBRARIAN_BACKGROUND_SEND_CLONE, "LIBRARIAN_BACKGROUND_RECIEVE_CLONE": self.LIBRARIAN_BACKGROUND_RECIEVE_CLONE, + "LIBRARIAN_BACKGROUND_CONSUME_QUEUE": self.LIBRARIAN_BACKGROUND_CONSUME_QUEUE, + "LIBRARIAN_BACKGROUND_CHECK_CONSUMED_QUEUE": self.LIBRARIAN_BACKGROUND_CHECK_CONSUMED_QUEUE, } @@ -215,6 +219,24 @@ def server_setup(tmp_path_factory, name="librarian_server") -> Server: ] ) + queue = json.dumps( + [ + { + "task_name": "queue", + "every": "00:01:00", + } + ] + ) + + check_queue = json.dumps( + [ + { + "task_name": "check_queue", + "every": "00:01:00", + } + ] + ) + return Server( id=server_id_and_port, base_path=tmp_path, @@ -223,7 +245,7 @@ def server_setup(tmp_path_factory, name="librarian_server") -> Server: database=database, LIBRARIAN_SERVER_NAME=name, LIBRARIAN_SERVER_DISPLAYED_SITE_NAME=name.replace("_", " ").title(), - LIBRARIAN_SERVER_ENCRYPTION_KEY=Fernet.generate_key().decode(), + LIBRARIAN_SERVER_ENCRYPTION_KEY=str(Fernet.generate_key().decode()), LIBRARIAN_SERVER_MAXIMAL_UPLOAD_SIZE_BYTES=1_000_000, # 1 MB for testing LIBRARIAN_CONFIG_PATH=librarian_config_path, LIBRARIAN_SERVER_DATABASE_DRIVER="sqlite", @@ -233,6 +255,8 @@ def server_setup(tmp_path_factory, name="librarian_server") -> Server: LIBRARIAN_BACKGROUND_CHECK_INTEGRITY=check_integrity, LIBRARIAN_BACKGROUND_CREATE_LOCAL_CLONE=create_local_clone, LIBRARIAN_BACKGROUND_RECIEVE_CLONE=recv_clone, + LIBRARIAN_BACKGROUND_CONSUME_QUEUE=queue, + LIBRARIAN_BACKGROUND_CHECK_CONSUMED_QUEUE=check_queue, LIBRARIAN_BACKGROUND_SEND_CLONE="[]", ) From 0849d77e6c92fc56996e0e098fec74af5c419f7b Mon Sep 17 00:00:00 2001 From: Josh Borrow Date: Tue, 17 Dec 2024 09:41:00 -0500 Subject: [PATCH 6/8] Add info on rollback --- librarian_background/corruption_fixer.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/librarian_background/corruption_fixer.py b/librarian_background/corruption_fixer.py index 1aad8b3..e4145fd 100644 --- a/librarian_background/corruption_fixer.py +++ b/librarian_background/corruption_fixer.py @@ -115,7 +115,7 @@ def core(self, session: Session) -> bool: # Remedy B: the origin of this file is another librarian. Ask for a new copy. stmt = select(Librarian).filter_by(name=corrupt.file_source) - result = session.execute(stmt).scalar_one_or_none() + result: Librarian | None = session.execute(stmt).scalar_one_or_none() if result is None: logger.error( @@ -129,7 +129,6 @@ def core(self, session: Session) -> bool: continue # Use the librarian to ask for a new copy. - result: Librarian client = result.client() try: @@ -187,12 +186,13 @@ def core(self, session: Session) -> bool: except (LibrarianError, LibrarianHTTPError): logger.error( "Failed during the resend request flow for librarian {lib}, " - "corrupt {id} for file {name} with {e}", + "corrupt {id} for file {name} with {e}; we have deleted data and rows", lib=result.name, id=corrupt.id, name=corrupt.file_name, e=e, ) + # Can't rollback anything here so there's no point continue corrupt.incoming_transfer_id = resend_response.destination_transfer_id From dd08b5807f52b988377a1b814df967fc7c6196ed Mon Sep 17 00:00:00 2001 From: Josh Borrow Date: Tue, 17 Dec 2024 09:55:06 -0500 Subject: [PATCH 7/8] Respect soft timeout --- librarian_background/corruption_fixer.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/librarian_background/corruption_fixer.py b/librarian_background/corruption_fixer.py index e4145fd..148af20 100644 --- a/librarian_background/corruption_fixer.py +++ b/librarian_background/corruption_fixer.py @@ -2,6 +2,7 @@ A background task that queries the corrupt files table and remedies them. """ +from datetime import datetime, timedelta, timezone from time import perf_counter from loguru import logger @@ -36,6 +37,9 @@ def on_call(self): return self.core(session=session) def core(self, session: Session) -> bool: + start_time = datetime.now(timezone.utc) + end_time = start_time + self.soft_timeout + query_start = perf_counter() stmt = ( @@ -55,6 +59,13 @@ def core(self, session: Session) -> bool: ) for corrupt in results: + if datetime.now(timezone.utc) > end_time: + logger.warning( + "Soft timeout reached for CorruptionFixer; stopping at {time}", + time=datetime.now(timezone.utc), + ) + return False + logger.info( "Attempting to fix {id} ({name})", id=corrupt.id, name=corrupt.file_name ) From 81ab7697a8662c238753a8b40b106a8ceb24d939 Mon Sep 17 00:00:00 2001 From: Josh Borrow Date: Tue, 17 Dec 2024 10:01:06 -0500 Subject: [PATCH 8/8] Add corruption fixer settings and docs --- docs/source/Background.rst | 43 ++++++++++++++++++++++++++++++++ librarian_background/__init__.py | 1 + librarian_background/settings.py | 16 ++++++++++++ 3 files changed, 60 insertions(+) diff --git a/docs/source/Background.rst b/docs/source/Background.rst index e476299..6878179 100644 --- a/docs/source/Background.rst +++ b/docs/source/Background.rst @@ -122,6 +122,26 @@ The following background tasks are available: This task is configured with the following additional parameters: * ``age_in_days``: The number of days back to check for files to transfer (integer). +- ``duplicate_remote_instance_hypervisor``: A hypervisor that looks for duplicate remote + instance rows int he table and removes one. This ensures database integrity and that + the count of remote instances per librarian and store corresponds to the number of files + on that librarian. +- ``rolling_deletion``: A task to delete data that was ingested into the librarian + more than ``age_in_days`` ago. Note that this does not delete them from the entire network, + and has specific tools to ensure other copies exist in the network elsewhere: + + * ``store_name``: The store to delete instances from + * ``age_in_days``: The number of days old data needs to be to be considered for deletion + * ``number_of_remote_copies``: The number of copies in the rest of the network (which are + validated using checksumming) that must be kept before deleting a local instance. + * ``verifiy_downstream_checksums``: Whether to make sure all downstream checksums that were + computed on request match the underlying data before deletion (True). + * ``mark_unavailable``: Whether to mark instances as unavailable (True) or actually remove the + rows in the table (False). Default True. + * ``force_deletion``: Whether to ignore the legacy DeletionPolicy parameter (True). +- ``corruption_fixer``: A task that reaches out to upstream librarians to ask for new copies of + corrupt files in the table. These corrupt files can be found by the ``check_integrity`` task + or when upstreams validate files during the deletion process. Background Task Configuration Examples @@ -198,6 +218,22 @@ store. The destination librarian is called ``destination``. "every": "01:00:00", "age_in_days": 2 } + ], + "duplicate_remote_instance_hypervisor": [ + { + "task_name": "Duplicate RI hypervisor", + "soft_timeout": "00:30:00", + "every": "24:00:00" + } + ], + "rolling_deletion": [ + { + "task_name": "Storage Recovery", + "soft_timeout": "00:30:00", + "every": "24:00:00", + "store_name": "store", + "number_of_remote_copies": 2 + } ] } @@ -229,5 +265,12 @@ deleted from the store. "every": "01:00:00", "age_in_days": 2 } + ], + "corruption_fixer": [ + { + "task_name": "Corruption fixer", + "soft_timeout": "00:30:00", + "every": "24:00:00" + } ] } diff --git a/librarian_background/__init__.py b/librarian_background/__init__.py index 13ad94e..7c29d68 100644 --- a/librarian_background/__init__.py +++ b/librarian_background/__init__.py @@ -29,6 +29,7 @@ def background(run_once: bool = False): + background_settings.incoming_transfer_hypervisor + background_settings.duplicate_remote_instance_hypervisor + background_settings.rolling_deletion + + background_settings.corruption_fixer ) for task in all_tasks: diff --git a/librarian_background/settings.py b/librarian_background/settings.py index 3ee9e10..7a3b782 100644 --- a/librarian_background/settings.py +++ b/librarian_background/settings.py @@ -20,6 +20,7 @@ from librarian_background.rolling_deletion import RollingDeletion from .check_integrity import CheckIntegrity +from .corruption_fixer import CorruptionFixer from .create_clone import CreateLocalClone from .queues import CheckConsumedQueue, ConsumeQueue, TransferStatus from .recieve_clone import RecieveClone @@ -255,6 +256,19 @@ def task(self) -> RollingDeletion: ) +class CorruptionFixerSettings(BackgroundTaskSettings): + """ + Settings for the corruption fixer task. + """ + + @property + def task(self) -> CorruptionFixer: + return CorruptionFixer( + name=self.task_name, + soft_timeout=self.soft_timeout, + ) + + class BackgroundSettings(BaseSettings): """ Background task settings, configurable. @@ -288,6 +302,8 @@ class BackgroundSettings(BaseSettings): rolling_deletion: list[RollingDeletionSettings] = [] + corruption_fixer: list[CorruptionFixerSettings] = [] + # Global settings: max_rsync_retries: int = 8