Skip to content

Commit

Permalink
Merge branch 'main' into JBorrow/issue106
Browse files Browse the repository at this point in the history
  • Loading branch information
JBorrow authored Nov 14, 2024
2 parents b47b7d3 + 3f7fec0 commit d4ba877
Show file tree
Hide file tree
Showing 20 changed files with 564 additions and 671 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Copyright 2017 the HERA Collaboration
# Licensed under the 2-clause BSD License.


"""Add librarian transfer toggling and corruption
Revision ID: 1def8c988372
Expand Down Expand Up @@ -40,3 +41,5 @@ def upgrade():
def downgrade():
op.drop_column("librarians", "transfers_enabled")
op.drop_table("corrupt_files")


5 changes: 3 additions & 2 deletions hera_librarian/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -1749,12 +1749,13 @@ def config_set_librarian_transfer_subparser(sub_parsers):
sp.add_argument(
"--name", help="The name of the librarian to set the transfer state of."
)
sp.add_argument(
grp = sp.add_mutually_exclusive_group()
grp.add_argument(
"--enabled",
action="store_true",
help="Set the librarian to enabled for transfers.",
)
sp.add_argument(
grp.add_argument(
"--disabled",
action="store_true",
help="Set the librarian to disabled for transfers.",
Expand Down
67 changes: 38 additions & 29 deletions librarian_background/check_integrity.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,19 @@
"""

import datetime
import logging
import time

from loguru import logger
from schedule import CancelJob
from sqlalchemy.orm import Session

from hera_librarian.utils import compare_checksums, get_hash_function_from_hash
from librarian_server.database import get_session
from librarian_server.logger import ErrorCategory, ErrorSeverity, log_to_database
from librarian_server.orm import Instance, StoreMetadata
from librarian_server.orm.file import CorruptFile, File

from .task import Task

logger = logging.getLogger("schedule")


class CheckIntegrity(Task):
"""
Expand Down Expand Up @@ -48,26 +46,35 @@ def core(self, session: Session):
Frame this out with the session so that it is automatically closed.
"""
try:
logger.info(
"Checking integrity of store {}, age_in_days={}",
self.store_name,
self.age_in_days,
)
store = self.get_store(session=session)
except ValueError:
# Store doesn't exist. Cancel this job.
log_to_database(
severity=ErrorSeverity.CRITICAL,
category=ErrorCategory.CONFIGURATION,
message=f"Store {self.store_name} does not exist. Cancelling job. Please update the configuration.",
session=session,
logger.error(
"Store {} does not exist, cancelling job: please update configuration",
self.store_name,
)
return CancelJob

# Now figure out what files were uploaded in the past age_in_days days.
start_time = datetime.datetime.now() - datetime.timedelta(days=self.age_in_days)

# Now we can query the database for all files that were uploaded in the past age_in_days days.
query_start = time.perf_counter()
files = (
session.query(Instance)
.filter(Instance.store == store and Instance.created_time > start_time)
.all()
)
query_end = time.perf_counter()
logger.info(
"Queried database for instances created since {} in {} seconds",
start_time,
query_end - query_start,
)

all_files_fine = True

Expand All @@ -80,21 +87,23 @@ def core(self, session: Session):
)
except FileNotFoundError:
all_files_fine = False
log_to_database(
severity=ErrorSeverity.ERROR,
category=ErrorCategory.DATA_AVAILABILITY,
message=f"File {file.path} on store {store.name} is missing. (Instance: {file.id})",
session=session,
logger.error(
"Instance {} on store {} is missing. (Instance: {})",
file.path,
store.name,
file.id,
)
continue

# Compare checksum to database
expected_checksum = file.file.checksum

if compare_checksums(expected_checksum, path_info.checksum):
# File is fine.
logger.info(
f"File {file.path} on store {store.name} has been validated."
"Instance {} on store {} has been validated (Instance: {})",
file.path,
store.name,
file.id,
)
continue
else:
Expand All @@ -120,24 +129,24 @@ def core(self, session: Session):
corrupt_file.count += 1
session.commit()

log_to_database(
severity=ErrorSeverity.ERROR,
category=ErrorCategory.DATA_INTEGRITY,
message=(
f"File {file.path} on store {store.name} has an incorrect checksum. "
f"Expected {expected_checksum}, got {path_info.checksum}. "
f"See CorruptFile {corrupt_file.id} (Instance: {file.id})"
),
session=session,
logger.error(
"Instance {} on store {} has an incorrect checksum. Expected {}, got {}. (Instance: {})",
file.path,
store.name,
expected_checksum,
path_info.checksum,
file.id,
)

if all_files_fine:
logger.info(
f"All files uploaded since {start_time} on store {store.name} have been validated."
"All files uploaded since {} on store {} have been validated.",
start_time,
store.name,
)
else:
logger.error(
f"Some files uploaded since {start_time} on store {store.name} have not been validated. Please check the logs."
"Store {} has files with incorrect checksums.",
store.name,
)

return all_files_fine
Loading

0 comments on commit d4ba877

Please sign in to comment.