diff --git a/lib/pbench/cli/server/tree_manage.py b/lib/pbench/cli/server/tree_manage.py
index eef74f66ee..e363b8dbde 100644
--- a/lib/pbench/cli/server/tree_manage.py
+++ b/lib/pbench/cli/server/tree_manage.py
@@ -9,7 +9,7 @@
 from pbench.cli.server.options import common_options
 from pbench.common.logger import get_pbench_logger
 from pbench.server import BadConfig, OperationCode
-from pbench.server.cache_manager import CacheManager, LockRef
+from pbench.server.cache_manager import CacheManager, LockManager
 from pbench.server.database.models.audit import Audit, AuditStatus, AuditType

 # Length of time in hours to retain unreferenced cached results data.
@@ -48,7 +48,7 @@ def reclaim_cache(tree: CacheManager, logger: Logger, lifetime: float = CACHE_LI
                 window,
             )
             try:
-                with LockRef(tarball.lock, exclusive=True, wait=False):
+                with LockManager(tarball.lock, exclusive=True, wait=False):
                     try:
                         audit = Audit.create(
                             name="reclaim",
diff --git a/lib/pbench/server/cache_manager.py b/lib/pbench/server/cache_manager.py
index dc2cfdc2ef..34d387dc13 100644
--- a/lib/pbench/server/cache_manager.py
+++ b/lib/pbench/server/cache_manager.py
@@ -69,7 +69,7 @@ class MetadataError(CacheManagerError):

     def __init__(self, tarball: Path, error: Exception):
         super().__init__(
-            f"A problem occurred processing metadata.log from {tarball!s}: {str(error)!r}"
+            f"A problem occurred processing metadata.log from {tarball}: {str(error)!r}"
         )
         self.tarball = tarball
         self.error = str(error)
@@ -184,7 +184,7 @@ def make_cache_object(dir_path: Path, path: Path) -> CacheObject:
 class LockRef:
     """Keep track of a cache lock passed off to a caller"""

-    def __init__(self, lock: Path, exclusive: bool = False, wait: bool = True):
+    def __init__(self, lock: Path):
         """Initialize a lock reference

         The lock file is opened in "w+" mode, which is "write update": unlike
@@ -193,20 +193,25 @@ def __init__(self, lock: Path, exclusive: bool = False, wait: bool = True):

         Args:
             lock: the path of a lock file
-            exclusive: lock for exclusive access
-            wait: wait for lock
         """
         self.lock = lock.open("w+")
         self.locked = False
-        self.exclusive = exclusive
-        self.unlock = True
-        self.wait = wait
+        self.exclusive = False
+
+    def acquire(self, exclusive: bool = False, wait: bool = True) -> "LockRef":
+        """Acquire the lock
+
+        Args:
+            exclusive: lock for exclusive access
+            wait: wait for lock

-    def acquire(self) -> "LockRef":
-        """Acquire the lock"""
+        Returns:
+            The lockref, so this can be chained with the constructor
+        """

-        cmd = fcntl.LOCK_EX if self.exclusive else fcntl.LOCK_SH
-        if not self.wait:
+        self.exclusive = exclusive
+        cmd = fcntl.LOCK_EX if exclusive else fcntl.LOCK_SH
+        if not wait:
             cmd |= fcntl.LOCK_NB
         fcntl.lockf(self.lock, cmd)
         self.locked = True
@@ -215,20 +220,17 @@ def acquire(self) -> "LockRef":
     def release(self):
         """Release the lock and close the lock file"""

-        exception = None
+        if not self.locked:
+            return
         try:
             fcntl.lockf(self.lock, fcntl.LOCK_UN)
             self.lock.close()
-            self.locked = False
-            self.exclusive = False
-        except Exception as e:
-            exception = e
         finally:
             # Release our reference to the lock file so that the
             # object can be reclaimed.
+            self.locked = False
+            self.exclusive = False
             self.lock = None
-        if exception:
-            raise exception

     def upgrade(self):
         """Upgrade a shared lock to exclusive"""
@@ -242,19 +244,44 @@ def downgrade(self):
             fcntl.lockf(self.lock, fcntl.LOCK_SH)
             self.exclusive = False

-    def keep(self) -> "LockRef":
+
+class LockManager:
+    def __init__(self, lock: Path, exclusive: bool = False, wait: bool = True):
+        self.lock = LockRef(lock)
+        self.exclusive = exclusive
+        self.wait = wait
+        self.unlock = True
+
+    def keep(self) -> "LockManager":
         """Tell the context manager not to unlock on exit"""
         self.unlock = False
         return self

-    def __enter__(self) -> "LockRef":
+    def release(self):
+        """Release manually if necessary"""
+        self.lock.release()
+
+    def upgrade(self):
+        """Upgrade a shared lock to exclusive"""
+        self.lock.upgrade()
+
+    def downgrade(self):
+        """Downgrade an exclusive lock to shared"""
+        self.lock.downgrade()
+
+    def __enter__(self) -> "LockManager":
         """Enter a lock context manager by acquiring the lock"""
-        return self.acquire()
+        self.lock.acquire(exclusive=self.exclusive, wait=self.wait)
+        return self

     def __exit__(self, *exc):
-        """Exit a lock context manager by releasing the lock"""
+        """Exit a lock context manager by releasing the lock
+
+        This does nothing if "unlock" has been cleared, meaning we want the
+        lock to persist beyond the context manager scope.
+        """
         if self.unlock:
-            self.release()
+            self.lock.release()


 class Inventory:
@@ -771,12 +798,22 @@ def stream(self, path: str) -> Inventory:

         Args:
             path: Relative path of a regular file within a tarball.

+        On failure, an exception is raised and the cache lock is released; on
+        success, returns an Inventory object which implicitly transfers
+        ownership and management of the cache lock to the caller. When done
+        with the inventory's file stream, the caller must close the Inventory
+        object to release the file stream and the cache lock.
+
+        Raises:
+            CacheExtractBadPath: the path does not match a regular file within
+                the tarball.
+
         Returns:
             An Inventory object encapsulating the file stream and the cache
-            lock. The caller is reponsible for closing the Inventory!
+            lock.
""" - with LockRef(self.lock) as lock: + with LockManager(self.lock) as lock: if not self.unpacked: lock.upgrade() self.cache_create() diff --git a/lib/pbench/server/indexing_tarballs.py b/lib/pbench/server/indexing_tarballs.py index d58f72f4c0..436c0dee00 100644 --- a/lib/pbench/server/indexing_tarballs.py +++ b/lib/pbench/server/indexing_tarballs.py @@ -370,7 +370,7 @@ def sighup_handler(*args): tarobj = self.cache_manager.find_dataset( dataset.resource_id ) - lock = LockRef(tarobj.lock, exclusive=True).acquire() + lock = LockRef(tarobj.lock).acquire(exclusive=True) tarobj.cache_create() lock.downgrade() except Exception as e: diff --git a/lib/pbench/test/unit/server/test_cache_manager.py b/lib/pbench/test/unit/server/test_cache_manager.py index a2cdbd596c..055e3b4602 100644 --- a/lib/pbench/test/unit/server/test_cache_manager.py +++ b/lib/pbench/test/unit/server/test_cache_manager.py @@ -22,6 +22,7 @@ Controller, DuplicateTarball, Inventory, + LockManager, LockRef, MetadataError, Tarball, @@ -547,8 +548,8 @@ def mock_run(command, _dir_path, exception, dir_p): with pytest.raises(TarballUnpackError) as exc: tb.cache_create() msg = f"An error occurred while unpacking {tar}: Command 'tar' timed out after 43 seconds" - assert str(exc.value) == msg - assert exc.type == TarballUnpackError + with pytest.raises(TarballUnpackError, match=msg): + tb.cache_create() def test_unpack_find_subprocess_exception( self, make_logger, db_session, monkeypatch @@ -586,12 +587,11 @@ def mock_run(command, _dir_path, exception, dir_p): tar, "ABC", Controller(Path("/mock/archive"), cache, make_logger) ) - with pytest.raises(TarballModeChangeError) as exc: - tb.cache_create() msg = "An error occurred while changing file permissions of " msg += f"{cache / 'ABC'}: Command 'find' timed out after 43 seconds" + with pytest.raises(TarballModeChangeError, match=msg) as exc: + tb.cache_create() assert str(exc.value) == msg - assert exc.type == TarballModeChangeError assert rmtree_called def test_unpack_success(self, make_logger, db_session, monkeypatch): @@ -616,7 +616,6 @@ def mock_resolve(_path, _strict=False): with monkeypatch.context() as m: m.setattr(Path, "mkdir", lambda path, parents=False, exist_ok=False: None) - m.setattr(Path, "open", open) m.setattr(Path, "touch", lambda path, exist_ok=False: None) m.setattr("pbench.server.cache_manager.subprocess.run", mock_run) m.setattr(Path, "resolve", mock_resolve) @@ -1264,6 +1263,7 @@ def seek(self, offset: int, _whence: int = io.SEEK_SET) -> int: stream = Inventory(MockBufferedReader()) assert stream.lock is None + assert stream.subproc is None # Test Inventory.getbuffer() calls.clear() @@ -1579,7 +1579,7 @@ def test_lifecycle( cm = CacheManager(server_config, make_logger) archive = cm.archive_root / "ABC" cache = cm.cache_root / md5 - dataset_name = source_tarball.name[:-7] + dataset_name = Dataset.stem(source_tarball) unpack = cache / dataset_name monkeypatch.setattr(Tarball, "_get_metadata", fake_get_metadata) @@ -1740,6 +1740,9 @@ def fileno(self) -> int: def __eq__(self, other) -> bool: return self.file == other.file + def __str__(self) -> str: + return f"Fake Stream with file {self.file!r}" + def fake_lockf(fd: FakeStream, cmd: int): locks.append((fd.file, cmd)) if lockf_fail: @@ -1759,8 +1762,6 @@ def fake_open(self, flags: str) -> FakeStream: assert lock.lock == FakeStream(lockfile) assert not lock.locked assert not lock.exclusive - assert lock.unlock - assert lock.wait assert len(locks) == 0 assert files == [("open", lockfile, "w+")] lock.acquire() @@ -1773,10 
@@ -1773,10 +1774,10 @@ def fake_open(self, flags: str) -> FakeStream:

         # No-wait
         reset()
-        lock = LockRef(lockfile, wait=False)
+        lock = LockRef(lockfile)
         assert files == [("open", lockfile, "w+")]
-        assert not lock.wait
-        lock.acquire()
+        lock.acquire(wait=False)
+        assert not lock.exclusive
         assert locks == [(lockfile, fcntl.LOCK_SH | fcntl.LOCK_NB)]
         lock.release()
         assert locks == [
@@ -1787,54 +1788,47 @@ def fake_open(self, flags: str) -> FakeStream:

         # Exclusive
         reset()
-        lock = LockRef(lockfile, exclusive=True)
+        lock = LockRef(lockfile)
         assert files == [("open", lockfile, "w+")]
+        lock.acquire(exclusive=True)
         assert lock.exclusive
-        lock.acquire()
         assert locks == [(lockfile, fcntl.LOCK_EX)]
-        lock.release()
-        assert locks == [(lockfile, fcntl.LOCK_EX), (lockfile, fcntl.LOCK_UN)]
-        assert files == [("open", lockfile, "w+"), ("close", lockfile)]
-
-        # Context Manager
-        reset()
-        with LockRef(lockfile) as lock:
-            assert lock.locked
-            assert files == [("open", lockfile, "w+")]
-            assert locks == [(lockfile, fcntl.LOCK_SH)]
-            lock.upgrade()
-            assert locks == [(lockfile, fcntl.LOCK_SH), (lockfile, fcntl.LOCK_EX)]
-            # upgrade is idempotent
-            lock.upgrade()
-            assert locks == [(lockfile, fcntl.LOCK_SH), (lockfile, fcntl.LOCK_EX)]
-            lock.downgrade()
-            assert locks == [
-                (lockfile, fcntl.LOCK_SH),
-                (lockfile, fcntl.LOCK_EX),
-                (lockfile, fcntl.LOCK_SH),
-            ]
-            # downgrade is idempotent
-            lock.downgrade()
-            assert locks == [
-                (lockfile, fcntl.LOCK_SH),
-                (lockfile, fcntl.LOCK_EX),
-                (lockfile, fcntl.LOCK_SH),
-            ]
+        assert lock.locked
+        assert lock.exclusive
+        # upgrade is idempotent
+        lock.upgrade()
+        assert locks == [(lockfile, fcntl.LOCK_EX)]
+        assert lock.exclusive
+        assert lock.locked
+        lock.downgrade()
+        assert locks == [(lockfile, fcntl.LOCK_EX), (lockfile, fcntl.LOCK_SH)]
+        assert not lock.exclusive
+        assert lock.locked
+        # downgrade is idempotent
+        lock.downgrade()
+        assert locks == [(lockfile, fcntl.LOCK_EX), (lockfile, fcntl.LOCK_SH)]
+        assert not lock.exclusive
+        assert lock.locked
+        # upgrade back to exclusive
+        lock.upgrade()
         assert locks == [
+            (lockfile, fcntl.LOCK_EX),
             (lockfile, fcntl.LOCK_SH),
             (lockfile, fcntl.LOCK_EX),
+        ]
+        assert lock.exclusive
+        assert lock.locked
+        # release
+        lock.release()
+        assert locks == [
+            (lockfile, fcntl.LOCK_EX),
             (lockfile, fcntl.LOCK_SH),
+            (lockfile, fcntl.LOCK_EX),
             (lockfile, fcntl.LOCK_UN),
         ]
         assert files == [("open", lockfile, "w+"), ("close", lockfile)]
-
-        # Context Manager with 'keep' option\
-        reset()
-        with LockRef(lockfile) as lock:
-            assert lock.keep() is lock
-            assert not lock.unlock
-            assert locks == [(lockfile, fcntl.LOCK_SH)]
-        assert files == [("open", lockfile, "w+")]
+        assert not lock.locked
+        assert not lock.exclusive

         # Check release exception handling on close
         reset()
@@ -1863,3 +1857,81 @@ def fake_open(self, flags: str) -> FakeStream:
         assert lock.lock is None
         assert files == [("open", lockfile, "w+")]
         assert locks == [(lockfile, fcntl.LOCK_SH), (lockfile, fcntl.LOCK_UN)]
+
+    def test_lockmanager(self, monkeypatch):
+        """Test LockManager"""
+
+        operations = []
+
+        class FakeLockRef:
+            def __init__(self, lockpath: Path):
+                self.lock = lockpath
+
+            def acquire(self, **kwargs):
+                operations.append(
+                    (
+                        "acquire",
+                        kwargs.get("exclusive", False),
+                        kwargs.get("wait", True),
+                    )
+                )
+
+            def release(self):
+                operations.append(("release"))
+
+            def upgrade(self):
+                operations.append(("upgrade"))
+
+            def downgrade(self):
+                operations.append(("downgrade"))
+
+        def reset():
+            operations.clear()
+
+        lockfile = Path("/lock")
+        monkeypatch.setattr("pbench.server.cache_manager.LockRef", FakeLockRef)
+        # default parameters
+        with LockManager(lockfile) as lock:
+            assert lock.unlock
+            assert not lock.exclusive
+            assert lock.wait
+            assert operations == [("acquire", False, True)]
+            lock.upgrade()
+            assert operations == [("acquire", False, True), ("upgrade")]
+            lock.downgrade()
+            assert operations == [("acquire", False, True), ("upgrade"), ("downgrade")]
+        assert operations == [
+            ("acquire", False, True),
+            ("upgrade"),
+            ("downgrade"),
+            ("release"),
+        ]
+
+        # exclusive
+        reset()
+        with LockManager(lockfile, exclusive=True) as lock:
+            assert lock.unlock
+            assert lock.exclusive
+            assert lock.wait
+            assert operations == [("acquire", True, True)]
+        assert operations == [("acquire", True, True), ("release")]
+
+        # no-wait
+        reset()
+        with LockManager(lockfile, wait=False) as lock:
+            assert lock.unlock
+            assert not lock.exclusive
+            assert not lock.wait
+            assert operations == [("acquire", False, False)]
+        assert operations == [("acquire", False, False), ("release")]
+
+        # 'keep' option
+        reset()
+        with LockManager(lockfile) as lock:
+            assert lock.keep() is lock
+            still_locked = lock
+            assert not lock.unlock
+            assert operations == [("acquire", False, True)]
+        assert operations == [("acquire", False, True)]
+        still_locked.release()
+        assert operations == [("acquire", False, True), ("release")]
diff --git a/lib/pbench/test/unit/server/test_indexing_tarballs.py b/lib/pbench/test/unit/server/test_indexing_tarballs.py
index 9a6f14552f..789987d47e 100644
--- a/lib/pbench/test/unit/server/test_indexing_tarballs.py
+++ b/lib/pbench/test/unit/server/test_indexing_tarballs.py
@@ -266,7 +266,7 @@ def error(self, dataset: Dataset, message: str):


 class FakeLockRef:
-    def __init__(self, lock: Path, exclusive: bool = False, wait: bool = True):
+    def __init__(self, lock: Path):
         """Initialize a lock reference

         The lock file is opened in "w+" mode, which is "write update": unlike
@@ -279,11 +279,12 @@ def __init__(self, lock: Path, exclusive: bool = False, wait: bool = True):
             wait: [default] wait for lock
         """
         self.locked = False
-        self.exclusive = exclusive
+        self.exclusive = False
         self.unlock = True

-    def acquire(self) -> "FakeLockRef":
+    def acquire(self, exclusive: bool = False, wait: bool = True) -> "FakeLockRef":
         self.locked = True
+        self.exclusive = exclusive
         return self

     def release(self):
@@ -299,18 +300,6 @@ def downgrade(self):
         if self.exclusive:
             self.exclusive = False

-    def keep(self) -> "FakeLockRef":
-        """Tell the context manager not to unlock on exit"""
-        self.unlock = False
-        return self
-
-    def __enter__(self) -> "FakeLockRef":
-        return self.acquire()
-
-    def __exit__(self, *exc):
-        if self.unlock:
-            self.release()
-

 class FakeController:
     def __init__(self, path: Path, cache: Path, logger: Logger):
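
A minimal usage sketch of the new LockManager API, based only on the interface shown in this patch; the `tarball` object and `read_cached_results` function are hypothetical stand-ins for a cache manager Tarball and its caller, mirroring the Tarball.stream() change above:

    from pbench.server.cache_manager import LockManager


    def read_cached_results(tarball):
        # Shared lock around cache access: LockManager acquires on entry and
        # releases on exit unless keep() has transferred ownership to the caller.
        with LockManager(tarball.lock) as lock:
            if not tarball.unpacked:
                lock.upgrade()  # exclusive while populating the cache
                tarball.cache_create()
                lock.downgrade()  # back to shared for the read
            # ... read from the unpacked cache under the shared lock ...
        # The lock is released here on context exit because keep() was not called.
        # A reclaimer-style caller can instead use
        # LockManager(tarball.lock, exclusive=True, wait=False), which does not
        # block: fcntl raises if another process already holds the lock.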