diff --git a/lib/pbench/cli/server/tree_manage.py b/lib/pbench/cli/server/tree_manage.py index e19d41ad68..e70fb93b97 100644 --- a/lib/pbench/cli/server/tree_manage.py +++ b/lib/pbench/cli/server/tree_manage.py @@ -1,14 +1,111 @@ +from datetime import datetime, timedelta, timezone +import errno +from logging import Logger + import click from pbench.cli import pass_cli_context from pbench.cli.server import config_setup from pbench.cli.server.options import common_options from pbench.common.logger import get_pbench_logger -from pbench.server import BadConfig -from pbench.server.cache_manager import CacheManager +from pbench.server import BadConfig, OperationCode +from pbench.server.cache_manager import CacheManager, LockManager +from pbench.server.database.models.audit import Audit, AuditStatus, AuditType + +# Length of time in hours to retain unreferenced cached results data. +# TODO: this could become a configurable setting? +CACHE_LIFETIME = 4.0 + + +def reclaim_cache(tree: CacheManager, logger: Logger, lifetime: float = CACHE_LIFETIME): + """Reclaim unused caches + + Args: + tree: the cache manager instance + lifetime: number of hours to retain unused cache data + logger: a Logger object + """ + window = datetime.now(timezone.utc) - timedelta(hours=lifetime) + total_count = 0 + has_cache = 0 + reclaimed = 0 + reclaim_failed = 0 + for tarball in tree.datasets.values(): + total_count += 1 + if tarball.unpacked: + has_cache += 1 + date = datetime.fromtimestamp( + tarball.last_ref.stat().st_mtime, timezone.utc + ) + if date >= window: + continue + error = None + audit = None + logger.info( + "RECLAIM {}: last_ref {:%Y-%m-%d %H:%M:%S} is older than {:%Y-%m-%d %H:%M:%S}", + tarball.name, + date, + window, + ) + try: + with LockManager(tarball.lock, exclusive=True, wait=False): + try: + audit = Audit.create( + name="reclaim", + operation=OperationCode.DELETE, + status=AuditStatus.BEGIN, + user_name=Audit.BACKGROUND_USER, + object_type=AuditType.DATASET, + object_id=tarball.resource_id, + object_name=tarball.name, + ) + except Exception as e: + logger.warn( + "Unable to audit cache reclaim for {}: '{}'", + tarball.name, + e, + ) + tarball.cache_delete() + reclaimed += 1 + except OSError as e: + if e.errno in (errno.EAGAIN, errno.EACCES): + logger.info( + "RECLAIM {}: skipping because cache is locked", + tarball.name, + ) + # If the cache is locked, regardless of age, then + # the last_ref timestamp is about to be updated, + # and we skip the dataset this time around. 
+ continue + error = e + except Exception as e: + error = e + attributes = {"last_ref": f"{date:%Y-%m-%d %H:%M:%S}"} + if error: + reclaim_failed += 1 + logger.error("RECLAIM {} failed with '{}'", tarball.name, error) + attributes["error"] = str(error) + if audit: + Audit.create( + root=audit, + status=AuditStatus.FAILURE if error else AuditStatus.SUCCESS, + attributes=attributes, + ) + logger.info( + "RECLAIM summary: {} datasets, {} had cache: {} reclaimed and {} errors", + total_count, + has_cache, + reclaimed, + reclaim_failed, + ) def print_tree(tree: CacheManager): + """Print basic information about the cache + + Args: + tree: a cache instance + """ print(f"Tree anchored at {tree.archive_root}\n") if len(tree.datasets) == 0 and len(tree.controllers) == 0: @@ -18,14 +115,17 @@ def print_tree(tree: CacheManager): print("Tarballs:") for tarball in tree.datasets.values(): print(f" {tarball.name}") + if tarball.unpacked: + date = datetime.fromtimestamp( + tarball.last_ref.stat().st_mtime, timezone.utc + ) + print(f" Inventory is cached, last referenced {date:%Y-%m-%d %H:%M:%S}") print("\nControllers:") for controller in tree.controllers.values(): print(f" Controller {controller.name}:") for tarball in controller.tarballs.values(): print(f" Tarball {tarball.name}") - if tarball.unpacked: - print(f" Unpacked in {tarball.unpacked}") @click.command(name="pbench-tree-manager") @@ -33,19 +133,30 @@ def print_tree(tree: CacheManager): @click.option( "--display", default=False, is_flag=True, help="Display the full tree on completion" ) +@click.option( + "--reclaim", + show_default=True, + is_flag=False, + flag_value=CACHE_LIFETIME, + type=click.FLOAT, + help="Reclaim cached data older than hours", +) @common_options -def tree_manage(context: object, display: bool): +def tree_manage(context: object, display: bool, reclaim: float): """ Discover, display, and manipulate the on-disk representation of controllers and datasets. - This primarily exposes the CacheManager object hierarchy, and provides a simple - hierarchical display of controllers and datasets. + This primarily exposes the CacheManager object hierarchy, and provides a + hierarchical display of controllers and datasets. This also supports + reclaiming cached dataset files that haven't been referenced recently. 
\f Args: context: Click context (contains shared `--config` value) display: Print a simplified representation of the hierarchy + lifetime: Number of hours to retain unused cache before reclaim + reclaim: Reclaim stale cached data """ try: config = config_setup(context) @@ -54,6 +165,8 @@ def tree_manage(context: object, display: bool): cache_m.full_discovery() if display: print_tree(cache_m) + if reclaim: + reclaim_cache(cache_m, logger, reclaim) rv = 0 except Exception as exc: logger.exception("An error occurred discovering the file tree: {}", exc) diff --git a/lib/pbench/server/api/resources/datasets_inventory.py b/lib/pbench/server/api/resources/datasets_inventory.py index a006cae726..260ad6375a 100644 --- a/lib/pbench/server/api/resources/datasets_inventory.py +++ b/lib/pbench/server/api/resources/datasets_inventory.py @@ -19,7 +19,12 @@ ParamType, Schema, ) -from pbench.server.cache_manager import CacheManager, CacheType, TarballNotFound +from pbench.server.cache_manager import ( + CacheExtractBadPath, + CacheManager, + CacheType, + TarballNotFound, +) class DatasetsInventory(ApiBase): @@ -63,7 +68,7 @@ def _get( cache_m = CacheManager(self.config, current_app.logger) try: file_info = cache_m.get_inventory(dataset.resource_id, target) - except TarballNotFound as e: + except (TarballNotFound, CacheExtractBadPath) as e: raise APIAbort(HTTPStatus.NOT_FOUND, str(e)) if file_info["type"] != CacheType.FILE: diff --git a/lib/pbench/server/cache_manager.py b/lib/pbench/server/cache_manager.py index 17ad65bb51..20d4fd2d02 100644 --- a/lib/pbench/server/cache_manager.py +++ b/lib/pbench/server/cache_manager.py @@ -1,6 +1,7 @@ from collections import deque from dataclasses import dataclass from enum import auto, Enum +import fcntl from logging import Logger from pathlib import Path import shlex @@ -10,7 +11,8 @@ from typing import Any, IO, Optional, Union from pbench.common import MetadataLog, selinux -from pbench.server import JSONOBJECT, PathLike, PbenchServerConfig +from pbench.server import JSONOBJECT, OperationCode, PathLike, PbenchServerConfig +from pbench.server.database.models.audit import Audit, AuditStatus, AuditType from pbench.server.database.models.datasets import Dataset from pbench.server.utils import get_tarball_md5 @@ -18,93 +20,80 @@ class CacheManagerError(Exception): """Base class for exceptions raised from this module.""" - def __str__(self) -> str: - return "Generic cache manager exception" + pass class BadDirpath(CacheManagerError): """A bad directory path was given.""" def __init__(self, error_msg: str): + super().__init__(error_msg) self.error_msg = error_msg - def __str__(self) -> str: - return self.error_msg - class BadFilename(CacheManagerError): """A bad tarball path was given.""" def __init__(self, path: PathLike): + super().__init__(f"The file path {path} is not a tarball") self.path = str(path) - def __str__(self) -> str: - return f"The file path {self.path!r} is not a tarball" - class CacheExtractBadPath(CacheManagerError): """Request to extract a path that's bad or not a file""" def __init__(self, tar_name: Path, path: PathLike): + super().__init__(f"Unable to extract {path} from {tar_name.name}") self.name = tar_name.name self.path = str(path) - def __str__(self) -> str: - return f"Unable to extract {self.path} from {self.name}" - class TarballNotFound(CacheManagerError): """The dataset was not found in the ARCHIVE tree.""" def __init__(self, tarball: str): + super().__init__(f"The dataset tarball named {tarball!r} is not found") self.tarball = tarball - def 
__str__(self) -> str: - return f"The dataset tarball named {self.tarball!r} is not found" - class DuplicateTarball(CacheManagerError): """A duplicate tarball name was detected.""" def __init__(self, tarball: str): + super().__init__(f"A dataset tarball named {tarball!r} is already present") self.tarball = tarball - def __str__(self) -> str: - return f"A dataset tarball named {self.tarball!r} is already present" - class MetadataError(CacheManagerError): """A problem was found processing a tarball's metadata.log file.""" def __init__(self, tarball: Path, error: Exception): + super().__init__( + f"A problem occurred processing metadata.log from {tarball}: {str(error)!r}" + ) self.tarball = tarball self.error = str(error) - def __str__(self) -> str: - return f"A problem occurred processing metadata.log from {self.tarball!s}: {self.error!r}" - class TarballUnpackError(CacheManagerError): """An error occurred trying to unpack a tarball.""" def __init__(self, tarball: Path, error: str): + super().__init__(f"An error occurred while unpacking {tarball}: {error}") self.tarball = tarball self.error = error - def __str__(self) -> str: - return f"An error occurred while unpacking {self.tarball}: {self.error}" - class TarballModeChangeError(CacheManagerError): """An error occurred trying to fix unpacked tarball permissions.""" def __init__(self, tarball: Path, error: str): + super().__init__( + f"An error occurred while changing file permissions of {tarball}: {error}" + ) self.tarball = tarball self.error = error - def __str__(self) -> str: - return f"An error occurred while changing file permissions of {self.tarball}: {self.error}" - class CacheType(Enum): FILE = auto() @@ -192,15 +181,137 @@ def make_cache_object(dir_path: Path, path: Path) -> CacheObject: ) +class LockRef: + """Keep track of a cache lock passed off to a caller""" + + def __init__(self, lock: Path): + """Initialize a lock reference + + The lock file is opened in "w+" mode, which is "write update": unlike + "r+", this creates the file if it doesn't already exist, but still + allows lock conversion between LOCK_EX and LOCK_SH. + + Args: + lock: the path of a lock file + """ + self.lock = lock.open("w+") + self.locked = False + self.exclusive = False + + def acquire(self, exclusive: bool = False, wait: bool = True) -> "LockRef": + """Acquire the lock + + Args: + exclusive: lock for exclusive access + wait: wait for lock + + Raises: + OSError (EAGAIN or EACCES): wait=False and the lock is already + owned + + Returns: + The lockref, so this can be chained with the constructor + """ + + self.exclusive = exclusive + cmd = fcntl.LOCK_EX if exclusive else fcntl.LOCK_SH + if not wait: + cmd |= fcntl.LOCK_NB + fcntl.lockf(self.lock, cmd) + self.locked = True + return self + + def release(self): + """Release the lock and close the lock file""" + + if not self.locked: + return + try: + fcntl.lockf(self.lock, fcntl.LOCK_UN) + self.lock.close() + finally: + # Release our reference to the lock file so that the + # object can be reclaimed. 
+ self.locked = False + self.exclusive = False + self.lock = None + + def upgrade(self): + """Upgrade a shared lock to exclusive""" + if not self.exclusive: + fcntl.lockf(self.lock, fcntl.LOCK_EX) + self.exclusive = True + + def downgrade(self): + """Downgrade an exclusive lock to shared""" + if self.exclusive: + fcntl.lockf(self.lock, fcntl.LOCK_SH) + self.exclusive = False + + +class LockManager: + def __init__(self, lock: Path, exclusive: bool = False, wait: bool = True): + self.lock = LockRef(lock) + self.exclusive = exclusive + self.wait = wait + self.unlock = True + + def keep(self) -> "LockManager": + """Tell the context manager not to unlock on exit""" + self.unlock = False + return self + + def release(self): + """Release manually if necessary""" + self.lock.release() + + def upgrade(self): + """Upgrade a shared lock to exclusive""" + self.lock.upgrade() + self.exclusive = True + + def downgrade(self): + """Downgrade an exclusive lock to shared""" + self.lock.downgrade() + self.exclusive = False + + def __enter__(self) -> "LockManager": + """Enter a lock context manager by acquiring the lock + + Raises: + OSError: self.wait is False, and the lock is already owned. + + Returns: + the LockManager object + """ + self.lock.acquire(exclusive=self.exclusive, wait=self.wait) + return self + + def __exit__(self, *exc): + """Exit a lock context manager by releasing the lock + + This does nothing if "unlock" has been cleared, meaning we want the + lock to persist beyond the context manager scope. + """ + if self.unlock: + self.lock.release() + + class Inventory: - """Encapsulate the file stream and subprocess.Popen object + """Encapsulate the file stream and cache lock management This encapsulation allows cleaner downstream handling, so that we can close - both the extracted file stream and the Popen object itself when we're done. - This eliminates interference with later operations. + both the extracted file stream and unlock the dataset cache after the file + reference is complete. In APIs the Inventory close is done by the Flask + infrastructure after the response handling is done. """ - def __init__(self, stream: IO[bytes], subproc: Optional[subprocess.Popen] = None): + def __init__( + self, + stream: IO[bytes], + lock: Optional[LockRef] = None, + subproc: Optional[subprocess.Popen] = None, + ): """Construct an instance to track extracted inventory This encapsulates many byte stream operations so that it can be used @@ -208,13 +319,17 @@ def __init__(self, stream: IO[bytes], subproc: Optional[subprocess.Popen] = None Args: stream: the data stream of a specific tarball member - subproc: the subprocess producing the stream, if any + lock: a cache lock reference + subproc: a Popen object to clean up on close """ - self.subproc = subproc self.stream = stream + self.lock = lock + self.subproc = subproc def close(self): """Close the byte stream and clean up the Popen object""" + + exception = None if self.subproc: try: if self.subproc.poll() is None: @@ -232,17 +347,25 @@ def close(self): while self.subproc.stderr.read(4096): pass self.subproc.wait(60.0) - finally: + except Exception as e: # Release our reference to the subprocess.Popen object so that the # object can be reclaimed. self.subproc = None - - # Explicitly close the stream, in case there never was an associated - # subprocess. (If there was an associated subprocess, the streams are - # now empty, and we'll assume that they are closed when the Popen - # object is reclaimed.) 
+ exception = e + if self.lock: + try: + self.lock.release() + except Exception as e: + exception = e self.stream.close() + # NOTE: if both subprocess cleanup and unlock fail with exceptions, we + # raise the latter, and the former will be ignored. In practice, that's + # not a problem as we only construct an Inventory with a subprocess + # reference for extract, which doesn't need to lock a cache directory. + if exception: + raise exception + def getbuffer(self): """Return the underlying byte buffer (used by send_file)""" return self.stream.getbuffer() @@ -261,7 +384,7 @@ def seek(self, *args, **kwargs) -> int: def __repr__(self) -> str: """Return a string representation""" - return f"" + return f"" def __iter__(self): """Allow iterating through lines in the buffer""" @@ -325,6 +448,9 @@ def __init__(self, path: Path, resource_id: str, controller: "Controller"): # Record where cached unpacked data would live self.cache: Path = controller.cache / self.resource_id + # We need the directory to lock, so make sure it's there + self.cache.mkdir(parents=True, exist_ok=True) + # Record hierarchy of a Tar ball self.cachemap: Optional[CacheMap] = None @@ -333,6 +459,16 @@ def __init__(self, path: Path, resource_id: str, controller: "Controller"): # inactive. self.unpacked: Optional[Path] = None + # Record the lockf file path used to control cache access + self.lock: Path = self.cache / "lock" + + # Record a marker file path used to record the last cache access + # timestamp + self.last_ref: Path = self.cache / "last_ref" + + # Record the path of the companion MD5 file + self.md5_path: Path = path.with_suffix(".xz.md5") + # Record the name of the containing controller self.controller_name: str = controller.name @@ -468,8 +604,10 @@ def create( return cls(destination, resource_id, controller) def cache_map(self, dir_path: Path): - """Builds Hierarchy structure of a Directory in a Dictionary - Format. + """Build hierarchical representation of results tree + + NOTE: this structure isn't removed when we release the cache, as the + data remains valid. Args: dir_path: root directory @@ -497,8 +635,7 @@ def cache_map(self, dir_path: Path): @staticmethod def traverse_cmap(path: Path, cachemap: CacheMap) -> CacheMapEntry: - """Sequentially traverses the cachemap to find the leaf of a - relative path reference + """Locate a path in the cache map Args: path: relative path of the subdirectory/file @@ -531,11 +668,10 @@ def traverse_cmap(path: Path, cachemap: CacheMap) -> CacheMapEntry: def get_info(self, path: Path) -> JSONOBJECT: """Returns the details of the given file/directory in dict format - NOTE: This requires a call to the cache_map method to build a map that - can be traversed. Currently, this is done only on unpack, and isn't - useful except within the `pbench-index` process. This map needs to - either be built dynamically (potentially expensive) or persisted in - SQL or perhaps Redis. + NOTE: If the cache manager doesn't already have a cache map for the + current Tarball, we'll unpack it here; however as the cache map isn't + dependent on the unpacked results tree, we immediately release the + cache lock. Args: path: path of the file/subdirectory @@ -564,6 +700,10 @@ def get_info(self, path: Path) -> JSONOBJECT: " we expect relative path to the root directory." 
) + if not self.cachemap: + with LockManager(self.lock) as lock: + self.get_results(lock) + c_map = self.traverse_cmap(path, self.cachemap) children = c_map["children"] if "children" in c_map else {} fd_info = c_map["details"].__dict__.copy() @@ -587,6 +727,11 @@ def get_info(self, path: Path) -> JSONOBJECT: def extract(tarball_path: Path, path: str) -> Inventory: """Returns a file stream for a file within a tarball + NOTE: This should generally be used only within the INTAKE path before + it's practical to load the cache with a fully unpacked tree. The + extracted file is not cached and therefore each reference will repeat + the tar file extraction. + Args: tarball_path: absolute path of the tarball path: relative path within the tarball @@ -657,7 +802,7 @@ def extract(tarball_path: Path, path: str) -> Inventory: # containing the extracted file to our caller, and return the Popen # object so that we can clean it up when the Inventory object is closed. if not tarproc.returncode: - return Inventory(tarproc.stdout, tarproc) + return Inventory(tarproc.stdout, subproc=tarproc) # The tar command was invoked successfully (otherwise, the Popen() # constructor would have raised an exception), but it exited with @@ -675,6 +820,33 @@ def extract(tarball_path: Path, path: str) -> Inventory: tarball_path, f"Unexpected error from {tar_path}: {error_text!r}" ) + def stream(self, path: str) -> Inventory: + """Return a cached inventory file as a binary stream + + Args: + path: Relative path of a regular file within a tarball. + + On failure, an exception is raised and the cache lock is released; on + success, returns an Inventory object which implicitly transfers + ownership and management of the cache lock to the caller. When done + with the inventory's file stream, the caller must close the Inventory + object to release the file stream and the cache lock. + + Raises: + CacheExtractBadPath: the path does not match a regular file within + the tarball. + + Returns: + An Inventory object encapsulating the file stream and the cache + lock. + """ + + with LockManager(self.lock) as lock: + artifact: Path = self.get_results(lock) / path + if not artifact.is_file(): + raise CacheExtractBadPath(self.tarball_path, path) + return Inventory(artifact.open("rb"), lock=lock.keep()) + def get_inventory(self, path: str) -> Optional[JSONOBJECT]: """Access the file stream of a tarball member file. @@ -688,12 +860,11 @@ def get_inventory(self, path: str) -> Optional[JSONOBJECT]: info = { "name": self.tarball_path.name, "type": CacheType.FILE, - "stream": Inventory(self.tarball_path.open("rb"), None), + "stream": Inventory(self.tarball_path.open("rb")), } else: - file_path = f"{self.name}/{path}" - stream = Tarball.extract(self.tarball_path, file_path) - info = {"name": file_path, "type": CacheType.FILE, "stream": stream} + stream = self.stream(path) + info = {"name": path, "type": CacheType.FILE, "stream": stream} return info @@ -757,53 +928,84 @@ def subprocess_run( f"{cmd[0]} exited with status {process.returncode}: {process.stderr.strip()!r}", ) - def unpack(self): + def get_results(self, lock: LockManager) -> Path: """Unpack a tarball into a temporary directory tree - Unpack the tarball into a temporary cache directory named with the - tarball's resource_id (MD5). - - This tree will be used for indexing, and then discarded. As we build - out more of the cache manager, it can also be used to build our initial - cache map. 
- - The indexer works off the unpacked data under CACHE, assuming the - tarball name in all paths (because this is what's inside the tarball). - Rather than passing the indexer the root `/srv/pbench/.cache` or trying - to update all of the indexer code (which still jumps back and forth - between the tarball and the unpacked files), we maintain the "cache" - directory as two paths: self.cache which is the directory we manage - here and pass to the indexer (/srv/pbench/.cache/) and - the actual unpacked root (/srv/pbench/.cache//). + Make sure that the dataset results are unpacked into a cache tree. The + location of the unpacked tree is in self.unpacked and is also returned + direct to the caller. + + Args: + lock: A lock context manager in shared lock state + + Returns: + the root Path of the unpacked directory tree """ - self.cache.mkdir(parents=True) - try: - tar_command = "tar -x --no-same-owner --delay-directory-restore " - tar_command += f"--force-local --file='{str(self.tarball_path)}'" - self.subprocess_run( - tar_command, self.cache, TarballUnpackError, self.tarball_path - ) + if not self.unpacked: + lock.upgrade() + audit = None + error = None + try: + audit = Audit.create( + name="cache", + operation=OperationCode.CREATE, + status=AuditStatus.BEGIN, + user_name=Audit.BACKGROUND_USER, + object_type=AuditType.DATASET, + object_id=self.resource_id, + object_name=self.name, + ) + except Exception as e: + self.controller.logger.warning( + "Unable to audit unpack for {}: '{}'", self.name, e + ) - find_command = "find . ( -type d -exec chmod ugo+rx {} + ) -o ( -type f -exec chmod ugo+r {} + )" - self.subprocess_run( - find_command, self.cache, TarballModeChangeError, self.cache - ) - except Exception: - shutil.rmtree(self.cache, ignore_errors=True) - raise - self.unpacked = self.cache / self.name - self.cache_map(self.unpacked) + try: + tar_command = "tar -x --no-same-owner --delay-directory-restore " + tar_command += f"--force-local --file='{str(self.tarball_path)}'" + self.subprocess_run( + tar_command, self.cache, TarballUnpackError, self.tarball_path + ) + find_command = "find . ( -type d -exec chmod ugo+rx {} + ) -o ( -type f -exec chmod ugo+r {} + )" + self.subprocess_run( + find_command, self.cache, TarballModeChangeError, self.cache + ) + self.unpacked = self.cache / self.name + self.cache_map(self.unpacked) + except Exception as e: + error = str(e) + raise + finally: + if audit: + attributes = {"error": error} if error else {} + Audit.create( + root=audit, + status=AuditStatus.FAILURE if error else AuditStatus.SUCCESS, + attributes=attributes, + ) + lock.downgrade() + self.last_ref.touch(exist_ok=True) + return self.unpacked + + def cache_delete(self): + """Remove the unpacked tarball directory and all contents. + + WARNING: - def uncache(self): - """Remove the unpacked tarball directory and all contents.""" + This is unprotected! + + Normal cache reclaim is managed using the `tree_manage --reclaim` CLI + command, generally through the `pbench-reclaim.timer` service, which + calls this method only when the cache is unlocked and aged out. + """ self.cachemap = None if self.unpacked: try: - shutil.rmtree(self.cache) + shutil.rmtree(self.unpacked) self.unpacked = None except Exception as e: - self.logger.error("incoming remove for {} failed with {}", self.name, e) + self.logger.error("cache reclaim for {} failed with {}", self.name, e) raise def delete(self): @@ -812,7 +1014,7 @@ def delete(self): We'll log errors in deletion, but "succeed" and clear the links to both files. 
There's nothing more we can do. """ - self.uncache() + self.cache_delete() if self.isolator and self.isolator.exists(): try: shutil.rmtree(self.isolator) @@ -943,24 +1145,6 @@ def create_tarball(self, tarfile_path: Path) -> Tarball: self.tarballs[tarball.name] = tarball return tarball - def unpack(self, dataset_id: str): - """Unpack a tarball into a temporary cache directory. - - Args: - dataset_id: Resource ID of the dataset to unpack - """ - tarball = self.datasets[dataset_id] - tarball.unpack() - - def uncache(self, dataset_id: str): - """Remove the cached unpack directory. - - Args: - dataset_id: Resource ID of dataset to remove - """ - tarball = self.datasets[dataset_id] - tarball.uncache() - def delete(self, dataset_id: str): """Delete a dataset tarball and remove it from the controller @@ -1247,19 +1431,6 @@ def create(self, tarfile_path: Path) -> Tarball: self.datasets[tarball.resource_id] = tarball return tarball - def unpack(self, dataset_id: str) -> Tarball: - """Unpack a tarball into the CACHE tree - - Args: - dataset_id: Dataset resource ID - - Returns: - The tarball object - """ - tarball = self.find_dataset(dataset_id) - tarball.controller.unpack(dataset_id) - return tarball - def get_info(self, dataset_id: str, path: Path) -> dict[str, Any]: """Get information about dataset files from the cache map @@ -1293,16 +1464,6 @@ def get_inventory(self, dataset_id: str, target: str) -> Optional[JSONOBJECT]: tarball = self.find_dataset(dataset_id) return tarball.get_inventory(target) - def uncache(self, dataset_id: str): - """Remove the unpacked tarball tree. - - Args: - dataset_id: Dataset resource ID to "uncache" - """ - tarball = self.find_dataset(dataset_id) - controller = tarball.controller - controller.uncache(dataset_id) - def delete(self, dataset_id: str): """Delete the dataset as well as unpacked artifacts. diff --git a/lib/pbench/server/indexing_tarballs.py b/lib/pbench/server/indexing_tarballs.py index 7d749b15ef..9f33d1c104 100644 --- a/lib/pbench/server/indexing_tarballs.py +++ b/lib/pbench/server/indexing_tarballs.py @@ -15,7 +15,7 @@ UnsupportedTarballFormat, ) from pbench.server import OperationCode, tstos -from pbench.server.cache_manager import CacheManager, Tarball, TarballNotFound +from pbench.server.cache_manager import CacheManager, LockManager, Tarball from pbench.server.database.models.audit import Audit, AuditStatus from pbench.server.database.models.datasets import ( Dataset, @@ -364,65 +364,64 @@ def sighup_handler(*args): tarobj: Optional[Tarball] = None tb_res = error_code["OK"] try: - # Dynamically unpack the tarball for indexing. 
+ # We need the fully unpacked cache tree to index it try: - tarobj = self.cache_manager.unpack(dataset.resource_id) - if not tarobj.unpacked: - idxctx.logger.warning( - "{} has not been unpacked", dataset - ) - continue - except TarballNotFound as e: + tarobj = self.cache_manager.find_dataset( + dataset.resource_id + ) + except Exception as e: self.sync.error( dataset, - f"Unable to unpack dataset: {e!s}", + f"Unable to find dataset: {e!s}", ) continue - audit = Audit.create( - operation=OperationCode.UPDATE, - name="index", - status=AuditStatus.BEGIN, - user_name=Audit.BACKGROUND_USER, - dataset=dataset, - ) + with LockManager(tarobj.lock) as lock: + tarobj.get_results(lock) + audit = Audit.create( + operation=OperationCode.UPDATE, + name="index", + status=AuditStatus.BEGIN, + user_name=Audit.BACKGROUND_USER, + dataset=dataset, + ) - # "Open" the tar ball represented by the tar ball object - idxctx.logger.debug("open tar ball") - ptb = PbenchTarBall(idxctx, dataset, tmpdir, tarobj) - - # Construct the generator for emitting all actions. - # The `idxctx` dictionary is passed along to each - # generator so that it can add its context for - # error handling to the list. - idxctx.logger.debug("generator setup") - if self.options.index_tool_data: - actions = ptb.mk_tool_data_actions() - else: - actions = ptb.make_all_actions() - - # Create a file where the pyesbulk package will - # record all indexing errors that can't/won't be - # retried. - with ie_filepath.open(mode="w") as fp: - idxctx.logger.debug("begin indexing") - try: - signal.signal(signal.SIGINT, sigint_handler) - es_res = es_index( - idxctx.es, - actions, - fp, - idxctx.logger, - idxctx._dbg, - ) - except SigIntException: - idxctx.logger.exception( - "Indexing interrupted by SIGINT, continuing to next tarball" - ) - continue - finally: - # Turn off the SIGINT handler when not indexing. - signal.signal(signal.SIGINT, signal.SIG_IGN) + # "Open" the tar ball represented by the tar ball object + idxctx.logger.debug("open tar ball") + ptb = PbenchTarBall(idxctx, dataset, tmpdir, tarobj) + + # Construct the generator for emitting all actions. + # The `idxctx` dictionary is passed along to each + # generator so that it can add its context for + # error handling to the list. + idxctx.logger.debug("generator setup") + if self.options.index_tool_data: + actions = ptb.mk_tool_data_actions() + else: + actions = ptb.make_all_actions() + + # Create a file where the pyesbulk package will + # record all indexing errors that can't/won't be + # retried. + with ie_filepath.open(mode="w") as fp: + idxctx.logger.debug("begin indexing") + try: + signal.signal(signal.SIGINT, sigint_handler) + es_res = es_index( + idxctx.es, + actions, + fp, + idxctx.logger, + idxctx._dbg, + ) + except SigIntException: + idxctx.logger.exception( + "Indexing interrupted by SIGINT, continuing to next tarball" + ) + continue + finally: + # Turn off the SIGINT handler when not indexing. 
+ signal.signal(signal.SIGINT, signal.SIG_IGN) except UnsupportedTarballFormat as e: tb_res = self.emit_error( idxctx.logger.warning, "TB_META_ABSENT", e @@ -464,9 +463,6 @@ def sighup_handler(*args): ) tb_res = error_code["OP_ERROR" if failures > 0 else "OK"] finally: - # Remove the unpacked data - if tarobj: - tarobj.uncache() if tb_res.success: try: diff --git a/lib/pbench/test/unit/server/test_cache_manager.py b/lib/pbench/test/unit/server/test_cache_manager.py index f432b6118d..df3e31a45a 100644 --- a/lib/pbench/test/unit/server/test_cache_manager.py +++ b/lib/pbench/test/unit/server/test_cache_manager.py @@ -1,4 +1,5 @@ import errno +import fcntl import hashlib import io from logging import Logger @@ -11,7 +12,7 @@ import pytest -from pbench.server import JSONOBJECT +from pbench.server import JSONOBJECT, OperationCode from pbench.server.cache_manager import ( BadDirpath, BadFilename, @@ -21,12 +22,15 @@ Controller, DuplicateTarball, Inventory, + LockManager, + LockRef, MetadataError, Tarball, TarballModeChangeError, TarballNotFound, TarballUnpackError, ) +from pbench.server.database.models.audit import Audit, AuditStatus, AuditType from pbench.server.database.models.datasets import Dataset, DatasetBadName from pbench.test.unit.server.conftest import make_tarball @@ -82,6 +86,30 @@ def fake_get_metadata(_tb_path): ) +class FakeLockRef: + operations = [] + + def __init__(self, lockpath: Path): + self.lock = lockpath + + def acquire(self, exclusive: bool = False, wait: bool = True): + self.operations.append(("acquire", exclusive, wait)) + return self + + def release(self): + self.operations.append(("release")) + + def upgrade(self): + self.operations.append(("upgrade")) + + def downgrade(self): + self.operations.append(("downgrade")) + + @classmethod + def reset(cls): + cls.operations.clear() + + class TestCacheManager: def test_create(self, server_config, make_logger): """ @@ -471,11 +499,15 @@ def generate_test_result_tree(tmp_path: Path, dir_name: str) -> Path: # create some directories and files inside the temp directory sub_dir = tmp_path / dir_name sub_dir.mkdir(parents=True, exist_ok=True) - (sub_dir / "f1.json").touch() - (sub_dir / "metadata.log").touch() + (sub_dir / "f1.json").write_text("{'json': 'value'}", encoding="utf-8") + (sub_dir / "metadata.log").write_text( + "[pbench]\nkey = value\n", encoding="utf-8" + ) for i in range(1, 4): (sub_dir / "subdir1" / f"subdir1{i}").mkdir(parents=True, exist_ok=True) - (sub_dir / "subdir1" / "f11.txt").touch() + (sub_dir / "subdir1" / "f11.txt").write_text( + "textual\nlines\n", encoding="utf-8" + ) (sub_dir / "subdir1" / "subdir14" / "subdir141").mkdir( parents=True, exist_ok=True ) @@ -504,47 +536,47 @@ def generate_test_result_tree(tmp_path: Path, dir_name: str) -> Path: class MockTarball: def __init__(self, path: Path, resource_id: str, controller: Controller): self.name = Dataset.stem(path) + self.resource_id = "ABC" self.tarball_path = path - self.cache = controller.cache / "ABC" + self.cache = controller.cache / self.resource_id self.isolator = controller.path / resource_id + self.lock = self.cache / "lock" + self.last_ref = self.cache / "last_ref" self.unpacked = None + self.controller = controller - def test_unpack_tar_subprocess_exception(self, make_logger, monkeypatch): + def test_unpack_tar_subprocess_exception( + self, make_logger, db_session, monkeypatch + ): """Show that, when unpacking of the Tarball fails and raises an Exception it is handled successfully.""" tar = Path("/mock/A.tar.xz") cache = Path("/mock/.cache") - 
rmtree_called = False - - def mock_rmtree(path: Path, ignore_errors=False): - nonlocal rmtree_called - rmtree_called = True - - assert ignore_errors - assert path == cache / "ABC" + @staticmethod def mock_run(command, _dir_path, exception, dir_p): verb = "tar" assert command.startswith(verb) raise exception(dir_p, subprocess.TimeoutExpired(verb, 43)) with monkeypatch.context() as m: - m.setattr(Path, "mkdir", lambda path, parents: None) - m.setattr(Tarball, "subprocess_run", staticmethod(mock_run)) - m.setattr(shutil, "rmtree", mock_rmtree) + m.setattr(Path, "exists", lambda self: True) + m.setattr(Path, "mkdir", lambda path, parents=False, exist_ok=False: None) + m.setattr(Path, "touch", lambda path, exist_ok=False: None) + m.setattr(Tarball, "subprocess_run", mock_run) m.setattr(Tarball, "__init__", TestCacheManager.MockTarball.__init__) m.setattr(Controller, "__init__", TestCacheManager.MockController.__init__) tb = Tarball( tar, "ABC", Controller(Path("/mock/archive"), cache, make_logger) ) - with pytest.raises(TarballUnpackError) as exc: - tb.unpack() msg = f"An error occurred while unpacking {tar}: Command 'tar' timed out after 43 seconds" - assert str(exc.value) == msg - assert exc.type == TarballUnpackError - assert rmtree_called + with pytest.raises(TarballUnpackError, match=msg): + tb.get_results(FakeLockRef(tb.lock)) + FakeLockRef.reset() - def test_unpack_find_subprocess_exception(self, make_logger, monkeypatch): + def test_unpack_find_subprocess_exception( + self, make_logger, db_session, monkeypatch + ): """Show that, when permission change of the Tarball fails and raises an Exception it is handled successfully.""" tar = Path("/mock/A.tar.xz") @@ -558,6 +590,7 @@ def mock_rmtree(path: Path, ignore_errors=False): assert ignore_errors assert path == cache / "ABC" + @staticmethod def mock_run(command, _dir_path, exception, dir_p): verb = "find" if command.startswith(verb): @@ -566,8 +599,10 @@ def mock_run(command, _dir_path, exception, dir_p): assert command.startswith("tar") with monkeypatch.context() as m: - m.setattr(Path, "mkdir", lambda path, parents: None) - m.setattr(Tarball, "subprocess_run", staticmethod(mock_run)) + m.setattr(Path, "exists", lambda self: True) + m.setattr(Path, "mkdir", lambda path, parents=False, exist_ok=False: None) + m.setattr(Path, "touch", lambda path, exist_ok=False: None) + m.setattr(Tarball, "subprocess_run", mock_run) m.setattr(shutil, "rmtree", mock_rmtree) m.setattr(Tarball, "__init__", TestCacheManager.MockTarball.__init__) m.setattr(Controller, "__init__", TestCacheManager.MockController.__init__) @@ -575,15 +610,15 @@ def mock_run(command, _dir_path, exception, dir_p): tar, "ABC", Controller(Path("/mock/archive"), cache, make_logger) ) - with pytest.raises(TarballModeChangeError) as exc: - tb.unpack() msg = "An error occurred while changing file permissions of " msg += f"{cache / 'ABC'}: Command 'find' timed out after 43 seconds" + with pytest.raises(TarballModeChangeError, match=msg) as exc: + tb.get_results(FakeLockRef(tb.lock)) assert str(exc.value) == msg - assert exc.type == TarballModeChangeError assert rmtree_called + FakeLockRef.reset() - def test_unpack_success(self, make_logger, monkeypatch): + def test_unpack_success(self, make_logger, db_session, monkeypatch): """Test to check the unpacking functionality of the CacheManager""" tar = Path("/mock/A.tar.xz") cache = Path("/mock/.cache") @@ -604,17 +639,19 @@ def mock_resolve(_path, _strict=False): raise AssertionError("Unexpected call to Path.resolve()") with 
monkeypatch.context() as m: - m.setattr(Path, "mkdir", lambda path, parents: None) - m.setattr(subprocess, "run", mock_run) + m.setattr(Path, "mkdir", lambda path, parents=False, exist_ok=False: None) + m.setattr(Path, "touch", lambda path, exist_ok=False: None) + m.setattr("pbench.server.cache_manager.subprocess.run", mock_run) m.setattr(Path, "resolve", mock_resolve) m.setattr(Tarball, "__init__", TestCacheManager.MockTarball.__init__) m.setattr(Controller, "__init__", TestCacheManager.MockController.__init__) tb = Tarball( tar, "ABC", Controller(Path("/mock/archive"), cache, make_logger) ) - tb.unpack() + tb.get_results(FakeLockRef(tb.lock)) assert call == ["tar", "find"] assert tb.unpacked == cache / "ABC" / tb.name + FakeLockRef.reset() def test_cache_map_success(self, make_logger, monkeypatch, tmp_path): """Test to build the cache map of the root directory""" @@ -708,7 +745,7 @@ def test_cache_map_bad_dir_path( "f1.json", None, None, - 0, + 17, CacheType.FILE, ), ( @@ -726,7 +763,7 @@ def test_cache_map_bad_dir_path( "f11.txt", None, None, - 0, + 14, CacheType.FILE, ), ( @@ -735,7 +772,7 @@ def test_cache_map_bad_dir_path( "f11.txt", None, None, - 0, + 14, CacheType.FILE, ), ( @@ -744,7 +781,7 @@ def test_cache_map_bad_dir_path( "f11.txt", None, None, - 0, + 14, CacheType.FILE, ), ( @@ -903,7 +940,7 @@ def test_cache_map_traverse_cmap( "name": "f11.txt", "resolve_path": None, "resolve_type": None, - "size": 0, + "size": 14, "type": CacheType.FILE, }, ), @@ -969,36 +1006,57 @@ def test_cache_map_get_info_cmap( assert file_info == expected_msg @pytest.mark.parametrize( - "file_path, exp_file_type, exp_stream", + "file_path,is_unpacked,exp_stream", [ - ("", CacheType.FILE, io.BytesIO(b"tarball_as_a_byte_stream")), - (None, CacheType.FILE, io.BytesIO(b"tarball_as_a_byte_stream")), - ("f1.json", CacheType.FILE, io.BytesIO(b"file_as_a_byte_stream")), - ("subdir1/f12_sym", CacheType.FILE, io.BytesIO(b"file1_as_a_byte_stream")), + ("", True, b"tarball_as_a_byte_stream"), + (None, True, b"tarball_as_a_byte_stream"), + ("f1.json", True, b"{'json': 'value'}"), + ("subdir1/f12_sym", False, CacheExtractBadPath(Path("a"), "b")), ], ) def test_get_inventory( - self, make_logger, monkeypatch, tmp_path, file_path, exp_file_type, exp_stream + self, make_logger, monkeypatch, tmp_path, file_path, is_unpacked, exp_stream ): - """Test to extract file contents/stream from a file""" - tar = Path("/mock/dir_name.tar.xz") - cache = Path("/mock/.cache") + """Test to extract file contents/stream from a file + + NOTE: we check explicitly whether Tarball.stream() chooses to unpack + the tarball, based on "is_unpacked", to be sure both cases are covered; + but be aware that accessing the tarball itself (the "" and None + file_path cases) don't ever unpack and should always have is_unpacked + set to True. (That is, we don't expect get_results to be called in + those cases.) 
+ """ + archive = tmp_path / "mock" / "archive" / "ABC" + archive.mkdir(parents=True, exist_ok=True) + tar = archive / "dir_name.tar.xz" + tar.write_bytes(b"tarball_as_a_byte_stream") + cache = tmp_path / "mock" / ".cache" + unpacked = cache / "ABC" / "dir_name" + + def fake_results(self, lock: LockRef) -> Path: + self.unpacked = unpacked + return self.unpacked with monkeypatch.context() as m: m.setattr(Tarball, "__init__", TestCacheManager.MockTarball.__init__) m.setattr(Controller, "__init__", TestCacheManager.MockController.__init__) - m.setattr(Tarball, "extract", lambda _t, _p: Inventory(exp_stream)) - m.setattr(Path, "open", lambda _s, _m="rb": exp_stream) - tb = Tarball( - tar, "ABC", Controller(Path("/mock/archive"), cache, make_logger) - ) - tar_dir = TestCacheManager.MockController.generate_test_result_tree( - tmp_path, "dir_name" + tb = Tarball(tar, "ABC", Controller(archive, cache, make_logger)) + m.setattr(Tarball, "get_results", fake_results) + if is_unpacked: + tb.unpacked = unpacked + TestCacheManager.MockController.generate_test_result_tree( + cache / "ABC", "dir_name" ) - tb.cache_map(tar_dir) - file_info = tb.get_inventory(file_path) - assert file_info["type"] == exp_file_type - assert file_info["stream"].stream == exp_stream + try: + file_info = tb.get_inventory(file_path) + except Exception as e: + assert isinstance(e, type(exp_stream)), e + else: + assert not isinstance(exp_stream, Exception) + assert file_info["type"] is CacheType.FILE + stream: Inventory = file_info["stream"] + assert stream.stream.read() == exp_stream + stream.close() def test_cm_inventory(self, monkeypatch, server_config, make_logger): """Verify the happy path of the high level get_inventory""" @@ -1221,8 +1279,9 @@ def seek(self, offset: int, _whence: int = io.SEEK_SET) -> int: return offset # Invoke the CUT - stream = Inventory(MockBufferedReader(), None) + stream = Inventory(MockBufferedReader()) + assert stream.lock is None assert stream.subproc is None # Test Inventory.getbuffer() @@ -1333,7 +1392,7 @@ def seek(self, offset: int, _whence: int = io.SEEK_SET) -> int: ), # The subprocess is still running when close() is called, stdout is # empty, there is no stderr, and the wait times out. 
- (None, 0, None, True, ["poll", "kill", "stdout", "wait"]), + (None, 0, None, True, ["poll", "kill", "stdout", "wait", "close"]), ), ) def test_inventory( @@ -1418,7 +1477,7 @@ def read(self, size: int = -1) -> bytes: assert my_stream, "Test bug: we need at least one of stdout and stderr" # Invoke the CUT - stream = Inventory(my_stream, MockPopen(my_stdout, my_stderr)) + stream = Inventory(my_stream, subproc=MockPopen(my_stdout, my_stderr)) # Test Inventory.__repr__() assert str(stream) == " from MockPopen>" @@ -1430,11 +1489,17 @@ def read(self, size: int = -1) -> bytes: else: assert not wait_timeout, "wait() failed to time out as expected" - assert stream.subproc is None + assert stream.lock is None assert my_calls == exp_calls def test_find( - self, selinux_enabled, server_config, make_logger, tarball, monkeypatch + self, + selinux_enabled, + server_config, + make_logger, + tarball, + monkeypatch, + db_session, ): """ Create a dataset, check the cache manager state, and test that we can find it @@ -1483,9 +1548,26 @@ def test_find( assert exc.value.tarball == "foobar" # Unpack the dataset, creating INCOMING and RESULTS links - cm.unpack(md5) + tarball.get_results(FakeLockRef(tarball.lock)) assert tarball.cache == controller.cache / md5 assert tarball.unpacked == controller.cache / md5 / tarball.name + assert tarball.last_ref.exists() + FakeLockRef.reset() + + audits = Audit.query(name="cache") + assert len(audits) == 2 + assert audits[0].name == "cache" + assert audits[0].operation is OperationCode.CREATE + assert audits[0].status is AuditStatus.BEGIN + assert audits[0].object_type is AuditType.DATASET + assert audits[0].object_id == tarball.resource_id + assert audits[0].object_name == tarball.name + assert audits[1].name == "cache" + assert audits[1].operation is OperationCode.CREATE + assert audits[1].status is AuditStatus.SUCCESS + assert audits[1].object_type is AuditType.DATASET + assert audits[1].object_id == tarball.resource_id + assert audits[1].object_name == tarball.name # We should be able to find the tarball even in a new cache manager # that hasn't been fully discovered. @@ -1517,11 +1599,13 @@ def test_lifecycle( cm = CacheManager(server_config, make_logger) archive = cm.archive_root / "ABC" cache = cm.cache_root / md5 + dataset_name = Dataset.stem(source_tarball) + unpack = cache / dataset_name monkeypatch.setattr(Tarball, "_get_metadata", fake_get_metadata) # None of the controller directories should exist yet assert not archive.exists() - assert not cache.exists() + assert not unpack.exists() # Create a dataset in the cache manager from our source tarball cm.create(source_tarball) @@ -1529,7 +1613,7 @@ def test_lifecycle( # Expect the archive directory was created, but we haven't unpacked so # incoming and results should not exist. assert archive.exists() - assert not cache.exists() + assert not unpack.exists() # The original files should have been removed assert not source_tarball.exists() @@ -1546,18 +1630,20 @@ def test_lifecycle( assert md5 == md5_hash.hexdigest() assert list(cm.controllers.keys()) == ["ABC"] - dataset_name = source_tarball.name[:-7] assert list(cm.tarballs) == [dataset_name] assert list(cm.datasets) == [md5] + dataset = cm.find_dataset(md5) + # Now "unpack" the tarball and check that the incoming directory and # results link are set up. 
- cm.unpack(md5) - assert cache == cm[md5].cache + dataset.get_results(FakeLockRef(dataset.lock)) + assert cache == dataset.cache assert cache.is_dir() - assert (cache / dataset_name).is_dir() + assert unpack.is_dir() + FakeLockRef.reset() - assert cm.datasets[md5].unpacked == cache / dataset_name + assert dataset.unpacked == unpack # Re-discover, with all the files in place, and compare newcm = CacheManager(server_config, make_logger) @@ -1585,10 +1671,9 @@ def test_lifecycle( assert tarball.cache == other.cache assert tarball.unpacked == other.unpacked - # Remove the unpacked tarball, and confirm that the directory and link - # are removed. - cm.uncache(md5) - assert not cache.exists() + # Remove the unpacked tarball, and confirm the directory is removed + dataset.cache_delete() + assert not unpack.exists() # Now that we have all that setup, delete the dataset cm.delete(md5) @@ -1625,8 +1710,12 @@ def test_compatibility( t2 = cm1[id] assert t1.name == t2.name == Dataset.stem(source_tarball) - t1.unpack() - t2.unpack() + r1 = t1.get_results(FakeLockRef(t1.lock)) + assert r1 == t1.unpacked + FakeLockRef.reset() + r2 = t2.get_results(FakeLockRef(t2.lock)) + assert r2 == t2.unpacked + FakeLockRef.reset() assert t1.unpacked != t2.unpacked assert (t1.unpacked / "metadata.log").is_file() @@ -1645,3 +1734,217 @@ def test_compatibility( assert not tar1.exists() assert not tar2.exists() assert not tar1.parent.exists() + + def test_lockref(self, monkeypatch): + """Test behavior of the LockRef class""" + + locks: list[tuple] = [] + files: list[tuple] = [] + close_fail: Optional[Exception] = None + lockf_fail: Optional[Exception] = None + + def reset(): + nonlocal close_fail, lockf_fail + close_fail = None + lockf_fail = None + locks.clear() + files.clear() + + class FakeStream: + def __init__(self, file): + self.file = file + + def close(self): + files.append(("close", self.file)) + if close_fail: + raise close_fail + + def fileno(self) -> int: + return 0 + + def __eq__(self, other) -> bool: + return self.file == other.file + + def __str__(self) -> str: + return f"Fake Stream with file {self.file!r}" + + def fake_lockf(fd: FakeStream, cmd: int): + locks.append((fd.file, cmd)) + if lockf_fail: + raise lockf_fail + + def fake_open(self, flags: str) -> FakeStream: + files.append(("open", self, flags)) + return FakeStream(self) + + monkeypatch.setattr("pbench.server.cache_manager.fcntl.lockf", fake_lockf) + monkeypatch.setattr(Path, "open", fake_open) + + lockfile = Path("/lock") + + # Simple shared lock + lock = LockRef(lockfile) + assert lock.lock == FakeStream(lockfile) + assert not lock.locked + assert not lock.exclusive + assert len(locks) == 0 + assert files == [("open", lockfile, "w+")] + lock.acquire() + assert lock.locked + assert locks == [(lockfile, fcntl.LOCK_SH)] + lock.release() + assert not lock.locked + assert files == [("open", lockfile, "w+"), ("close", lockfile)] + assert locks == [(lockfile, fcntl.LOCK_SH), (lockfile, fcntl.LOCK_UN)] + + # No-wait + reset() + lock = LockRef(lockfile) + assert files == [("open", lockfile, "w+")] + lock.acquire(wait=False) + assert not lock.exclusive + assert locks == [(lockfile, fcntl.LOCK_SH | fcntl.LOCK_NB)] + lock.release() + assert locks == [ + (lockfile, fcntl.LOCK_SH | fcntl.LOCK_NB), + (lockfile, fcntl.LOCK_UN), + ] + assert files == [("open", lockfile, "w+"), ("close", lockfile)] + + # No-wait failure + reset() + lock = LockRef(lockfile) + assert files == [("open", lockfile, "w+")] + lockf_fail = OSError(errno.EAGAIN, "Loser") + with 
pytest.raises(OSError, match="Loser"): + lock.acquire(wait=False) + assert not lock.exclusive + assert not lock.locked + assert locks == [(lockfile, fcntl.LOCK_SH | fcntl.LOCK_NB)] + + # Exclusive + reset() + lock = LockRef(lockfile) + assert files == [("open", lockfile, "w+")] + lock.acquire(exclusive=True) + assert lock.exclusive + assert locks == [(lockfile, fcntl.LOCK_EX)] + assert lock.locked + assert lock.exclusive + # upgrade is idempotent + lock.upgrade() + assert locks == [(lockfile, fcntl.LOCK_EX)] + assert lock.exclusive + assert lock.locked + lock.downgrade() + assert locks == [(lockfile, fcntl.LOCK_EX), (lockfile, fcntl.LOCK_SH)] + assert not lock.exclusive + assert lock.locked + # downgrade is idempotent + lock.downgrade() + assert locks == [(lockfile, fcntl.LOCK_EX), (lockfile, fcntl.LOCK_SH)] + assert not lock.exclusive + assert lock.locked + # upgrade back to exclusive + lock.upgrade() + assert locks == [ + (lockfile, fcntl.LOCK_EX), + (lockfile, fcntl.LOCK_SH), + (lockfile, fcntl.LOCK_EX), + ] + assert lock.exclusive + assert lock.locked + # release + lock.release() + assert locks == [ + (lockfile, fcntl.LOCK_EX), + (lockfile, fcntl.LOCK_SH), + (lockfile, fcntl.LOCK_EX), + (lockfile, fcntl.LOCK_UN), + ] + assert files == [("open", lockfile, "w+"), ("close", lockfile)] + assert not lock.locked + assert not lock.exclusive + + # Check release exception handling on close + reset() + lock = LockRef(lockfile).acquire() + assert locks == [(lockfile, fcntl.LOCK_SH)] + assert files == [("open", lockfile, "w+")] + + message = "Nobody inspects the Spanish Aquisition" + close_fail = Exception(message) + with pytest.raises(Exception, match=message): + lock.release() + assert lock.lock is None + assert files == [("open", lockfile, "w+"), ("close", lockfile)] + assert locks == [(lockfile, fcntl.LOCK_SH), (lockfile, fcntl.LOCK_UN)] + + # Check release exception handling on unlock + reset() + lock = LockRef(lockfile).acquire() + assert locks == [(lockfile, fcntl.LOCK_SH)] + assert files == [("open", lockfile, "w+")] + + message = "Nobody inspects the Spanish Aquisition" + lockf_fail = Exception(message) + with pytest.raises(Exception, match=message): + lock.release() + assert lock.lock is None + assert files == [("open", lockfile, "w+")] + assert locks == [(lockfile, fcntl.LOCK_SH), (lockfile, fcntl.LOCK_UN)] + + def test_lockmanager(self, monkeypatch): + """Test LockManager""" + + lockfile = Path("/lock") + monkeypatch.setattr("pbench.server.cache_manager.LockRef", FakeLockRef) + # default parameters + with LockManager(lockfile) as lock: + assert lock.unlock + assert not lock.exclusive + assert lock.wait + assert FakeLockRef.operations == [("acquire", False, True)] + lock.upgrade() + assert FakeLockRef.operations == [("acquire", False, True), ("upgrade")] + lock.downgrade() + assert FakeLockRef.operations == [ + ("acquire", False, True), + ("upgrade"), + ("downgrade"), + ] + assert FakeLockRef.operations == [ + ("acquire", False, True), + ("upgrade"), + ("downgrade"), + ("release"), + ] + + # exclusive + FakeLockRef.reset() + with LockManager(lockfile, exclusive=True) as lock: + assert lock.unlock + assert lock.exclusive + assert lock.wait + assert FakeLockRef.operations == [("acquire", True, True)] + assert FakeLockRef.operations == [("acquire", True, True), ("release")] + + # no-wait + FakeLockRef.reset() + with LockManager(lockfile, wait=False) as lock: + assert lock.unlock + assert not lock.exclusive + assert not lock.wait + assert FakeLockRef.operations == [("acquire", False, 
False)] + assert FakeLockRef.operations == [("acquire", False, False), ("release")] + + # keep option + FakeLockRef.reset() + with LockManager(lockfile) as lock: + still_locked = lock.keep() + assert still_locked is lock + assert not lock.unlock + assert FakeLockRef.operations == [("acquire", False, True)] + assert FakeLockRef.operations == [("acquire", False, True)] + still_locked.release() + assert FakeLockRef.operations == [("acquire", False, True), ("release")] diff --git a/lib/pbench/test/unit/server/test_datasets_inventory.py b/lib/pbench/test/unit/server/test_datasets_inventory.py index 100d9e0e1c..305ea8634c 100644 --- a/lib/pbench/test/unit/server/test_datasets_inventory.py +++ b/lib/pbench/test/unit/server/test_datasets_inventory.py @@ -102,7 +102,7 @@ def mock_get_inventory(_s, _d: str, _t: str): return { "name": "f1.json", "type": CacheType.FILE, - "stream": Inventory(exp_stream, None), + "stream": Inventory(exp_stream), } response = Response() @@ -142,7 +142,7 @@ def mock_get_inventory(_s, _d: str, _t: str): return { "name": "f1.json", "type": CacheType.FILE, - "stream": Inventory(exp_stream, None), + "stream": Inventory(exp_stream), } def mock_send_file(path_or_file, *args, **kwargs): @@ -164,7 +164,7 @@ def mock_get_inventory(_s, _d: str, _t: str): return { "name": "f1.json", "type": CacheType.FILE, - "stream": Inventory(exp_stream, None), + "stream": Inventory(exp_stream), } monkeypatch.setattr(CacheManager, "get_inventory", mock_get_inventory) diff --git a/lib/pbench/test/unit/server/test_datasets_visualize.py b/lib/pbench/test/unit/server/test_datasets_visualize.py index 5547010360..ba525e13f1 100644 --- a/lib/pbench/test/unit/server/test_datasets_visualize.py +++ b/lib/pbench/test/unit/server/test_datasets_visualize.py @@ -53,7 +53,7 @@ def mock_get_inventory(self, _dataset: str, target: str): return { "name": Path(target).name, "type": CacheType.FILE, - "stream": Inventory(BytesIO(b"CSV_file_as_a_byte_stream"), None), + "stream": Inventory(BytesIO(b"CSV_file_as_a_byte_stream")), } @staticmethod diff --git a/lib/pbench/test/unit/server/test_indexing_tarballs.py b/lib/pbench/test/unit/server/test_indexing_tarballs.py index 5ee642591e..3eccfc6b7e 100644 --- a/lib/pbench/test/unit/server/test_indexing_tarballs.py +++ b/lib/pbench/test/unit/server/test_indexing_tarballs.py @@ -16,6 +16,7 @@ OperationCode, PbenchServerConfig, ) +from pbench.server.cache_manager import LockManager from pbench.server.database.models.audit import AuditStatus from pbench.server.database.models.datasets import ( Dataset, @@ -265,6 +266,41 @@ def error(self, dataset: Dataset, message: str): __class__.errors[dataset.name] = message +class FakeLockManager: + def __init__(self, lock: Path, exclusive: bool = False, wait: bool = True): + """Initialize a mocked lock reference + + Args: + lock: the path of a lock file + exclusive: lock exclusively + wait: wait for lock + """ + self.exclusive = exclusive + self.wait = wait + + def __enter__(self) -> "FakeLockManager": + """Acquire the lock + + Returns: + self reference for 'as' clause + """ + return self + + def __exit__(self, *exc): + """Release the lock and close the lock file""" + self.exclusive = False + + def upgrade(self): + """Upgrade a shared lock to exclusive""" + if not self.exclusive: + self.exclusive = True + + def downgrade(self): + """Downgrade an exclusive lock to shared""" + if self.exclusive: + self.exclusive = False + + class FakeController: def __init__(self, path: Path, cache: Path, logger: Logger): self.name = path.name @@ -279,9 
+315,13 @@ def __init__(self, path: Path, resource_id: str, controller: FakeController): self.tarball_path = path self.controller = controller self.cache = controller.cache / "ABC" + self.lock = self.cache / "lock" + self.last_ref = self.cache / "last_ref" self.unpacked = self.cache / self.name self.isolator = controller.path / resource_id - self.uncache = lambda: None + + def get_results(self, lock: LockManager): + pass class FakeCacheManager: @@ -292,7 +332,7 @@ def __init__(self, config: PbenchServerConfig, logger: Logger): self.logger = logger self.datasets = {} - def unpack(self, resource_id: str): + def find_dataset(self, resource_id: str): controller = FakeController(Path("/archive/ABC"), Path("/.cache"), self.logger) return FakeTarball( Path(f"/archive/ABC/{resource_id}/{self.lookup[resource_id]}.tar.xz"), @@ -341,6 +381,7 @@ def mocks(monkeypatch, make_logger): m.setattr("pbench.server.indexing_tarballs.Metadata", FakeMetadata) m.setattr("pbench.server.indexing_tarballs.IndexMap", FakeIndexMap) m.setattr("pbench.server.indexing_tarballs.CacheManager", FakeCacheManager) + m.setattr("pbench.server.indexing_tarballs.LockManager", FakeLockManager) m.setattr("pbench.server.indexing_tarballs.Audit", FakeAudit) yield m FakeAudit.reset() diff --git a/server/lib/systemd/pbench-reclaim.service b/server/lib/systemd/pbench-reclaim.service new file mode 100644 index 0000000000..bc9b430d18 --- /dev/null +++ b/server/lib/systemd/pbench-reclaim.service @@ -0,0 +1,14 @@ +[Unit] +Description=Reclaim Pbench Server cache +Wants=pbench-reclaim.timer + +[Service] +Type = simple +User = pbench +Group = pbench +Environment = _PBENCH_SERVER_CONFIG=/opt/pbench-server/lib/config/pbench-server.cfg +ExecStart=-/opt/pbench-server/bin/pbench-tree-manage --reclaim +KillSignal = TERM + +[Install] +WantedBy=pbench-server.service diff --git a/server/lib/systemd/pbench-reclaim.timer b/server/lib/systemd/pbench-reclaim.timer new file mode 100644 index 0000000000..74480864cd --- /dev/null +++ b/server/lib/systemd/pbench-reclaim.timer @@ -0,0 +1,12 @@ +[Unit] +Description=Pbench Server cache reclaim timer +After=pbench-server.service +Requires=pbench-reclaim.service +BindsTo=pbench-server.service + +[Timer] +Unit=pbench-reclaim.service +OnUnitInactiveSec=4h + +[Install] +WantedBy=timers.target diff --git a/server/pbenchinacan/container-build.sh b/server/pbenchinacan/container-build.sh index e5ea3878e6..eebbd1e51b 100755 --- a/server/pbenchinacan/container-build.sh +++ b/server/pbenchinacan/container-build.sh @@ -79,12 +79,15 @@ buildah run $container cp \ ${SERVER_LIB}/systemd/pbench-server.service \ ${SERVER_LIB}/systemd/pbench-index.service \ ${SERVER_LIB}/systemd/pbench-index.timer \ + ${SERVER_LIB}/systemd/pbench-reclaim.service \ + ${SERVER_LIB}/systemd/pbench-reclaim.timer \ /etc/systemd/system/ buildah run $container systemctl enable nginx buildah run $container systemctl enable rsyslog buildah run $container systemctl enable pbench-server buildah run $container systemctl enable pbench-index.timer +buildah run $container systemctl enable pbench-reclaim.timer # Create the container image. buildah commit --rm $container ${PB_CONTAINER_REG}/${PB_SERVER_IMAGE_NAME}:${PB_SERVER_IMAGE_TAG}
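
Usage sketch (illustrative, not part of the patch): the cache locking protocol introduced above is driven entirely through LockManager. The helpers below assume a discovered Tarball object exposing the lock path, get_results(), and cache_delete() members added in this change; the function names are invented for illustration.

import errno

from pbench.server.cache_manager import LockManager


def read_cached_file(tarball, relpath: str) -> bytes:
    """Read one file from the cache tree under a shared lock."""
    with LockManager(tarball.lock) as lock:
        # get_results() upgrades to an exclusive lock only if it needs to
        # unpack, then downgrades back to shared and touches last_ref.
        root = tarball.get_results(lock)
        return (root / relpath).read_bytes()


def try_reclaim(tarball) -> bool:
    """Delete the cache tree, but only if no reader holds the lock."""
    try:
        with LockManager(tarball.lock, exclusive=True, wait=False):
            tarball.cache_delete()
        return True
    except OSError as exc:
        if exc.errno in (errno.EAGAIN, errno.EACCES):
            return False  # cache is busy; skip it on this pass
        raise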
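
Similarly, a sketch (again illustrative, with invented names) of the lock hand-off that Tarball.stream() performs: keep() suppresses the LockManager's unlock-on-exit, so the shared lock stays held until the returned Inventory is closed.

from pbench.server.cache_manager import Inventory, LockManager


def open_cached_stream(tarball, relpath: str) -> Inventory:
    with LockManager(tarball.lock) as lock:
        artifact = tarball.get_results(lock) / relpath
        # Ownership of the lock transfers to the Inventory here; its close()
        # releases both the file stream and the cache lock. If get_results()
        # or open() raises instead, the context manager still unlocks on exit.
        return Inventory(artifact.open("rb"), lock=lock.keep())

The caller (on the API path, the Flask infrastructure after the response is sent) is then responsible for calling close() on the Inventory, which releases both the file stream and the cache lock.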