From 4ff7a924829d5c1817ec890b059c7b908985e900 Mon Sep 17 00:00:00 2001
From: David Butenhof
Date: Tue, 12 Sep 2023 20:25:43 -0400
Subject: [PATCH] Encapsulate and package locking

Allow holding the lock from unpack through stream, and allow conversion
between `EX` and `SH` lock modes.
---
 lib/pbench/cli/server/tree_manage.py          |  92 ++++---
 lib/pbench/server/cache_manager.py            | 247 ++++++++++--------
 lib/pbench/server/indexing_tarballs.py        |  23 +-
 .../test/unit/server/test_cache_manager.py    | 116 +++-----
 .../unit/server/test_indexing_tarballs.py     |  56 +++-
 5 files changed, 288 insertions(+), 246 deletions(-)

diff --git a/lib/pbench/cli/server/tree_manage.py b/lib/pbench/cli/server/tree_manage.py
index 229154a53b..eef74f66ee 100644
--- a/lib/pbench/cli/server/tree_manage.py
+++ b/lib/pbench/cli/server/tree_manage.py
@@ -1,6 +1,5 @@
 from datetime import datetime, timedelta, timezone
 import errno
-import fcntl
 from logging import Logger
 
 import click
@@ -10,7 +9,7 @@
 from pbench.cli.server.options import common_options
 from pbench.common.logger import get_pbench_logger
 from pbench.server import BadConfig, OperationCode
-from pbench.server.cache_manager import CacheManager
+from pbench.server.cache_manager import CacheManager, LockRef
 from pbench.server.database.models.audit import Audit, AuditStatus, AuditType
 
 # Length of time in hours to retain unreferenced cached results data.
@@ -38,18 +37,19 @@ def reclaim_cache(tree: CacheManager, logger: Logger, lifetime: float = CACHE_LI
             date = datetime.fromtimestamp(
                 tarball.last_ref.stat().st_mtime, timezone.utc
             )
-            if date < window:
-                logger.info(
-                    "RECLAIM {}: last_ref {:%Y-%m-%d %H:%M:%S} is older than {:%Y-%m-%d %H:%M:%S}",
-                    tarball.name,
-                    date,
-                    window,
-                )
-                error = None
-                audit = None
-                with tarball.lock.open("wb") as lock:
+            if date >= window:
+                continue
+            error = None
+            audit = None
+            logger.info(
+                "RECLAIM {}: last_ref {:%Y-%m-%d %H:%M:%S} is older than {:%Y-%m-%d %H:%M:%S}",
+                tarball.name,
+                date,
+                window,
+            )
+            try:
+                with LockRef(tarball.lock, exclusive=True, wait=False):
                     try:
-                        fcntl.lockf(lock, fcntl.LOCK_EX | fcntl.LOCK_NB)
                         audit = Audit.create(
                             name="reclaim",
                             operation=OperationCode.DELETE,
@@ -59,29 +59,36 @@ def reclaim_cache(tree: CacheManager, logger: Logger, lifetime: float = CACHE_LI
                             object_id=tarball.resource_id,
                             object_name=tarball.name,
                         )
-                        try:
-                            tarball.cache_delete()
-                            reclaimed += 1
-                        except Exception as e:
-                            error = e
-                        finally:
-                            fcntl.lockf(lock, fcntl.LOCK_UN)
-                    except OSError as e:
-                        if e.errno in (errno.EAGAIN, errno.EACCES):
-                            logger.info(
-                                "RECLAIM {}: skipping because cache is locked",
-                                tarball.name,
-                            )
-                            # If the cache is locked, regardless of age, then
-                            # the last_ref timestamp is about to be updated,
-                            # and we skip the cache for now.
-                            continue
+                    except Exception as e:
+                        logger.warning(
+                            "Unable to audit cache reclaim for {}: '{}'",
+                            tarball.name,
+                            e,
+                        )
+                    try:
+                        tarball.cache_delete()
+                        reclaimed += 1
+                    except Exception as e:
                         error = e
-                attributes = {"last_ref": f"{date:%Y-%m-%d %H:%M:%S}"}
-                if error:
-                    reclaim_failed += 1
-                    logger.error("RECLAIM {} failed with '{}'", tarball.name, error)
-                    attributes["error"] = str(error)
+            except OSError as e:
+                if e.errno in (errno.EAGAIN, errno.EACCES):
+                    logger.info(
+                        "RECLAIM {}: skipping because cache is locked",
+                        tarball.name,
+                    )
+                    # If the cache is locked, regardless of age, then
+                    # the last_ref timestamp is about to be updated,
+                    # and we skip the dataset this time around.
+ continue + error = e + except Exception as e: + error = e + attributes = {"last_ref": f"{date:%Y-%m-%d %H:%M:%S}"} + if error: + reclaim_failed += 1 + logger.error("RECLAIM {} failed with '{}'", tarball.name, error) + attributes["error"] = str(error) + if audit: Audit.create( root=audit, status=AuditStatus.FAILURE if error else AuditStatus.SUCCESS, @@ -130,16 +137,15 @@ def print_tree(tree: CacheManager): "--display", default=False, is_flag=True, help="Display the full tree on completion" ) @click.option( - "--lifetime", - default=CACHE_LIFETIME, + "--reclaim", + show_default=True, + is_flag=False, + flag_value=CACHE_LIFETIME, type=click.FLOAT, - help="Specify lifetime of cached data for --reclaim", -) -@click.option( - "--reclaim", default=False, is_flag=True, help="Reclaim stale cached data" + help="Reclaim cached data older than hours", ) @common_options -def tree_manage(context: object, display: bool, lifetime: int, reclaim: bool): +def tree_manage(context: object, display: bool, reclaim: float): """ Discover, display, and manipulate the on-disk representation of controllers and datasets. @@ -163,7 +169,7 @@ def tree_manage(context: object, display: bool, lifetime: int, reclaim: bool): if display: print_tree(cache_m) if reclaim: - reclaim_cache(cache_m, logger, lifetime) + reclaim_cache(cache_m, logger, reclaim) rv = 0 except Exception as exc: logger.exception("An error occurred discovering the file tree: {}", exc) diff --git a/lib/pbench/server/cache_manager.py b/lib/pbench/server/cache_manager.py index 3348735e11..dc2cfdc2ef 100644 --- a/lib/pbench/server/cache_manager.py +++ b/lib/pbench/server/cache_manager.py @@ -20,93 +20,80 @@ class CacheManagerError(Exception): """Base class for exceptions raised from this module.""" - def __str__(self) -> str: - return "Generic cache manager exception" + pass class BadDirpath(CacheManagerError): """A bad directory path was given.""" def __init__(self, error_msg: str): + super().__init__(error_msg) self.error_msg = error_msg - def __str__(self) -> str: - return self.error_msg - class BadFilename(CacheManagerError): """A bad tarball path was given.""" def __init__(self, path: PathLike): + super().__init__(f"The file path {path} is not a tarball") self.path = str(path) - def __str__(self) -> str: - return f"The file path {self.path!r} is not a tarball" - class CacheExtractBadPath(CacheManagerError): """Request to extract a path that's bad or not a file""" def __init__(self, tar_name: Path, path: PathLike): + super().__init__(f"Unable to extract {path} from {tar_name.name}") self.name = tar_name.name self.path = str(path) - def __str__(self) -> str: - return f"Unable to extract {self.path} from {self.name}" - class TarballNotFound(CacheManagerError): """The dataset was not found in the ARCHIVE tree.""" def __init__(self, tarball: str): + super().__init__(f"The dataset tarball named {tarball!r} is not found") self.tarball = tarball - def __str__(self) -> str: - return f"The dataset tarball named {self.tarball!r} is not found" - class DuplicateTarball(CacheManagerError): """A duplicate tarball name was detected.""" def __init__(self, tarball: str): + super().__init__(f"A dataset tarball named {tarball!r} is already present") self.tarball = tarball - def __str__(self) -> str: - return f"A dataset tarball named {self.tarball!r} is already present" - class MetadataError(CacheManagerError): """A problem was found processing a tarball's metadata.log file.""" def __init__(self, tarball: Path, error: Exception): + super().__init__( + f"A problem occurred 
processing metadata.log from {tarball!s}: {str(error)!r}" + ) self.tarball = tarball self.error = str(error) - def __str__(self) -> str: - return f"A problem occurred processing metadata.log from {self.tarball!s}: {self.error!r}" - class TarballUnpackError(CacheManagerError): """An error occurred trying to unpack a tarball.""" def __init__(self, tarball: Path, error: str): + super().__init__(f"An error occurred while unpacking {tarball}: {error}") self.tarball = tarball self.error = error - def __str__(self) -> str: - return f"An error occurred while unpacking {self.tarball}: {self.error}" - class TarballModeChangeError(CacheManagerError): """An error occurred trying to fix unpacked tarball permissions.""" def __init__(self, tarball: Path, error: str): + super().__init__( + f"An error occurred while changing file permissions of {tarball}: {error}" + ) self.tarball = tarball self.error = error - def __str__(self) -> str: - return f"An error occurred while changing file permissions of {self.tarball}: {self.error}" - class CacheType(Enum): FILE = auto() @@ -194,6 +181,82 @@ def make_cache_object(dir_path: Path, path: Path) -> CacheObject: ) +class LockRef: + """Keep track of a cache lock passed off to a caller""" + + def __init__(self, lock: Path, exclusive: bool = False, wait: bool = True): + """Initialize a lock reference + + The lock file is opened in "w+" mode, which is "write update": unlike + "r+", this creates the file if it doesn't already exist, but still + allows lock conversion between LOCK_EX and LOCK_SH. + + Args: + lock: the path of a lock file + exclusive: lock for exclusive access + wait: wait for lock + """ + self.lock = lock.open("w+") + self.locked = False + self.exclusive = exclusive + self.unlock = True + self.wait = wait + + def acquire(self) -> "LockRef": + """Acquire the lock""" + + cmd = fcntl.LOCK_EX if self.exclusive else fcntl.LOCK_SH + if not self.wait: + cmd |= fcntl.LOCK_NB + fcntl.lockf(self.lock, cmd) + self.locked = True + return self + + def release(self): + """Release the lock and close the lock file""" + + exception = None + try: + fcntl.lockf(self.lock, fcntl.LOCK_UN) + self.lock.close() + self.locked = False + self.exclusive = False + except Exception as e: + exception = e + finally: + # Release our reference to the lock file so that the + # object can be reclaimed. 
+ self.lock = None + if exception: + raise exception + + def upgrade(self): + """Upgrade a shared lock to exclusive""" + if not self.exclusive: + fcntl.lockf(self.lock, fcntl.LOCK_EX) + self.exclusive = True + + def downgrade(self): + """Downgrade an exclusive lock to shared""" + if self.exclusive: + fcntl.lockf(self.lock, fcntl.LOCK_SH) + self.exclusive = False + + def keep(self) -> "LockRef": + """Tell the context manager not to unlock on exit""" + self.unlock = False + return self + + def __enter__(self) -> "LockRef": + """Enter a lock context manager by acquiring the lock""" + return self.acquire() + + def __exit__(self, *exc): + """Exit a lock context manager by releasing the lock""" + if self.unlock: + self.release() + + class Inventory: """Encapsulate the file stream and cache lock management @@ -206,7 +269,7 @@ class Inventory: def __init__( self, stream: IO[bytes], - lock: Optional[IO[bytes]] = None, + lock: Optional[LockRef] = None, subproc: Optional[subprocess.Popen] = None, ): """Construct an instance to track extracted inventory @@ -216,7 +279,8 @@ def __init__( Args: stream: the data stream of a specific tarball member - lock: the file reference for the cache lock file + lock: a cache lock reference + subproc: a Popen object to clean up on close """ self.stream = stream self.lock = lock @@ -224,6 +288,8 @@ def __init__( def close(self): """Close the byte stream and clean up the Popen object""" + + exception = None if self.subproc: try: if self.subproc.poll() is None: @@ -247,13 +313,12 @@ def close(self): self.subproc = None if self.lock: try: - fcntl.lockf(self.lock, fcntl.LOCK_UN) - self.lock.close() - finally: - # Release our reference to the lock file so that the - # object can be reclaimed. - self.lock = None + self.lock.release() + except Exception as e: + exception = e self.stream.close() + if exception: + raise exception def getbuffer(self): """Return the underlying byte buffer (used by send_file)""" @@ -337,6 +402,9 @@ def __init__(self, path: Path, resource_id: str, controller: "Controller"): # Record where cached unpacked data would live self.cache: Path = controller.cache / self.resource_id + # We need the directory to lock, so make sure it's there + self.cache.mkdir(parents=True, exist_ok=True) + # Record hierarchy of a Tar ball self.cachemap: Optional[CacheMap] = None @@ -707,30 +775,18 @@ def stream(self, path: str) -> Inventory: An Inventory object encapsulating the file stream and the cache lock. The caller is reponsible for closing the Inventory! """ - self.cache_create() - # NOTE: the cache is unlocked here. That's "probably OK" as the reclaim - # is based on the last modified timestamp of the lock file and only - # runs periodically. - # - # TODO: Can we convert the lock from EXCLUSIVE to SHARED? If so, that - # would be ideal. 
+ with LockRef(self.lock) as lock: + if not self.unpacked: + lock.upgrade() + self.cache_create() + lock.downgrade() - lock = None - try: - lock = self.lock.open("rb", buffering=0) - fcntl.lockf(lock, fcntl.LOCK_SH) artifact: Path = self.unpacked / path - self.controller.logger.info("path {}: stat {}", artifact, artifact.exists()) if not artifact.is_file(): raise CacheExtractBadPath(self.tarball_path, path) self.last_ref.touch(exist_ok=True) - return Inventory(artifact.open("rb"), lock=lock) - except Exception as e: - if lock: - fcntl.lockf(lock, fcntl.LOCK_UN) - lock.close() - raise e + return Inventory(artifact.open("rb"), lock=lock.keep()) def get_inventory(self, path: str) -> Optional[JSONOBJECT]: """Access the file stream of a tarball member file. @@ -819,53 +875,51 @@ def cache_create(self): Unpack the tarball into a temporary cache directory named with the tarball's resource_id (MD5). - Access to the cached directory is controlled by a "lockf" file to - ensure that two processes don't unpack at the same time and that we - can't reclaim a cache while it's being used. We also track a "last_ref" - modification timestamp which is used to reclaim old caches after - inactivity. + This method must be called while holding an exclusive cache lock. """ - if not self.cache.exists(): - self.cache.mkdir(exist_ok=True) - with self.lock.open("wb", buffering=0) as lock: - fcntl.lockf(lock, fcntl.LOCK_EX) + if not self.unpacked: audit = None error = None try: - if not self.unpacked: - audit = Audit.create( - name="cache", - operation=OperationCode.CREATE, - status=AuditStatus.BEGIN, - user_name=Audit.BACKGROUND_USER, - object_type=AuditType.DATASET, - object_id=self.resource_id, - object_name=self.name, - ) - tar_command = "tar -x --no-same-owner --delay-directory-restore " - tar_command += f"--force-local --file='{str(self.tarball_path)}'" - self.subprocess_run( - tar_command, self.cache, TarballUnpackError, self.tarball_path - ) - find_command = "find . ( -type d -exec chmod ugo+rx {} + ) -o ( -type f -exec chmod ugo+r {} + )" - self.subprocess_run( - find_command, self.cache, TarballModeChangeError, self.cache - ) - self.unpacked = self.cache / self.name + audit = Audit.create( + name="cache", + operation=OperationCode.CREATE, + status=AuditStatus.BEGIN, + user_name=Audit.BACKGROUND_USER, + object_type=AuditType.DATASET, + object_id=self.resource_id, + object_name=self.name, + ) + except Exception as e: + self.controller.logger.warning( + "Unable to audit unpack for {}: '{}'", self.name, e + ) + + try: + tar_command = "tar -x --no-same-owner --delay-directory-restore " + tar_command += f"--force-local --file='{str(self.tarball_path)}'" + self.subprocess_run( + tar_command, self.cache, TarballUnpackError, self.tarball_path + ) + find_command = "find . ( -type d -exec chmod ugo+rx {} + ) -o ( -type f -exec chmod ugo+r {} + )" + self.subprocess_run( + find_command, self.cache, TarballModeChangeError, self.cache + ) + self.unpacked = self.cache / self.name + self.last_ref.touch(exist_ok=True) + self.cache_map(self.unpacked) except Exception as e: error = str(e) raise finally: - self.last_ref.touch(exist_ok=True) - fcntl.lockf(lock, fcntl.LOCK_UN) if audit: + attributes = {"error": error} if error else {} Audit.create( root=audit, status=AuditStatus.FAILURE if error else AuditStatus.SUCCESS, - attributes={"error": error}, + attributes=attributes, ) - self.cache_map(self.unpacked) def cache_delete(self): """Remove the unpacked tarball directory and all contents. 
@@ -1310,19 +1364,6 @@ def create(self, tarfile_path: Path) -> Tarball: self.datasets[tarball.resource_id] = tarball return tarball - def unpack(self, dataset_id: str) -> Tarball: - """Unpack a tarball into the CACHE tree - - Args: - dataset_id: Dataset resource ID - - Returns: - The tarball object - """ - tarball = self.find_dataset(dataset_id) - tarball.cache_create() - return tarball - def get_info(self, dataset_id: str, path: Path) -> dict[str, Any]: """Get information about dataset files from the cache map @@ -1356,16 +1397,6 @@ def get_inventory(self, dataset_id: str, target: str) -> Optional[JSONOBJECT]: tarball = self.find_dataset(dataset_id) return tarball.get_inventory(target) - def cache_reclaim(self, dataset_id: str): - """Remove the unpacked tarball tree. - - Args: - dataset_id: Dataset resource ID to "uncache" - """ - tarball = self.find_dataset(dataset_id) - tarball.cache_delete() - self._clean_empties(tarball.controller.name) - def delete(self, dataset_id: str): """Delete the dataset as well as unpacked artifacts. diff --git a/lib/pbench/server/indexing_tarballs.py b/lib/pbench/server/indexing_tarballs.py index 8bd1f20972..d58f72f4c0 100644 --- a/lib/pbench/server/indexing_tarballs.py +++ b/lib/pbench/server/indexing_tarballs.py @@ -15,7 +15,7 @@ UnsupportedTarballFormat, ) from pbench.server import OperationCode, tstos -from pbench.server.cache_manager import CacheManager, Tarball, TarballNotFound +from pbench.server.cache_manager import CacheManager, LockRef, Tarball from pbench.server.database.models.audit import Audit, AuditStatus from pbench.server.database.models.datasets import ( Dataset, @@ -363,20 +363,23 @@ def sighup_handler(*args): ptb = None tarobj: Optional[Tarball] = None tb_res = error_code["OK"] + lock = None try: - # Dynamically unpack the tarball for indexing. 
+ # We need the fully unpacked cache tree to index it try: - tarobj = self.cache_manager.unpack(dataset.resource_id) - if not tarobj.unpacked: - idxctx.logger.warning( - "{} has not been unpacked", dataset - ) - continue - except TarballNotFound as e: + tarobj = self.cache_manager.find_dataset( + dataset.resource_id + ) + lock = LockRef(tarobj.lock, exclusive=True).acquire() + tarobj.cache_create() + lock.downgrade() + except Exception as e: self.sync.error( dataset, f"Unable to unpack dataset: {e!s}", ) + if lock: + lock.release() continue audit = Audit.create( @@ -510,6 +513,8 @@ def sighup_handler(*args): Audit.create( root=audit, status=doneness, attributes=attributes ) + if lock: + lock.release() try: ie_len = ie_filepath.stat().st_size except FileNotFoundError: diff --git a/lib/pbench/test/unit/server/test_cache_manager.py b/lib/pbench/test/unit/server/test_cache_manager.py index 8b7438a910..8216da917a 100644 --- a/lib/pbench/test/unit/server/test_cache_manager.py +++ b/lib/pbench/test/unit/server/test_cache_manager.py @@ -1,6 +1,4 @@ -from contextlib import contextmanager import errno -import fcntl import hashlib import io from logging import Logger @@ -474,11 +472,15 @@ def generate_test_result_tree(tmp_path: Path, dir_name: str) -> Path: # create some directories and files inside the temp directory sub_dir = tmp_path / dir_name sub_dir.mkdir(parents=True, exist_ok=True) - (sub_dir / "f1.json").touch() - (sub_dir / "metadata.log").touch() + (sub_dir / "f1.json").write_text("{'json': 'value'}", encoding="utf-8") + (sub_dir / "metadata.log").write_text( + "[pbench]\nkey = value\n", encoding="utf-8" + ) for i in range(1, 4): (sub_dir / "subdir1" / f"subdir1{i}").mkdir(parents=True, exist_ok=True) - (sub_dir / "subdir1" / "f11.txt").touch() + (sub_dir / "subdir1" / "f11.txt").write_text( + "textual\nlines\n", encoding="utf-8" + ) (sub_dir / "subdir1" / "subdir14" / "subdir141").mkdir( parents=True, exist_ok=True ) @@ -524,16 +526,6 @@ def test_unpack_tar_subprocess_exception( tar = Path("/mock/A.tar.xz") cache = Path("/mock/.cache") - locks: list[tuple[str, str]] = [] - - @contextmanager - def open(s, m, buffering=-1): - yield None - - def locker(fd, mode): - nonlocal locks - locks.append((fd, mode)) - @staticmethod def mock_run(command, _dir_path, exception, dir_p): verb = "tar" @@ -541,11 +533,10 @@ def mock_run(command, _dir_path, exception, dir_p): raise exception(dir_p, subprocess.TimeoutExpired(verb, 43)) with monkeypatch.context() as m: + m.setattr(Path, "exists", lambda self: True) m.setattr(Path, "mkdir", lambda path, parents=False, exist_ok=False: None) - m.setattr(Path, "open", open) m.setattr(Path, "touch", lambda path, exist_ok=False: None) m.setattr(Tarball, "subprocess_run", mock_run) - m.setattr("pbench.server.cache_manager.fcntl.lockf", locker) m.setattr(Tarball, "__init__", TestCacheManager.MockTarball.__init__) m.setattr(Controller, "__init__", TestCacheManager.MockController.__init__) tb = Tarball( @@ -566,16 +557,6 @@ def test_unpack_find_subprocess_exception( cache = Path("/mock/.cache") rmtree_called = True - locks: list[tuple[str, str]] = [] - - @contextmanager - def open(s, m, buffering=-1): - yield None - - def locker(fd, mode): - nonlocal locks - locks.append((fd, mode)) - def mock_rmtree(path: Path, ignore_errors=False): nonlocal rmtree_called rmtree_called = True @@ -592,10 +573,9 @@ def mock_run(command, _dir_path, exception, dir_p): assert command.startswith("tar") with monkeypatch.context() as m: + m.setattr(Path, "exists", lambda self: True) 
m.setattr(Path, "mkdir", lambda path, parents=False, exist_ok=False: None) - m.setattr(Path, "open", open) m.setattr(Path, "touch", lambda path, exist_ok=False: None) - m.setattr("pbench.server.cache_manager.fcntl.lockf", locker) m.setattr(Tarball, "subprocess_run", mock_run) m.setattr(shutil, "rmtree", mock_rmtree) m.setattr(Tarball, "__init__", TestCacheManager.MockTarball.__init__) @@ -618,16 +598,6 @@ def test_unpack_success(self, make_logger, db_session, monkeypatch): cache = Path("/mock/.cache") call = list() - locks: list[tuple[str, str]] = [] - - @contextmanager - def open(s, m, buffering=-1): - yield None - - def locker(fd, mode): - nonlocal locks - locks.append((fd, mode)) - def mock_run(args, **_kwargs): call.append(args[0]) @@ -646,7 +616,6 @@ def mock_resolve(_path, _strict=False): m.setattr(Path, "mkdir", lambda path, parents=False, exist_ok=False: None) m.setattr(Path, "open", open) m.setattr(Path, "touch", lambda path, exist_ok=False: None) - m.setattr("pbench.server.cache_manager.fcntl.lockf", locker) m.setattr("pbench.server.cache_manager.subprocess.run", mock_run) m.setattr(Path, "resolve", mock_resolve) m.setattr(Tarball, "__init__", TestCacheManager.MockTarball.__init__) @@ -750,7 +719,7 @@ def test_cache_map_bad_dir_path( "f1.json", None, None, - 0, + 17, CacheType.FILE, ), ( @@ -768,7 +737,7 @@ def test_cache_map_bad_dir_path( "f11.txt", None, None, - 0, + 14, CacheType.FILE, ), ( @@ -777,7 +746,7 @@ def test_cache_map_bad_dir_path( "f11.txt", None, None, - 0, + 14, CacheType.FILE, ), ( @@ -786,7 +755,7 @@ def test_cache_map_bad_dir_path( "f11.txt", None, None, - 0, + 14, CacheType.FILE, ), ( @@ -945,7 +914,7 @@ def test_cache_map_traverse_cmap( "name": "f11.txt", "resolve_path": None, "resolve_type": None, - "size": 0, + "size": 14, "type": CacheType.FILE, }, ), @@ -1013,9 +982,9 @@ def test_cache_map_get_info_cmap( @pytest.mark.parametrize( "file_path,file_pattern,exp_stream", [ - ("", "dir_name.tar.xz", io.BytesIO(b"tarball_as_a_byte_stream")), - (None, "dir_name.tar.xz", io.BytesIO(b"tarball_as_a_byte_stream")), - ("f1.json", "f1.json", io.BytesIO(b"file_as_a_byte_stream")), + ("", "dir_name.tar.xz", b"tarball_as_a_byte_stream"), + (None, "dir_name.tar.xz", b"tarball_as_a_byte_stream"), + ("f1.json", "f1.json", b"{'json': 'value'}"), ("subdir1/f12_sym", None, CacheExtractBadPath(Path("a"), "b")), ], ) @@ -1023,36 +992,22 @@ def test_get_inventory( self, make_logger, monkeypatch, tmp_path, file_path, file_pattern, exp_stream ): """Test to extract file contents/stream from a file""" - archive = tmp_path / "mock/archive" - tar = archive / "ABC/dir_name.tar.xz" - cache = tmp_path / "mock/.cache" - - locks: list[tuple[str, str]] = [] - - def locker(fd, mode): - nonlocal locks - locks.append((fd, mode)) + archive = tmp_path / "mock" / "archive" / "ABC" + archive.mkdir(parents=True, exist_ok=True) + tar = archive / "dir_name.tar.xz" + tar.write_bytes(b"tarball_as_a_byte_stream") + cache = tmp_path / "mock" / ".cache" with monkeypatch.context() as m: m.setattr(Tarball, "__init__", TestCacheManager.MockTarball.__init__) m.setattr(Controller, "__init__", TestCacheManager.MockController.__init__) - real_open = Path.open - m.setattr( - Path, - "open", - lambda s, m="rb", buffering=-1: exp_stream - if file_pattern and file_pattern in str(s) - else real_open(s, m, buffering), - ) - m.setattr("pbench.server.cache_manager.fcntl.lockf", locker) tb = Tarball( tar, "ABC", Controller(archive, cache, make_logger) ) - tb.unpacked = cache / "ABC/dir_name" - tar_dir = 
TestCacheManager.MockController.generate_test_result_tree( + tb.unpacked = cache / "ABC" / "dir_name" + TestCacheManager.MockController.generate_test_result_tree( cache / "ABC", "dir_name" ) - tb.cache_map(tar_dir) try: file_info = tb.get_inventory(file_path) except Exception as e: @@ -1061,17 +1016,8 @@ def locker(fd, mode): assert not isinstance(exp_stream, Exception) assert file_info["type"] is CacheType.FILE stream: Inventory = file_info["stream"] - assert stream.stream == exp_stream + assert stream.stream.read() == exp_stream stream.close() - if not file_path: - assert len(locks) == 0 - else: - assert list(i[1] for i in locks) == [ - fcntl.LOCK_EX, - fcntl.LOCK_UN, - fcntl.LOCK_SH, - fcntl.LOCK_UN, - ] def test_cm_inventory(self, monkeypatch, server_config, make_logger): """Verify the happy path of the high level get_inventory""" @@ -1562,7 +1508,7 @@ def test_find( assert exc.value.tarball == "foobar" # Unpack the dataset, creating INCOMING and RESULTS links - cm.unpack(md5) + tarball.cache_create() assert tarball.cache == controller.cache / md5 assert tarball.unpacked == controller.cache / md5 / tarball.name assert tarball.last_ref.exists() @@ -1646,14 +1592,16 @@ def test_lifecycle( assert list(cm.tarballs) == [dataset_name] assert list(cm.datasets) == [md5] + dataset = cm.find_dataset(md5) + # Now "unpack" the tarball and check that the incoming directory and # results link are set up. - cm.unpack(md5) - assert cache == cm[md5].cache + dataset.cache_create() + assert cache == dataset.cache assert cache.is_dir() assert unpack.is_dir() - assert cm.datasets[md5].unpacked == unpack + assert dataset.unpacked == unpack # Re-discover, with all the files in place, and compare newcm = CacheManager(server_config, make_logger) @@ -1682,7 +1630,7 @@ def test_lifecycle( assert tarball.unpacked == other.unpacked # Remove the unpacked tarball, and confirm the directory is removed - cm.cache_reclaim(md5) + dataset.cache_delete() assert not unpack.exists() # Now that we have all that setup, delete the dataset diff --git a/lib/pbench/test/unit/server/test_indexing_tarballs.py b/lib/pbench/test/unit/server/test_indexing_tarballs.py index 5ee642591e..9a6f14552f 100644 --- a/lib/pbench/test/unit/server/test_indexing_tarballs.py +++ b/lib/pbench/test/unit/server/test_indexing_tarballs.py @@ -265,6 +265,53 @@ def error(self, dataset: Dataset, message: str): __class__.errors[dataset.name] = message +class FakeLockRef: + def __init__(self, lock: Path, exclusive: bool = False, wait: bool = True): + """Initialize a lock reference + + The lock file is opened in "w+" mode, which is "write update": unlike + "r+", this creates the file if it doesn't already exist, but still + allows lock conversion between LOCK_EX and LOCK_SH. 
+ + Args: + lock: the path of a lock file + exclusive: lock for exclusive access + wait: [default] wait for lock + """ + self.locked = False + self.exclusive = exclusive + self.unlock = True + + def acquire(self) -> "FakeLockRef": + self.locked = True + return self + + def release(self): + """Release the lock and close the lock file""" + self.locked = False + self.exclusive = False + + def upgrade(self): + if not self.exclusive: + self.exclusive = True + + def downgrade(self): + if self.exclusive: + self.exclusive = False + + def keep(self) -> "FakeLockRef": + """Tell the context manager not to unlock on exit""" + self.unlock = False + return self + + def __enter__(self) -> "FakeLockRef": + return self.acquire() + + def __exit__(self, *exc): + if self.unlock: + self.release() + + class FakeController: def __init__(self, path: Path, cache: Path, logger: Logger): self.name = path.name @@ -279,9 +326,13 @@ def __init__(self, path: Path, resource_id: str, controller: FakeController): self.tarball_path = path self.controller = controller self.cache = controller.cache / "ABC" + self.lock = self.cache / "lock" + self.last_ref = self.cache / "last_ref" self.unpacked = self.cache / self.name self.isolator = controller.path / resource_id - self.uncache = lambda: None + + def cache_create(self): + pass class FakeCacheManager: @@ -292,7 +343,7 @@ def __init__(self, config: PbenchServerConfig, logger: Logger): self.logger = logger self.datasets = {} - def unpack(self, resource_id: str): + def find_dataset(self, resource_id: str): controller = FakeController(Path("/archive/ABC"), Path("/.cache"), self.logger) return FakeTarball( Path(f"/archive/ABC/{resource_id}/{self.lookup[resource_id]}.tar.xz"), @@ -341,6 +392,7 @@ def mocks(monkeypatch, make_logger): m.setattr("pbench.server.indexing_tarballs.Metadata", FakeMetadata) m.setattr("pbench.server.indexing_tarballs.IndexMap", FakeIndexMap) m.setattr("pbench.server.indexing_tarballs.CacheManager", FakeCacheManager) + m.setattr("pbench.server.indexing_tarballs.LockRef", FakeLockRef) m.setattr("pbench.server.indexing_tarballs.Audit", FakeAudit) yield m FakeAudit.reset()
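Usage sketch (illustrative only, not part of the diff above): the fragment below
shows how the new LockRef and Inventory pieces are intended to compose, based on
the reclaim_cache(), Tarball.stream()/cache_create(), and indexer changes in this
patch. The names `config`, `logger`, `resource_id`, and the "metadata.log" target
are assumptions supplied for the example, not part of the change.

    from pbench.server.cache_manager import CacheManager, LockRef

    cache_m = CacheManager(config, logger)        # server config and logger are assumed
    tarball = cache_m.find_dataset(resource_id)   # resource_id (dataset MD5) is assumed

    # Indexer-style use: hold the lock exclusively while building the cache,
    # then downgrade to shared while the unpacked tree is read.
    lock = LockRef(tarball.lock, exclusive=True).acquire()
    try:
        tarball.cache_create()          # must be called with the exclusive lock held
        lock.downgrade()                # other readers may now share the cache
        names = sorted(p.name for p in tarball.unpacked.iterdir())
    finally:
        lock.release()

    # Streaming use: get_inventory() returns a dict whose "stream" member is an
    # Inventory carrying a shared LockRef (handed off via keep()); closing the
    # Inventory releases that lock.
    info = cache_m.get_inventory(resource_id, "metadata.log")
    inventory = info["stream"]
    try:
        data = inventory.stream.read()
    finally:
        inventory.close()

    # Reclaim-style use: LockRef is also a context manager; wait=False makes the
    # acquire non-blocking, raising OSError (EAGAIN/EACCES) if the cache is busy.
    with LockRef(tarball.lock, exclusive=True, wait=False):
        tarball.cache_delete()

The lock file itself is opened "w+" so that a missing lock file is created on
first use while still allowing fcntl conversion between LOCK_EX and LOCK_SH on
the same open descriptor, which is what makes the upgrade()/downgrade() calls
above possible.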