diff --git a/lib/pbench/server/cache_manager.py b/lib/pbench/server/cache_manager.py
index 34d387dc13..be6676034b 100644
--- a/lib/pbench/server/cache_manager.py
+++ b/lib/pbench/server/cache_manager.py
@@ -334,16 +334,22 @@ def close(self):
                     while self.subproc.stderr.read(4096):
                         pass
                     self.subproc.wait(60.0)
-            finally:
+            except Exception as e:
                 # Release our reference to the subprocess.Popen object so that the
                 # object can be reclaimed.
                 self.subproc = None
+                exception = e
         if self.lock:
             try:
                 self.lock.release()
             except Exception as e:
                 exception = e
         self.stream.close()
+
+        # NOTE: if both subprocess cleanup and unlock fail with exceptions, we
+        # raise the latter, and the former will be ignored. In practice, that's
+        # not a problem as we only construct an Inventory with a subprocess
+        # reference for extract, which doesn't need to lock a cache directory.
         if exception:
             raise exception
@@ -585,8 +591,10 @@ def create(
         return cls(destination, resource_id, controller)
 
     def cache_map(self, dir_path: Path):
-        """Builds Hierarchy structure of a Directory in a Dictionary
-        Format.
+        """Build a hierarchical representation of the results tree
+
+        NOTE: this structure isn't removed when we release the cache, as the
+        data remains valid.
 
         Args:
             dir_path: root directory
@@ -614,8 +622,7 @@ def cache_map(self, dir_path: Path):
 
     @staticmethod
     def traverse_cmap(path: Path, cachemap: CacheMap) -> CacheMapEntry:
-        """Sequentially traverses the cachemap to find the leaf of a
-        relative path reference
+        """Locate a path in the cache map
 
         Args:
             path: relative path of the subdirectory/file
@@ -648,11 +655,10 @@ def traverse_cmap(path: Path, cachemap: CacheMap) -> CacheMapEntry:
     def get_info(self, path: Path) -> JSONOBJECT:
         """Returns the details of the given file/directory in dict format
 
-        NOTE: This requires a call to the cache_map method to build a map that
-        can be traversed. Currently, this is done only on unpack, and isn't
-        useful except within the `pbench-index` process. This map needs to
-        either be built dynamically (potentially expensive) or persisted in
-        SQL or perhaps Redis.
+        NOTE: If the cache manager doesn't already have a cache map for the
+        current Tarball, we'll unpack the tarball here to build one; however,
+        as the cache map isn't dependent on the unpacked results tree, we
+        immediately release the cache lock.
 
         Args:
             path: path of the file/subdirectory
@@ -681,6 +687,10 @@ def get_info(self, path: Path) -> JSONOBJECT:
                 " we expect relative path to the root directory."
             )
 
+        if not self.cachemap:
+            with LockManager(self.lock) as lock:
+                self.get_results(lock)
+
         c_map = self.traverse_cmap(path, self.cachemap)
         children = c_map["children"] if "children" in c_map else {}
         fd_info = c_map["details"].__dict__.copy()
@@ -704,6 +714,11 @@ def get_info(self, path: Path) -> JSONOBJECT:
     def extract(tarball_path: Path, path: str) -> Inventory:
         """Returns a file stream for a file within a tarball
 
+        NOTE: This should generally be used only within the INTAKE path before
+        it's practical to load the cache with a fully unpacked tree. The
+        extracted file is not cached, and therefore each reference will repeat
+        the tar file extraction.
+
         Args:
             tarball_path: absolute path of the tarball
             path: relative path within the tarball
@@ -814,15 +829,9 @@ def stream(self, path: str) -> Inventory:
         """
 
         with LockManager(self.lock) as lock:
-            if not self.unpacked:
-                lock.upgrade()
-                self.cache_create()
-                lock.downgrade()
-
-            artifact: Path = self.unpacked / path
+            artifact: Path = self.get_results(lock) / path
             if not artifact.is_file():
                 raise CacheExtractBadPath(self.tarball_path, path)
-            self.last_ref.touch(exist_ok=True)
             return Inventory(artifact.open("rb"), lock=lock.keep())
 
     def get_inventory(self, path: str) -> Optional[JSONOBJECT]:
@@ -906,16 +915,22 @@ def subprocess_run(
                 f"{cmd[0]} exited with status {process.returncode}: {process.stderr.strip()!r}",
             )
 
-    def cache_create(self):
+    def get_results(self, lock: Union[LockManager, LockRef]) -> Path:
         """Unpack a tarball into a temporary directory tree
 
-        Unpack the tarball into a temporary cache directory named with the
-        tarball's resource_id (MD5).
+        Make sure that the dataset results are unpacked into a cache tree. The
+        location of the unpacked tree is in self.unpacked and is also returned
+        directly to the caller.
+
+        Args:
+            lock: A lock reference (or context manager) in shared lock state
 
-        This method must be called while holding an exclusive cache lock.
+        Returns:
+            the root Path of the unpacked directory tree
         """
         if not self.unpacked:
+            lock.upgrade()
             audit = None
             error = None
             try:
@@ -944,7 +959,6 @@ def cache_create(self):
                     find_command, self.cache, TarballModeChangeError, self.cache
                 )
                 self.unpacked = self.cache / self.name
-                self.last_ref.touch(exist_ok=True)
                 self.cache_map(self.unpacked)
             except Exception as e:
                 error = str(e)
@@ -957,6 +971,9 @@ def cache_create(self):
                     status=AuditStatus.FAILURE if error else AuditStatus.SUCCESS,
                     attributes=attributes,
                 )
+            lock.downgrade()
+        self.last_ref.touch(exist_ok=True)
+        return self.unpacked
 
     def cache_delete(self):
         """Remove the unpacked tarball directory and all contents.
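
For context, the locking protocol that the new get_results adopts can be summarized in a minimal standalone sketch. Everything below is hypothetical illustration, not part of the patch: the Lock class is a stand-in for the real LockRef/LockManager (which lock an actual file), and the unpack step is a placeholder path. The point is that callers enter get_results holding a shared lock, the method upgrades to exclusive only while it must mutate the cache, and it downgrades back to shared before returning.

# Hypothetical sketch of the shared/exclusive lock dance in get_results.
from pathlib import Path
from typing import Optional


class Lock:
    """Stand-in lock that merely tracks shared vs. exclusive state."""

    def __init__(self):
        self.exclusive = False

    def upgrade(self):
        self.exclusive = True  # the real lock blocks until granted

    def downgrade(self):
        self.exclusive = False


def get_results(lock: Lock, unpacked: Optional[Path]) -> Path:
    """Ensure the results tree exists; entered with a shared lock."""
    if unpacked is None:
        lock.upgrade()  # exclusive only for the duration of the unpack
        try:
            unpacked = Path("/tmp/cache/ABC/name")  # placeholder for tar unpack
        finally:
            lock.downgrade()  # back to shared before returning
    return unpacked


lock = Lock()
root = get_results(lock, None)  # first call "unpacks"
assert root == get_results(lock, root)  # later calls skip the upgrade
assert not lock.exclusive

(The sketch downgrades in a finally for brevity; the real method performs its audit bookkeeping before downgrading.)
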
diff --git a/lib/pbench/server/indexing_tarballs.py b/lib/pbench/server/indexing_tarballs.py
index 436c0dee00..a520106765 100644
--- a/lib/pbench/server/indexing_tarballs.py
+++ b/lib/pbench/server/indexing_tarballs.py
@@ -370,9 +370,8 @@ def sighup_handler(*args):
                    tarobj = self.cache_manager.find_dataset(
                        dataset.resource_id
                    )
-                    lock = LockRef(tarobj.lock).acquire(exclusive=True)
-                    tarobj.cache_create()
-                    lock.downgrade()
+                    lock = LockRef(tarobj.lock).acquire()
+                    tarobj.get_results(lock)
                except Exception as e:
                    self.sync.error(
                        dataset,
diff --git a/lib/pbench/test/unit/server/test_cache_manager.py b/lib/pbench/test/unit/server/test_cache_manager.py
index 6c04431011..c4948aefce 100644
--- a/lib/pbench/test/unit/server/test_cache_manager.py
+++ b/lib/pbench/test/unit/server/test_cache_manager.py
@@ -86,6 +86,35 @@ def fake_get_metadata(_tb_path):
     )
 
 
+class FakeLockRef:
+    operations = []
+
+    def __init__(self, lockpath: Path):
+        self.lock = lockpath
+
+    def acquire(self, **kwargs):
+        self.operations.append(
+            (
+                "acquire",
+                kwargs.get("exclusive", False),
+                kwargs.get("wait", True),
+            )
+        )
+
+    def release(self):
+        self.operations.append(("release"))
+
+    def upgrade(self):
+        self.operations.append(("upgrade"))
+
+    def downgrade(self):
+        self.operations.append(("downgrade"))
+
+    @classmethod
+    def reset(cls):
+        cls.operations.clear()
+
+
 class TestCacheManager:
     def test_create(self, server_config, make_logger):
         """
@@ -547,7 +576,8 @@ def mock_run(command, _dir_path, exception, dir_p):
         )
         msg = f"An error occurred while unpacking {tar}: Command 'tar' timed out after 43 seconds"
         with pytest.raises(TarballUnpackError, match=msg):
-            tb.cache_create()
+            tb.get_results(FakeLockRef(tb.lock))
+        FakeLockRef.reset()
 
     def test_unpack_find_subprocess_exception(
         self, make_logger, db_session, monkeypatch
@@ -588,9 +618,10 @@ def mock_run(command, _dir_path, exception, dir_p):
         msg = "An error occurred while changing file permissions of "
         msg += f"{cache / 'ABC'}: Command 'find' timed out after 43 seconds"
         with pytest.raises(TarballModeChangeError, match=msg) as exc:
-            tb.cache_create()
+            tb.get_results(FakeLockRef(tb.lock))
         assert str(exc.value) == msg
         assert rmtree_called
+        FakeLockRef.reset()
 
     def test_unpack_success(self, make_logger, db_session, monkeypatch):
         """Test to check the unpacking functionality of the CacheManager"""
@@ -622,9 +653,10 @@ def mock_resolve(_path, _strict=False):
         tb = Tarball(
             tar, "ABC", Controller(Path("/mock/archive"), cache, make_logger)
         )
-        tb.cache_create()
+        tb.get_results(FakeLockRef(tb.lock))
         assert call == ["tar", "find"]
         assert tb.unpacked == cache / "ABC" / tb.name
+        FakeLockRef.reset()
 
     def test_cache_map_success(self, make_logger, monkeypatch, tmp_path):
         """Test to build the cache map of the root directory"""
@@ -996,7 +1028,7 @@ def test_get_inventory(
         the tarball, based on "is_unpacked", to be sure both cases are covered;
         but be aware that accessing the tarball itself (the "" and None
         file_path cases) don't ever unpack and should always have is_unpacked
-        set to True. (That is, we don't expect cache_create to be called in
+        set to True. (That is, we don't expect get_results to be called in
         those cases.)
""" archive = tmp_path / "mock" / "archive" / "ABC" @@ -1006,18 +1038,19 @@ def test_get_inventory( cache = tmp_path / "mock" / ".cache" unpacked = cache / "ABC" / "dir_name" - create_called = False + get_results_called = False - def fake_create(self): - nonlocal create_called - create_called = True + def fake_results(self, lock: LockRef) -> Path: + nonlocal get_results_called + get_results_called = True self.unpacked = unpacked + return self.unpacked with monkeypatch.context() as m: m.setattr(Tarball, "__init__", TestCacheManager.MockTarball.__init__) m.setattr(Controller, "__init__", TestCacheManager.MockController.__init__) tb = Tarball(tar, "ABC", Controller(archive, cache, make_logger)) - m.setattr(Tarball, "cache_create", fake_create) + m.setattr(Tarball, "get_results", fake_results) if is_unpacked: tb.unpacked = unpacked TestCacheManager.MockController.generate_test_result_tree( @@ -1033,7 +1066,6 @@ def fake_create(self): stream: Inventory = file_info["stream"] assert stream.stream.read() == exp_stream stream.close() - assert create_called is not is_unpacked def test_cm_inventory(self, monkeypatch, server_config, make_logger): """Verify the happy path of the high level get_inventory""" @@ -1369,7 +1401,7 @@ def seek(self, offset: int, _whence: int = io.SEEK_SET) -> int: ), # The subprocess is still running when close() is called, stdout is # empty, there is no stderr, and the wait times out. - (None, 0, None, True, ["poll", "kill", "stdout", "wait"]), + (None, 0, None, True, ["poll", "kill", "stdout", "wait", "close"]), ), ) def test_inventory( @@ -1525,10 +1557,11 @@ def test_find( assert exc.value.tarball == "foobar" # Unpack the dataset, creating INCOMING and RESULTS links - tarball.cache_create() + tarball.get_results(FakeLockRef(tarball.lock)) assert tarball.cache == controller.cache / md5 assert tarball.unpacked == controller.cache / md5 / tarball.name assert tarball.last_ref.exists() + FakeLockRef.reset() audits = Audit.query(name="cache") assert len(audits) == 2 @@ -1613,10 +1646,11 @@ def test_lifecycle( # Now "unpack" the tarball and check that the incoming directory and # results link are set up. 
-        dataset.cache_create()
+        dataset.get_results(FakeLockRef(dataset.lock))
         assert cache == dataset.cache
         assert cache.is_dir()
         assert unpack.is_dir()
+        FakeLockRef.reset()
 
         assert dataset.unpacked == unpack
 
@@ -1685,8 +1719,11 @@ def test_compatibility(
             t2 = cm1[id]
             assert t1.name == t2.name == Dataset.stem(source_tarball)
 
-            t1.cache_create()
-            t2.cache_create()
+            r1 = t1.get_results(FakeLockRef(t1.lock))
+            assert r1 == t1.unpacked
+            r2 = t2.get_results(FakeLockRef(t2.lock))
+            assert r2 == t2.unpacked
+            FakeLockRef.reset()
 
             assert t1.unpacked != t2.unpacked
             assert (t1.unpacked / "metadata.log").is_file()
@@ -1857,33 +1894,6 @@ def fake_open(self, flags: str) -> FakeStream:
 
     def test_lockmanager(self, monkeypatch):
         """Test LockManager"""
-        operations = []
-
-        class FakeLockRef:
-            def __init__(self, lockpath: Path):
-                self.lock = lockpath
-
-            def acquire(self, **kwargs):
-                operations.append(
-                    (
-                        "acquire",
-                        kwargs.get("exclusive", False),
-                        kwargs.get("wait", True),
-                    )
-                )
-
-            def release(self):
-                operations.append(("release"))
-
-            def upgrade(self):
-                operations.append(("upgrade"))
-
-            def downgrade(self):
-                operations.append(("downgrade"))
-
-            def reset():
-                operations.clear()
-
         lockfile = Path("/lock")
         monkeypatch.setattr("pbench.server.cache_manager.LockRef", FakeLockRef)
         # default parameters
@@ -1891,12 +1901,16 @@ def reset():
             assert lock.unlock
             assert not lock.exclusive
             assert lock.wait
-            assert operations == [("acquire", False, True)]
+            assert FakeLockRef.operations == [("acquire", False, True)]
             lock.upgrade()
-            assert operations == [("acquire", False, True), ("upgrade")]
+            assert FakeLockRef.operations == [("acquire", False, True), ("upgrade")]
             lock.downgrade()
-            assert operations == [("acquire", False, True), ("upgrade"), ("downgrade")]
-        assert operations == [
+            assert FakeLockRef.operations == [
+                ("acquire", False, True),
+                ("upgrade"),
+                ("downgrade"),
+            ]
+        assert FakeLockRef.operations == [
             ("acquire", False, True),
             ("upgrade"),
             ("downgrade"),
@@ -1904,30 +1918,30 @@
         ]
 
         # exclusive
-        reset()
+        FakeLockRef.reset()
         with LockManager(lockfile, exclusive=True) as lock:
             assert lock.unlock
             assert lock.exclusive
             assert lock.wait
-            assert operations == [("acquire", True, True)]
-        assert operations == [("acquire", True, True), ("release")]
+            assert FakeLockRef.operations == [("acquire", True, True)]
+        assert FakeLockRef.operations == [("acquire", True, True), ("release")]
 
         # no-wait
-        reset()
+        FakeLockRef.reset()
        with LockManager(lockfile, wait=False) as lock:
             assert lock.unlock
             assert not lock.exclusive
             assert not lock.wait
-            assert operations == [("acquire", False, False)]
-        assert operations == [("acquire", False, False), ("release")]
+            assert FakeLockRef.operations == [("acquire", False, False)]
+        assert FakeLockRef.operations == [("acquire", False, False), ("release")]
 
         # keep' option
-        reset()
+        FakeLockRef.reset()
         with LockManager(lockfile) as lock:
             assert lock.keep() is lock
             still_locked = lock
             assert not lock.unlock
-            assert operations == [("acquire", False, True)]
-        assert operations == [("acquire", False, True)]
+            assert FakeLockRef.operations == [("acquire", False, True)]
+        assert FakeLockRef.operations == [("acquire", False, True)]
         still_locked.release()
-        assert operations == [("acquire", False, True), ("release")]
+        assert FakeLockRef.operations == [("acquire", False, True), ("release")]
diff --git a/lib/pbench/test/unit/server/test_indexing_tarballs.py b/lib/pbench/test/unit/server/test_indexing_tarballs.py
index 23e91dab6c..ee96f11b86 100644
--- a/lib/pbench/test/unit/server/test_indexing_tarballs.py
+++ b/lib/pbench/test/unit/server/test_indexing_tarballs.py
@@ -5,7 +5,7 @@
 from pathlib import Path
 from signal import SIGHUP
 import time
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict, List, Optional, Union
 
 import pytest
 
@@ -16,6 +16,7 @@
     OperationCode,
     PbenchServerConfig,
 )
+from pbench.server.cache_manager import LockManager, LockRef
 from pbench.server.database.models.audit import AuditStatus
 from pbench.server.database.models.datasets import (
     Dataset,
@@ -325,7 +326,7 @@ def __init__(self, path: Path, resource_id: str, controller: FakeController):
         self.unpacked = self.cache / self.name
         self.isolator = controller.path / resource_id
 
-    def cache_create(self):
+    def get_results(self, lock: Union[LockRef, LockManager]):
         pass
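
A usage note on the shared test fake: because FakeLockRef.operations is a class-level log, tests can assert the exact lock protocol that get_results exercised and must then reset the log, as the patch does after each call. The sketch below is a hypothetical, self-contained illustration of that pattern under stated assumptions: FakeTarball is a stand-in for the real Tarball (unpacking replaced by a placeholder path), and FakeLockRef is reduced to the methods the sketch needs; in the real tests both come from test_cache_manager.py.

# Hypothetical sketch: asserting the lock traffic recorded by a shared,
# class-level operations log, then clearing it between tests.
from pathlib import Path


class FakeLockRef:
    operations = []

    def __init__(self, lockpath: Path):
        self.lock = lockpath

    def upgrade(self):
        self.operations.append(("upgrade"))

    def downgrade(self):
        self.operations.append(("downgrade"))

    @classmethod
    def reset(cls):
        cls.operations.clear()


class FakeTarball:
    """Stand-in for Tarball: upgrades only when it must unpack."""

    lock = Path("/tmp/ABC.lock")
    unpacked = None

    def get_results(self, lock: FakeLockRef) -> Path:
        if self.unpacked is None:
            lock.upgrade()
            self.unpacked = Path("/tmp/cache/ABC/name")  # placeholder unpack
            lock.downgrade()
        return self.unpacked


tb = FakeTarball()
tb.get_results(FakeLockRef(tb.lock))  # first call unpacks: upgrade/downgrade
assert FakeLockRef.operations == [("upgrade"), ("downgrade")]
FakeLockRef.reset()  # class-level state: clear it for the next test
tb.get_results(FakeLockRef(tb.lock))  # already unpacked: no lock traffic
assert FakeLockRef.operations == []
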