From 8b465e3defac335bcffcd0f588d4c44c9b653b8e Mon Sep 17 00:00:00 2001 From: David Butenhof Date: Thu, 7 Sep 2023 17:29:40 -0400 Subject: [PATCH 01/14] Quick cache manager PBENCH-1249 On large datasets, our direct tarball extraction method can time out the API call. Unlike on a long intake, there is no persistent artifact so a retry will always time out as well. This applies to any `get_inventory` call, and therefore to the `/inventory`, `/visualize`, and `/compare` APIs; and given the central importance of those APIs for our Server 1.0 story, that's not an acceptable failure mode. This PR mitigates that problem with a "compromise" partial cache manager, leveraging the existing `unpack` method but adding a file lock to manage shared access. The idea is that any consumer of tarball contents (including the indexer) will unpack the entire tarball, but leave a "last reference" timestamp. A periodic timer service will check the cache unpack timestamps, and delete the unpack directories which aren't currently locked and which haven't been referenced for longer than a set time period. __NOTE__: I'm posting a draft mostly for coverage data after a lot of drift in the cache manager unit tests, to determine whether more work is necessary. The "last reference" and reclaim mechanism isn't yet implemented, though that should be the "easy part" now that I've got the server code working. --- .../api/resources/datasets_inventory.py | 9 +- lib/pbench/server/cache_manager.py | 143 ++++++++++-------- .../test/unit/server/test_cache_manager.py | 128 ++++++++++++---- .../unit/server/test_datasets_inventory.py | 6 +- .../unit/server/test_datasets_visualize.py | 2 +- 5 files changed, 187 insertions(+), 101 deletions(-) diff --git a/lib/pbench/server/api/resources/datasets_inventory.py b/lib/pbench/server/api/resources/datasets_inventory.py index a006cae726..260ad6375a 100644 --- a/lib/pbench/server/api/resources/datasets_inventory.py +++ b/lib/pbench/server/api/resources/datasets_inventory.py @@ -19,7 +19,12 @@ ParamType, Schema, ) -from pbench.server.cache_manager import CacheManager, CacheType, TarballNotFound +from pbench.server.cache_manager import ( + CacheExtractBadPath, + CacheManager, + CacheType, + TarballNotFound, +) class DatasetsInventory(ApiBase): @@ -63,7 +68,7 @@ def _get( cache_m = CacheManager(self.config, current_app.logger) try: file_info = cache_m.get_inventory(dataset.resource_id, target) - except TarballNotFound as e: + except (TarballNotFound, CacheExtractBadPath) as e: raise APIAbort(HTTPStatus.NOT_FOUND, str(e)) if file_info["type"] != CacheType.FILE: diff --git a/lib/pbench/server/cache_manager.py b/lib/pbench/server/cache_manager.py index 17ad65bb51..50cbfb722a 100644 --- a/lib/pbench/server/cache_manager.py +++ b/lib/pbench/server/cache_manager.py @@ -1,6 +1,7 @@ from collections import deque from dataclasses import dataclass from enum import auto, Enum +import fcntl from logging import Logger from pathlib import Path import shlex @@ -193,14 +194,20 @@ def make_cache_object(dir_path: Path, path: Path) -> CacheObject: class Inventory: - """Encapsulate the file stream and subprocess.Popen object + """Encapsulate the file stream and cache lock management This encapsulation allows cleaner downstream handling, so that we can close - both the extracted file stream and the Popen object itself when we're done. - This eliminates interference with later operations. + both the extracted file stream and unlock the dataset cache after the file + reference is complete. In APIs the Inventory close is done by the Flask + infrastructure after the response handling is done. """ - def __init__(self, stream: IO[bytes], subproc: Optional[subprocess.Popen] = None): + def __init__( + self, + stream: IO[bytes], + lock: Optional[IO[bytes]] = None, + subproc: Optional[subprocess.Popen] = None, + ): """Construct an instance to track extracted inventory This encapsulates many byte stream operations so that it can be used @@ -208,10 +215,11 @@ def __init__(self, stream: IO[bytes], subproc: Optional[subprocess.Popen] = None Args: stream: the data stream of a specific tarball member - subproc: the subprocess producing the stream, if any + lock: the file reference for the cache lock file """ - self.subproc = subproc self.stream = stream + self.lock = lock + self.subproc = subproc def close(self): """Close the byte stream and clean up the Popen object""" @@ -236,11 +244,14 @@ def close(self): # Release our reference to the subprocess.Popen object so that the # object can be reclaimed. self.subproc = None - - # Explicitly close the stream, in case there never was an associated - # subprocess. (If there was an associated subprocess, the streams are - # now empty, and we'll assume that they are closed when the Popen - # object is reclaimed.) + if self.lock: + try: + fcntl.lockf(self.lock, fcntl.LOCK_UN) + self.lock.close() + finally: + # Release our reference to the lock file so that the + # object can be reclaimed. + self.lock = None self.stream.close() def getbuffer(self): @@ -261,7 +272,7 @@ def seek(self, *args, **kwargs) -> int: def __repr__(self) -> str: """Return a string representation""" - return f"" + return f"" def __iter__(self): """Allow iterating through lines in the buffer""" @@ -657,7 +668,7 @@ def extract(tarball_path: Path, path: str) -> Inventory: # containing the extracted file to our caller, and return the Popen # object so that we can clean it up when the Inventory object is closed. if not tarproc.returncode: - return Inventory(tarproc.stdout, tarproc) + return Inventory(tarproc.stdout, subproc=tarproc) # The tar command was invoked successfully (otherwise, the Popen() # constructor would have raised an exception), but it exited with @@ -675,6 +686,41 @@ def extract(tarball_path: Path, path: str) -> Inventory: tarball_path, f"Unexpected error from {tar_path}: {error_text!r}" ) + def stream(self, path: str) -> Inventory: + """Return a cached inventory file as a binary stream + + Args: + path: Relative path of a regular file within a tarball. + + Returns: + An Inventory object encapsulating the file stream and the cache + lock. The caller is reponsible for closing the Inventory! + """ + self.unpack() + + # NOTE: the cache is unlocked here. That's "probably OK" as the reclaim + # is based on the last modified timestamp of the lock file and only + # runs periodically. + # + # TODO: Can we convert the lock from EXCLUSIVE to SHARED? If so, that + # would be ideal. + + lock = None + try: + lock = (self.cache / "lock").open("rb", buffering=0) + fcntl.lockf(lock, fcntl.LOCK_SH) + artifact: Path = self.unpacked / path + self.controller.logger.info("path {}: stat {}", artifact, artifact.exists()) + if not artifact.is_file(): + raise CacheExtractBadPath(self.tarball_path, path) + (self.cache / "last_ref").touch(exist_ok=True) + return Inventory(artifact.open("rb"), lock=lock) + except Exception as e: + if lock: + fcntl.lockf(lock, fcntl.LOCK_UN) + lock.close() + raise e + def get_inventory(self, path: str) -> Optional[JSONOBJECT]: """Access the file stream of a tarball member file. @@ -688,12 +734,11 @@ def get_inventory(self, path: str) -> Optional[JSONOBJECT]: info = { "name": self.tarball_path.name, "type": CacheType.FILE, - "stream": Inventory(self.tarball_path.open("rb"), None), + "stream": Inventory(self.tarball_path.open("rb")), } else: - file_path = f"{self.name}/{path}" - stream = Tarball.extract(self.tarball_path, file_path) - info = {"name": file_path, "type": CacheType.FILE, "stream": stream} + stream = self.stream(path) + info = {"name": path, "type": CacheType.FILE, "stream": stream} return info @@ -776,23 +821,25 @@ def unpack(self): here and pass to the indexer (/srv/pbench/.cache/) and the actual unpacked root (/srv/pbench/.cache//). """ - self.cache.mkdir(parents=True) - try: - tar_command = "tar -x --no-same-owner --delay-directory-restore " - tar_command += f"--force-local --file='{str(self.tarball_path)}'" - self.subprocess_run( - tar_command, self.cache, TarballUnpackError, self.tarball_path - ) - - find_command = "find . ( -type d -exec chmod ugo+rx {} + ) -o ( -type f -exec chmod ugo+r {} + )" - self.subprocess_run( - find_command, self.cache, TarballModeChangeError, self.cache - ) - except Exception: - shutil.rmtree(self.cache, ignore_errors=True) - raise - self.unpacked = self.cache / self.name + if not self.cache.exists(): + self.cache.mkdir(exist_ok=True) + with (self.cache / "lock").open("wb", buffering=0) as lock: + fcntl.lockf(lock, fcntl.LOCK_EX) + try: + if not self.unpacked: + tar_command = "tar -x --no-same-owner --delay-directory-restore " + tar_command += f"--force-local --file='{str(self.tarball_path)}'" + self.subprocess_run( + tar_command, self.cache, TarballUnpackError, self.tarball_path + ) + find_command = "find . ( -type d -exec chmod ugo+rx {} + ) -o ( -type f -exec chmod ugo+r {} + )" + self.subprocess_run( + find_command, self.cache, TarballModeChangeError, self.cache + ) + self.unpacked = self.cache / self.name + finally: + fcntl.lockf(lock, fcntl.LOCK_UN) self.cache_map(self.unpacked) def uncache(self): @@ -943,24 +990,6 @@ def create_tarball(self, tarfile_path: Path) -> Tarball: self.tarballs[tarball.name] = tarball return tarball - def unpack(self, dataset_id: str): - """Unpack a tarball into a temporary cache directory. - - Args: - dataset_id: Resource ID of the dataset to unpack - """ - tarball = self.datasets[dataset_id] - tarball.unpack() - - def uncache(self, dataset_id: str): - """Remove the cached unpack directory. - - Args: - dataset_id: Resource ID of dataset to remove - """ - tarball = self.datasets[dataset_id] - tarball.uncache() - def delete(self, dataset_id: str): """Delete a dataset tarball and remove it from the controller @@ -1257,7 +1286,7 @@ def unpack(self, dataset_id: str) -> Tarball: The tarball object """ tarball = self.find_dataset(dataset_id) - tarball.controller.unpack(dataset_id) + tarball.unpack() return tarball def get_info(self, dataset_id: str, path: Path) -> dict[str, Any]: @@ -1293,16 +1322,6 @@ def get_inventory(self, dataset_id: str, target: str) -> Optional[JSONOBJECT]: tarball = self.find_dataset(dataset_id) return tarball.get_inventory(target) - def uncache(self, dataset_id: str): - """Remove the unpacked tarball tree. - - Args: - dataset_id: Dataset resource ID to "uncache" - """ - tarball = self.find_dataset(dataset_id) - controller = tarball.controller - controller.uncache(dataset_id) - def delete(self, dataset_id: str): """Delete the dataset as well as unpacked artifacts. diff --git a/lib/pbench/test/unit/server/test_cache_manager.py b/lib/pbench/test/unit/server/test_cache_manager.py index f432b6118d..5fe5676c63 100644 --- a/lib/pbench/test/unit/server/test_cache_manager.py +++ b/lib/pbench/test/unit/server/test_cache_manager.py @@ -1,4 +1,6 @@ +from contextlib import contextmanager import errno +import fcntl import hashlib import io from logging import Logger @@ -508,30 +510,35 @@ def __init__(self, path: Path, resource_id: str, controller: Controller): self.cache = controller.cache / "ABC" self.isolator = controller.path / resource_id self.unpacked = None + self.controller = controller def test_unpack_tar_subprocess_exception(self, make_logger, monkeypatch): """Show that, when unpacking of the Tarball fails and raises an Exception it is handled successfully.""" tar = Path("/mock/A.tar.xz") cache = Path("/mock/.cache") - rmtree_called = False - def mock_rmtree(path: Path, ignore_errors=False): - nonlocal rmtree_called - rmtree_called = True + locks: list[tuple[str, str]] = [] - assert ignore_errors - assert path == cache / "ABC" + @contextmanager + def open(s, m, buffering=-1): + yield None + + def locker(fd, mode): + nonlocal locks + locks.append((fd, mode)) + @staticmethod def mock_run(command, _dir_path, exception, dir_p): verb = "tar" assert command.startswith(verb) raise exception(dir_p, subprocess.TimeoutExpired(verb, 43)) with monkeypatch.context() as m: - m.setattr(Path, "mkdir", lambda path, parents: None) - m.setattr(Tarball, "subprocess_run", staticmethod(mock_run)) - m.setattr(shutil, "rmtree", mock_rmtree) + m.setattr(Path, "mkdir", lambda path, parents=False, exist_ok=False: None) + m.setattr(Tarball, "subprocess_run", mock_run) + m.setattr(Path, "open", open) + m.setattr("pbench.server.cache_manager.fcntl.lockf", locker) m.setattr(Tarball, "__init__", TestCacheManager.MockTarball.__init__) m.setattr(Controller, "__init__", TestCacheManager.MockController.__init__) tb = Tarball( @@ -542,7 +549,6 @@ def mock_run(command, _dir_path, exception, dir_p): msg = f"An error occurred while unpacking {tar}: Command 'tar' timed out after 43 seconds" assert str(exc.value) == msg assert exc.type == TarballUnpackError - assert rmtree_called def test_unpack_find_subprocess_exception(self, make_logger, monkeypatch): """Show that, when permission change of the Tarball fails and raises @@ -551,6 +557,16 @@ def test_unpack_find_subprocess_exception(self, make_logger, monkeypatch): cache = Path("/mock/.cache") rmtree_called = True + locks: list[tuple[str, str]] = [] + + @contextmanager + def open(s, m, buffering=-1): + yield None + + def locker(fd, mode): + nonlocal locks + locks.append((fd, mode)) + def mock_rmtree(path: Path, ignore_errors=False): nonlocal rmtree_called rmtree_called = True @@ -558,6 +574,7 @@ def mock_rmtree(path: Path, ignore_errors=False): assert ignore_errors assert path == cache / "ABC" + @staticmethod def mock_run(command, _dir_path, exception, dir_p): verb = "find" if command.startswith(verb): @@ -566,8 +583,10 @@ def mock_run(command, _dir_path, exception, dir_p): assert command.startswith("tar") with monkeypatch.context() as m: - m.setattr(Path, "mkdir", lambda path, parents: None) - m.setattr(Tarball, "subprocess_run", staticmethod(mock_run)) + m.setattr(Path, "mkdir", lambda path, parents=False, exist_ok=False: None) + m.setattr(Path, "open", open) + m.setattr("pbench.server.cache_manager.fcntl.lockf", locker) + m.setattr(Tarball, "subprocess_run", mock_run) m.setattr(shutil, "rmtree", mock_rmtree) m.setattr(Tarball, "__init__", TestCacheManager.MockTarball.__init__) m.setattr(Controller, "__init__", TestCacheManager.MockController.__init__) @@ -589,6 +608,16 @@ def test_unpack_success(self, make_logger, monkeypatch): cache = Path("/mock/.cache") call = list() + locks: list[tuple[str, str]] = [] + + @contextmanager + def open(s, m, buffering=-1): + yield None + + def locker(fd, mode): + nonlocal locks + locks.append((fd, mode)) + def mock_run(args, **_kwargs): call.append(args[0]) @@ -604,8 +633,10 @@ def mock_resolve(_path, _strict=False): raise AssertionError("Unexpected call to Path.resolve()") with monkeypatch.context() as m: - m.setattr(Path, "mkdir", lambda path, parents: None) - m.setattr(subprocess, "run", mock_run) + m.setattr(Path, "mkdir", lambda path, parents=False, exist_ok=False: None) + m.setattr(Path, "open", open) + m.setattr("pbench.server.cache_manager.fcntl.lockf", locker) + m.setattr("pbench.server.cache_manager.subprocess.run", mock_run) m.setattr(Path, "resolve", mock_resolve) m.setattr(Tarball, "__init__", TestCacheManager.MockTarball.__init__) m.setattr(Controller, "__init__", TestCacheManager.MockController.__init__) @@ -969,36 +1000,67 @@ def test_cache_map_get_info_cmap( assert file_info == expected_msg @pytest.mark.parametrize( - "file_path, exp_file_type, exp_stream", + "file_path,file_pattern,exp_stream", [ - ("", CacheType.FILE, io.BytesIO(b"tarball_as_a_byte_stream")), - (None, CacheType.FILE, io.BytesIO(b"tarball_as_a_byte_stream")), - ("f1.json", CacheType.FILE, io.BytesIO(b"file_as_a_byte_stream")), - ("subdir1/f12_sym", CacheType.FILE, io.BytesIO(b"file1_as_a_byte_stream")), + ("", "dir_name.tar.xz", io.BytesIO(b"tarball_as_a_byte_stream")), + (None, "dir_name.tar.xz", io.BytesIO(b"tarball_as_a_byte_stream")), + ("f1.json", "f1.json", io.BytesIO(b"file_as_a_byte_stream")), + ("subdir1/f12_sym", None, CacheExtractBadPath(Path("a"), "b")), ], ) def test_get_inventory( - self, make_logger, monkeypatch, tmp_path, file_path, exp_file_type, exp_stream + self, make_logger, monkeypatch, tmp_path, file_path, file_pattern, exp_stream ): """Test to extract file contents/stream from a file""" - tar = Path("/mock/dir_name.tar.xz") - cache = Path("/mock/.cache") + archive = tmp_path / "mock/archive" + tar = archive / "ABC/dir_name.tar.xz" + cache = tmp_path / "mock/.cache" + + locks: list[tuple[str, str]] = [] + + def locker(fd, mode): + nonlocal locks + locks.append((fd, mode)) with monkeypatch.context() as m: m.setattr(Tarball, "__init__", TestCacheManager.MockTarball.__init__) m.setattr(Controller, "__init__", TestCacheManager.MockController.__init__) - m.setattr(Tarball, "extract", lambda _t, _p: Inventory(exp_stream)) - m.setattr(Path, "open", lambda _s, _m="rb": exp_stream) + real_open = Path.open + m.setattr( + Path, + "open", + lambda s, m="rb", buffering=-1: exp_stream + if file_pattern and file_pattern in str(s) + else real_open(s, m, buffering), + ) + m.setattr("pbench.server.cache_manager.fcntl.lockf", locker) tb = Tarball( - tar, "ABC", Controller(Path("/mock/archive"), cache, make_logger) + tar, "ABC", Controller(archive, cache, make_logger) ) + tb.unpacked = cache / "ABC/dir_name" tar_dir = TestCacheManager.MockController.generate_test_result_tree( - tmp_path, "dir_name" + cache / "ABC", "dir_name" ) tb.cache_map(tar_dir) - file_info = tb.get_inventory(file_path) - assert file_info["type"] == exp_file_type - assert file_info["stream"].stream == exp_stream + try: + file_info = tb.get_inventory(file_path) + except Exception as e: + assert isinstance(e, type(exp_stream)), e + else: + assert not isinstance(exp_stream, Exception) + assert file_info["type"] is CacheType.FILE + stream: Inventory = file_info["stream"] + assert stream.stream == exp_stream + stream.close() + if not file_path: + assert len(locks) == 0 + else: + assert list(i[1] for i in locks) == [ + fcntl.LOCK_EX, + fcntl.LOCK_UN, + fcntl.LOCK_SH, + fcntl.LOCK_UN, + ] def test_cm_inventory(self, monkeypatch, server_config, make_logger): """Verify the happy path of the high level get_inventory""" @@ -1221,9 +1283,9 @@ def seek(self, offset: int, _whence: int = io.SEEK_SET) -> int: return offset # Invoke the CUT - stream = Inventory(MockBufferedReader(), None) + stream = Inventory(MockBufferedReader()) - assert stream.subproc is None + assert stream.lock is None # Test Inventory.getbuffer() calls.clear() @@ -1418,7 +1480,7 @@ def read(self, size: int = -1) -> bytes: assert my_stream, "Test bug: we need at least one of stdout and stderr" # Invoke the CUT - stream = Inventory(my_stream, MockPopen(my_stdout, my_stderr)) + stream = Inventory(my_stream, subproc=MockPopen(my_stdout, my_stderr)) # Test Inventory.__repr__() assert str(stream) == " from MockPopen>" @@ -1430,7 +1492,7 @@ def read(self, size: int = -1) -> bytes: else: assert not wait_timeout, "wait() failed to time out as expected" - assert stream.subproc is None + assert stream.lock is None assert my_calls == exp_calls def test_find( diff --git a/lib/pbench/test/unit/server/test_datasets_inventory.py b/lib/pbench/test/unit/server/test_datasets_inventory.py index 100d9e0e1c..305ea8634c 100644 --- a/lib/pbench/test/unit/server/test_datasets_inventory.py +++ b/lib/pbench/test/unit/server/test_datasets_inventory.py @@ -102,7 +102,7 @@ def mock_get_inventory(_s, _d: str, _t: str): return { "name": "f1.json", "type": CacheType.FILE, - "stream": Inventory(exp_stream, None), + "stream": Inventory(exp_stream), } response = Response() @@ -142,7 +142,7 @@ def mock_get_inventory(_s, _d: str, _t: str): return { "name": "f1.json", "type": CacheType.FILE, - "stream": Inventory(exp_stream, None), + "stream": Inventory(exp_stream), } def mock_send_file(path_or_file, *args, **kwargs): @@ -164,7 +164,7 @@ def mock_get_inventory(_s, _d: str, _t: str): return { "name": "f1.json", "type": CacheType.FILE, - "stream": Inventory(exp_stream, None), + "stream": Inventory(exp_stream), } monkeypatch.setattr(CacheManager, "get_inventory", mock_get_inventory) diff --git a/lib/pbench/test/unit/server/test_datasets_visualize.py b/lib/pbench/test/unit/server/test_datasets_visualize.py index 5547010360..ba525e13f1 100644 --- a/lib/pbench/test/unit/server/test_datasets_visualize.py +++ b/lib/pbench/test/unit/server/test_datasets_visualize.py @@ -53,7 +53,7 @@ def mock_get_inventory(self, _dataset: str, target: str): return { "name": Path(target).name, "type": CacheType.FILE, - "stream": Inventory(BytesIO(b"CSV_file_as_a_byte_stream"), None), + "stream": Inventory(BytesIO(b"CSV_file_as_a_byte_stream")), } @staticmethod From 24424dad37cfc75b21c9e0ae315a21df27b05ee3 Mon Sep 17 00:00:00 2001 From: David Butenhof Date: Fri, 8 Sep 2023 00:58:48 -0400 Subject: [PATCH 02/14] Implement cache reclaim and timer service. --- lib/pbench/cli/server/tree_manage.py | 109 +++++++++++++++++- lib/pbench/server/cache_manager.py | 69 +++++++---- lib/pbench/server/indexing_tarballs.py | 3 - .../test/unit/server/test_cache_manager.py | 8 +- server/lib/systemd/pbench-reclaim.service | 14 +++ server/lib/systemd/pbench-reclaim.timer | 12 ++ server/pbenchinacan/container-build.sh | 3 + 7 files changed, 182 insertions(+), 36 deletions(-) create mode 100644 server/lib/systemd/pbench-reclaim.service create mode 100644 server/lib/systemd/pbench-reclaim.timer diff --git a/lib/pbench/cli/server/tree_manage.py b/lib/pbench/cli/server/tree_manage.py index e19d41ad68..15aebb9ec5 100644 --- a/lib/pbench/cli/server/tree_manage.py +++ b/lib/pbench/cli/server/tree_manage.py @@ -1,14 +1,92 @@ +from datetime import datetime, timedelta, timezone +import errno +import fcntl +from logging import Logger + import click from pbench.cli import pass_cli_context from pbench.cli.server import config_setup from pbench.cli.server.options import common_options from pbench.common.logger import get_pbench_logger -from pbench.server import BadConfig +from pbench.server import BadConfig, OperationCode from pbench.server.cache_manager import CacheManager +from pbench.server.database.models.audit import Audit, AuditStatus, AuditType + +# Length of time in hours to retain unreferenced cached results data. +# TODO: this could become a configurable setting? +CACHE_LIFETIME = 4.0 + + +def reclaim_cache(tree: CacheManager, logger: Logger, lifetime: float = CACHE_LIFETIME): + """Reclaim unused caches + + Args: + tree: the cache manager instance + lifetime: number of hours to retain unused cache data + logger: a Logger object + """ + window = datetime.now(timezone.utc) - timedelta(hours=lifetime) + for tarball in tree.datasets.values(): + if tarball.unpacked: + date = datetime.fromtimestamp( + tarball.last_ref.stat().st_mtime, timezone.utc + ) + if date < window: + logger.info( + "RECLAIM {}: last_ref {:%Y-%m-%d %H:%M:%S} is older than {:%Y-%m-%d %H:%M:%S}", + tarball.name, + date, + window, + ) + error = None + audit = None + with tarball.lock.open("wb") as lock: + try: + fcntl.lockf(lock, fcntl.LOCK_EX | fcntl.LOCK_NB) + audit = Audit.create( + name="reclaim", + operation=OperationCode.DELETE, + status=AuditStatus.BEGIN, + user_name=Audit.BACKGROUND_USER, + object_type=AuditType.DATASET, + object_id=tarball.resource_id, + object_name=tarball.name, + ) + try: + tarball.cache_delete() + except Exception as e: + error = e + finally: + fcntl.lockf(lock, fcntl.LOCK_UN) + except OSError as e: + if e.errno in (errno.EAGAIN, errno.EACCES): + logger.info( + "RECLAIM {}: skipping because cache is locked", + tarball.name, + ) + # If the cache is locked, regardless of age, then + # the last_ref timestamp is about to be updated, + # and we skip the cache for now. + continue + error = e + attributes = {"last_ref": f"{date:%Y-%m-%d %H:%M:%S}"} + if error: + logger.error("RECLAIM {} failed with '{}'", tarball.name, error) + attributes["error"] = str(error) + Audit.create( + root=audit, + status=AuditStatus.FAILURE if error else AuditStatus.SUCCESS, + attributes=attributes, + ) def print_tree(tree: CacheManager): + """Print basic information about the cache + + Args: + tree: a cache instance + """ print(f"Tree anchored at {tree.archive_root}\n") if len(tree.datasets) == 0 and len(tree.controllers) == 0: @@ -18,14 +96,19 @@ def print_tree(tree: CacheManager): print("Tarballs:") for tarball in tree.datasets.values(): print(f" {tarball.name}") + if tarball.unpacked: + date = datetime.fromtimestamp( + tarball.last_ref.stat().st_mtime, timezone.utc + ) + print( + f" Inventory is cached, last referenced {date:%Y-%m-%d %H:%M:%S}" + ) print("\nControllers:") for controller in tree.controllers.values(): print(f" Controller {controller.name}:") for tarball in controller.tarballs.values(): print(f" Tarball {tarball.name}") - if tarball.unpacked: - print(f" Unpacked in {tarball.unpacked}") @click.command(name="pbench-tree-manager") @@ -33,19 +116,31 @@ def print_tree(tree: CacheManager): @click.option( "--display", default=False, is_flag=True, help="Display the full tree on completion" ) +@click.option( + "--lifetime", + default=CACHE_LIFETIME, + type=click.FLOAT, + help="Specify lifetime of cached data for --reclaim", +) +@click.option( + "--reclaim", default=False, is_flag=True, help="Reclaim stale cached data" +) @common_options -def tree_manage(context: object, display: bool): +def tree_manage(context: object, display: bool, lifetime: int, reclaim: bool): """ Discover, display, and manipulate the on-disk representation of controllers and datasets. - This primarily exposes the CacheManager object hierarchy, and provides a simple - hierarchical display of controllers and datasets. + This primarily exposes the CacheManager object hierarchy, and provides a + hierarchical display of controllers and datasets. This also supports + reclaiming cached dataset files that haven't been referenced recently. \f Args: context: Click context (contains shared `--config` value) display: Print a simplified representation of the hierarchy + lifetime: Number of hours to retain unused cache before reclaim + reclaim: Reclaim stale cached data """ try: config = config_setup(context) @@ -54,6 +149,8 @@ def tree_manage(context: object, display: bool): cache_m.full_discovery() if display: print_tree(cache_m) + if reclaim: + reclaim_cache(cache_m, logger, lifetime) rv = 0 except Exception as exc: logger.exception("An error occurred discovering the file tree: {}", exc) diff --git a/lib/pbench/server/cache_manager.py b/lib/pbench/server/cache_manager.py index 50cbfb722a..b14c4a975f 100644 --- a/lib/pbench/server/cache_manager.py +++ b/lib/pbench/server/cache_manager.py @@ -344,6 +344,16 @@ def __init__(self, path: Path, resource_id: str, controller: "Controller"): # inactive. self.unpacked: Optional[Path] = None + # Record the lockf file path used to control cache access + self.lock: Path = self.cache / "lock" + + # Record a marker file path used to record the last cache access + # timestamp + self.last_ref: Path = self.cache / "last_ref" + + # Record the path of the companion MD5 file + self.md5_path: Path = path.with_suffix(".xz.md5") + # Record the name of the containing controller self.controller_name: str = controller.name @@ -696,7 +706,7 @@ def stream(self, path: str) -> Inventory: An Inventory object encapsulating the file stream and the cache lock. The caller is reponsible for closing the Inventory! """ - self.unpack() + self.cache_create() # NOTE: the cache is unlocked here. That's "probably OK" as the reclaim # is based on the last modified timestamp of the lock file and only @@ -707,13 +717,13 @@ def stream(self, path: str) -> Inventory: lock = None try: - lock = (self.cache / "lock").open("rb", buffering=0) + lock = self.lock.open("rb", buffering=0) fcntl.lockf(lock, fcntl.LOCK_SH) artifact: Path = self.unpacked / path self.controller.logger.info("path {}: stat {}", artifact, artifact.exists()) if not artifact.is_file(): raise CacheExtractBadPath(self.tarball_path, path) - (self.cache / "last_ref").touch(exist_ok=True) + self.last_ref.touch(exist_ok=True) return Inventory(artifact.open("rb"), lock=lock) except Exception as e: if lock: @@ -802,29 +812,22 @@ def subprocess_run( f"{cmd[0]} exited with status {process.returncode}: {process.stderr.strip()!r}", ) - def unpack(self): + def cache_create(self): """Unpack a tarball into a temporary directory tree Unpack the tarball into a temporary cache directory named with the tarball's resource_id (MD5). - This tree will be used for indexing, and then discarded. As we build - out more of the cache manager, it can also be used to build our initial - cache map. - - The indexer works off the unpacked data under CACHE, assuming the - tarball name in all paths (because this is what's inside the tarball). - Rather than passing the indexer the root `/srv/pbench/.cache` or trying - to update all of the indexer code (which still jumps back and forth - between the tarball and the unpacked files), we maintain the "cache" - directory as two paths: self.cache which is the directory we manage - here and pass to the indexer (/srv/pbench/.cache/) and - the actual unpacked root (/srv/pbench/.cache//). + Access to the cached directory is controlled by a "lockf" file to + ensure that two processes don't unpack at the same time and that we + can't reclaim a cache while it's being used. We also track a "last_ref" + modification timestamp which is used to reclaim old caches after + inactivity. """ if not self.cache.exists(): self.cache.mkdir(exist_ok=True) - with (self.cache / "lock").open("wb", buffering=0) as lock: + with self.lock.open("wb", buffering=0) as lock: fcntl.lockf(lock, fcntl.LOCK_EX) try: if not self.unpacked: @@ -839,18 +842,28 @@ def unpack(self): ) self.unpacked = self.cache / self.name finally: + self.last_ref.touch(exist_ok=True) fcntl.lockf(lock, fcntl.LOCK_UN) self.cache_map(self.unpacked) - def uncache(self): - """Remove the unpacked tarball directory and all contents.""" + def cache_delete(self): + """Remove the unpacked tarball directory and all contents. + + WARNING: + + This is unprotected! + + Normal cache reclaim is managed using the `tree_manage --reclaim` CLI + command, generally through the `pbench-reclaim.timer` service, which + calls this method only when the cache is unlocked and aged out. + """ self.cachemap = None if self.unpacked: try: - shutil.rmtree(self.cache) + shutil.rmtree(self.unpacked) self.unpacked = None except Exception as e: - self.logger.error("incoming remove for {} failed with {}", self.name, e) + self.logger.error("cache reclaim for {} failed with {}", self.name, e) raise def delete(self): @@ -859,7 +872,7 @@ def delete(self): We'll log errors in deletion, but "succeed" and clear the links to both files. There's nothing more we can do. """ - self.uncache() + self.cache_delete() if self.isolator and self.isolator.exists(): try: shutil.rmtree(self.isolator) @@ -1286,7 +1299,7 @@ def unpack(self, dataset_id: str) -> Tarball: The tarball object """ tarball = self.find_dataset(dataset_id) - tarball.unpack() + tarball.cache_create() return tarball def get_info(self, dataset_id: str, path: Path) -> dict[str, Any]: @@ -1322,6 +1335,16 @@ def get_inventory(self, dataset_id: str, target: str) -> Optional[JSONOBJECT]: tarball = self.find_dataset(dataset_id) return tarball.get_inventory(target) + def cache_reclaim(self, dataset_id: str): + """Remove the unpacked tarball tree. + + Args: + dataset_id: Dataset resource ID to "uncache" + """ + tarball = self.find_dataset(dataset_id) + tarball.cache_delete() + self._clean_empties(tarball.controller.name) + def delete(self, dataset_id: str): """Delete the dataset as well as unpacked artifacts. diff --git a/lib/pbench/server/indexing_tarballs.py b/lib/pbench/server/indexing_tarballs.py index 7d749b15ef..8bd1f20972 100644 --- a/lib/pbench/server/indexing_tarballs.py +++ b/lib/pbench/server/indexing_tarballs.py @@ -464,9 +464,6 @@ def sighup_handler(*args): ) tb_res = error_code["OP_ERROR" if failures > 0 else "OK"] finally: - # Remove the unpacked data - if tarobj: - tarobj.uncache() if tb_res.success: try: diff --git a/lib/pbench/test/unit/server/test_cache_manager.py b/lib/pbench/test/unit/server/test_cache_manager.py index 5fe5676c63..44fe3481a1 100644 --- a/lib/pbench/test/unit/server/test_cache_manager.py +++ b/lib/pbench/test/unit/server/test_cache_manager.py @@ -545,7 +545,7 @@ def mock_run(command, _dir_path, exception, dir_p): tar, "ABC", Controller(Path("/mock/archive"), cache, make_logger) ) with pytest.raises(TarballUnpackError) as exc: - tb.unpack() + tb.cache_create() msg = f"An error occurred while unpacking {tar}: Command 'tar' timed out after 43 seconds" assert str(exc.value) == msg assert exc.type == TarballUnpackError @@ -595,7 +595,7 @@ def mock_run(command, _dir_path, exception, dir_p): ) with pytest.raises(TarballModeChangeError) as exc: - tb.unpack() + tb.cache_create() msg = "An error occurred while changing file permissions of " msg += f"{cache / 'ABC'}: Command 'find' timed out after 43 seconds" assert str(exc.value) == msg @@ -643,7 +643,7 @@ def mock_resolve(_path, _strict=False): tb = Tarball( tar, "ABC", Controller(Path("/mock/archive"), cache, make_logger) ) - tb.unpack() + tb.cache_create() assert call == ["tar", "find"] assert tb.unpacked == cache / "ABC" / tb.name @@ -1649,7 +1649,7 @@ def test_lifecycle( # Remove the unpacked tarball, and confirm that the directory and link # are removed. - cm.uncache(md5) + cm.cache_reclaim(md5) assert not cache.exists() # Now that we have all that setup, delete the dataset diff --git a/server/lib/systemd/pbench-reclaim.service b/server/lib/systemd/pbench-reclaim.service new file mode 100644 index 0000000000..bc9b430d18 --- /dev/null +++ b/server/lib/systemd/pbench-reclaim.service @@ -0,0 +1,14 @@ +[Unit] +Description=Reclaim Pbench Server cache +Wants=pbench-reclaim.timer + +[Service] +Type = simple +User = pbench +Group = pbench +Environment = _PBENCH_SERVER_CONFIG=/opt/pbench-server/lib/config/pbench-server.cfg +ExecStart=-/opt/pbench-server/bin/pbench-tree-manage --reclaim +KillSignal = TERM + +[Install] +WantedBy=pbench-server.service diff --git a/server/lib/systemd/pbench-reclaim.timer b/server/lib/systemd/pbench-reclaim.timer new file mode 100644 index 0000000000..74480864cd --- /dev/null +++ b/server/lib/systemd/pbench-reclaim.timer @@ -0,0 +1,12 @@ +[Unit] +Description=Pbench Server cache reclaim timer +After=pbench-server.service +Requires=pbench-reclaim.service +BindsTo=pbench-server.service + +[Timer] +Unit=pbench-reclaim.service +OnUnitInactiveSec=4h + +[Install] +WantedBy=timers.target diff --git a/server/pbenchinacan/container-build.sh b/server/pbenchinacan/container-build.sh index e5ea3878e6..eebbd1e51b 100755 --- a/server/pbenchinacan/container-build.sh +++ b/server/pbenchinacan/container-build.sh @@ -79,12 +79,15 @@ buildah run $container cp \ ${SERVER_LIB}/systemd/pbench-server.service \ ${SERVER_LIB}/systemd/pbench-index.service \ ${SERVER_LIB}/systemd/pbench-index.timer \ + ${SERVER_LIB}/systemd/pbench-reclaim.service \ + ${SERVER_LIB}/systemd/pbench-reclaim.timer \ /etc/systemd/system/ buildah run $container systemctl enable nginx buildah run $container systemctl enable rsyslog buildah run $container systemctl enable pbench-server buildah run $container systemctl enable pbench-index.timer +buildah run $container systemctl enable pbench-reclaim.timer # Create the container image. buildah commit --rm $container ${PB_CONTAINER_REG}/${PB_SERVER_IMAGE_NAME}:${PB_SERVER_IMAGE_TAG} From a773b2ea941c15271fca956cb0b5a875d5a4893d Mon Sep 17 00:00:00 2001 From: David Butenhof Date: Fri, 8 Sep 2023 09:44:52 -0400 Subject: [PATCH 03/14] More tweaks I've verified that the timer service removes sufficiently old cache data and that the data is unpacked again on request. The reclaim operation is audited. I should probably audit the unpack a well, but haven't done that here. I'm still hoping for a successful CI run to check cobertura coverage. --- lib/pbench/cli/server/tree_manage.py | 19 ++++++++++++--- .../test/unit/server/test_cache_manager.py | 23 +++++++++++-------- 2 files changed, 30 insertions(+), 12 deletions(-) diff --git a/lib/pbench/cli/server/tree_manage.py b/lib/pbench/cli/server/tree_manage.py index 15aebb9ec5..229154a53b 100644 --- a/lib/pbench/cli/server/tree_manage.py +++ b/lib/pbench/cli/server/tree_manage.py @@ -27,8 +27,14 @@ def reclaim_cache(tree: CacheManager, logger: Logger, lifetime: float = CACHE_LI logger: a Logger object """ window = datetime.now(timezone.utc) - timedelta(hours=lifetime) + total_count = 0 + has_cache = 0 + reclaimed = 0 + reclaim_failed = 0 for tarball in tree.datasets.values(): + total_count += 1 if tarball.unpacked: + has_cache += 1 date = datetime.fromtimestamp( tarball.last_ref.stat().st_mtime, timezone.utc ) @@ -55,6 +61,7 @@ def reclaim_cache(tree: CacheManager, logger: Logger, lifetime: float = CACHE_LI ) try: tarball.cache_delete() + reclaimed += 1 except Exception as e: error = e finally: @@ -72,6 +79,7 @@ def reclaim_cache(tree: CacheManager, logger: Logger, lifetime: float = CACHE_LI error = e attributes = {"last_ref": f"{date:%Y-%m-%d %H:%M:%S}"} if error: + reclaim_failed += 1 logger.error("RECLAIM {} failed with '{}'", tarball.name, error) attributes["error"] = str(error) Audit.create( @@ -79,6 +87,13 @@ def reclaim_cache(tree: CacheManager, logger: Logger, lifetime: float = CACHE_LI status=AuditStatus.FAILURE if error else AuditStatus.SUCCESS, attributes=attributes, ) + logger.info( + "RECLAIM summary: {} datasets, {} had cache: {} reclaimed and {} errors", + total_count, + has_cache, + reclaimed, + reclaim_failed, + ) def print_tree(tree: CacheManager): @@ -100,9 +115,7 @@ def print_tree(tree: CacheManager): date = datetime.fromtimestamp( tarball.last_ref.stat().st_mtime, timezone.utc ) - print( - f" Inventory is cached, last referenced {date:%Y-%m-%d %H:%M:%S}" - ) + print(f" Inventory is cached, last referenced {date:%Y-%m-%d %H:%M:%S}") print("\nControllers:") for controller in tree.controllers.values(): diff --git a/lib/pbench/test/unit/server/test_cache_manager.py b/lib/pbench/test/unit/server/test_cache_manager.py index 44fe3481a1..16bce6944a 100644 --- a/lib/pbench/test/unit/server/test_cache_manager.py +++ b/lib/pbench/test/unit/server/test_cache_manager.py @@ -509,6 +509,8 @@ def __init__(self, path: Path, resource_id: str, controller: Controller): self.tarball_path = path self.cache = controller.cache / "ABC" self.isolator = controller.path / resource_id + self.lock = self.cache / "lock" + self.last_ref = self.cache / "last_ref" self.unpacked = None self.controller = controller @@ -536,8 +538,9 @@ def mock_run(command, _dir_path, exception, dir_p): with monkeypatch.context() as m: m.setattr(Path, "mkdir", lambda path, parents=False, exist_ok=False: None) - m.setattr(Tarball, "subprocess_run", mock_run) m.setattr(Path, "open", open) + m.setattr(Path, "touch", lambda path, exist_ok=False: None) + m.setattr(Tarball, "subprocess_run", mock_run) m.setattr("pbench.server.cache_manager.fcntl.lockf", locker) m.setattr(Tarball, "__init__", TestCacheManager.MockTarball.__init__) m.setattr(Controller, "__init__", TestCacheManager.MockController.__init__) @@ -585,6 +588,7 @@ def mock_run(command, _dir_path, exception, dir_p): with monkeypatch.context() as m: m.setattr(Path, "mkdir", lambda path, parents=False, exist_ok=False: None) m.setattr(Path, "open", open) + m.setattr(Path, "touch", lambda path, exist_ok=False: None) m.setattr("pbench.server.cache_manager.fcntl.lockf", locker) m.setattr(Tarball, "subprocess_run", mock_run) m.setattr(shutil, "rmtree", mock_rmtree) @@ -635,6 +639,7 @@ def mock_resolve(_path, _strict=False): with monkeypatch.context() as m: m.setattr(Path, "mkdir", lambda path, parents=False, exist_ok=False: None) m.setattr(Path, "open", open) + m.setattr(Path, "touch", lambda path, exist_ok=False: None) m.setattr("pbench.server.cache_manager.fcntl.lockf", locker) m.setattr("pbench.server.cache_manager.subprocess.run", mock_run) m.setattr(Path, "resolve", mock_resolve) @@ -1579,11 +1584,13 @@ def test_lifecycle( cm = CacheManager(server_config, make_logger) archive = cm.archive_root / "ABC" cache = cm.cache_root / md5 + dataset_name = source_tarball.name[:-7] + unpack = cache / dataset_name monkeypatch.setattr(Tarball, "_get_metadata", fake_get_metadata) # None of the controller directories should exist yet assert not archive.exists() - assert not cache.exists() + assert not unpack.exists() # Create a dataset in the cache manager from our source tarball cm.create(source_tarball) @@ -1591,7 +1598,7 @@ def test_lifecycle( # Expect the archive directory was created, but we haven't unpacked so # incoming and results should not exist. assert archive.exists() - assert not cache.exists() + assert not unpack.exists() # The original files should have been removed assert not source_tarball.exists() @@ -1608,7 +1615,6 @@ def test_lifecycle( assert md5 == md5_hash.hexdigest() assert list(cm.controllers.keys()) == ["ABC"] - dataset_name = source_tarball.name[:-7] assert list(cm.tarballs) == [dataset_name] assert list(cm.datasets) == [md5] @@ -1617,9 +1623,9 @@ def test_lifecycle( cm.unpack(md5) assert cache == cm[md5].cache assert cache.is_dir() - assert (cache / dataset_name).is_dir() + assert unpack.is_dir() - assert cm.datasets[md5].unpacked == cache / dataset_name + assert cm.datasets[md5].unpacked == unpack # Re-discover, with all the files in place, and compare newcm = CacheManager(server_config, make_logger) @@ -1647,10 +1653,9 @@ def test_lifecycle( assert tarball.cache == other.cache assert tarball.unpacked == other.unpacked - # Remove the unpacked tarball, and confirm that the directory and link - # are removed. + # Remove the unpacked tarball, and confirm the directory is removed cm.cache_reclaim(md5) - assert not cache.exists() + assert not unpack.exists() # Now that we have all that setup, delete the dataset cm.delete(md5) From c40c844c53e4168770c32aabac691a9f31737b57 Mon Sep 17 00:00:00 2001 From: David Butenhof Date: Fri, 8 Sep 2023 13:22:44 -0400 Subject: [PATCH 04/14] Audit unpack We probably won't want to audit cache load longer term, but right now it probably makes sense to keep track. --- lib/pbench/server/cache_manager.py | 23 ++++++++++- .../test/unit/server/test_cache_manager.py | 40 ++++++++++++++++--- 2 files changed, 56 insertions(+), 7 deletions(-) diff --git a/lib/pbench/server/cache_manager.py b/lib/pbench/server/cache_manager.py index b14c4a975f..3348735e11 100644 --- a/lib/pbench/server/cache_manager.py +++ b/lib/pbench/server/cache_manager.py @@ -11,7 +11,8 @@ from typing import Any, IO, Optional, Union from pbench.common import MetadataLog, selinux -from pbench.server import JSONOBJECT, PathLike, PbenchServerConfig +from pbench.server import JSONOBJECT, OperationCode, PathLike, PbenchServerConfig +from pbench.server.database.models.audit import Audit, AuditStatus, AuditType from pbench.server.database.models.datasets import Dataset from pbench.server.utils import get_tarball_md5 @@ -829,8 +830,19 @@ def cache_create(self): self.cache.mkdir(exist_ok=True) with self.lock.open("wb", buffering=0) as lock: fcntl.lockf(lock, fcntl.LOCK_EX) + audit = None + error = None try: if not self.unpacked: + audit = Audit.create( + name="cache", + operation=OperationCode.CREATE, + status=AuditStatus.BEGIN, + user_name=Audit.BACKGROUND_USER, + object_type=AuditType.DATASET, + object_id=self.resource_id, + object_name=self.name, + ) tar_command = "tar -x --no-same-owner --delay-directory-restore " tar_command += f"--force-local --file='{str(self.tarball_path)}'" self.subprocess_run( @@ -841,9 +853,18 @@ def cache_create(self): find_command, self.cache, TarballModeChangeError, self.cache ) self.unpacked = self.cache / self.name + except Exception as e: + error = str(e) + raise finally: self.last_ref.touch(exist_ok=True) fcntl.lockf(lock, fcntl.LOCK_UN) + if audit: + Audit.create( + root=audit, + status=AuditStatus.FAILURE if error else AuditStatus.SUCCESS, + attributes={"error": error}, + ) self.cache_map(self.unpacked) def cache_delete(self): diff --git a/lib/pbench/test/unit/server/test_cache_manager.py b/lib/pbench/test/unit/server/test_cache_manager.py index 16bce6944a..8b7438a910 100644 --- a/lib/pbench/test/unit/server/test_cache_manager.py +++ b/lib/pbench/test/unit/server/test_cache_manager.py @@ -13,7 +13,7 @@ import pytest -from pbench.server import JSONOBJECT +from pbench.server import JSONOBJECT, OperationCode from pbench.server.cache_manager import ( BadDirpath, BadFilename, @@ -29,6 +29,7 @@ TarballNotFound, TarballUnpackError, ) +from pbench.server.database.models.audit import Audit, AuditStatus, AuditType from pbench.server.database.models.datasets import Dataset, DatasetBadName from pbench.test.unit.server.conftest import make_tarball @@ -506,15 +507,18 @@ def generate_test_result_tree(tmp_path: Path, dir_name: str) -> Path: class MockTarball: def __init__(self, path: Path, resource_id: str, controller: Controller): self.name = Dataset.stem(path) + self.resource_id = "ABC" self.tarball_path = path - self.cache = controller.cache / "ABC" + self.cache = controller.cache / self.resource_id self.isolator = controller.path / resource_id self.lock = self.cache / "lock" self.last_ref = self.cache / "last_ref" self.unpacked = None self.controller = controller - def test_unpack_tar_subprocess_exception(self, make_logger, monkeypatch): + def test_unpack_tar_subprocess_exception( + self, make_logger, db_session, monkeypatch + ): """Show that, when unpacking of the Tarball fails and raises an Exception it is handled successfully.""" tar = Path("/mock/A.tar.xz") @@ -553,7 +557,9 @@ def mock_run(command, _dir_path, exception, dir_p): assert str(exc.value) == msg assert exc.type == TarballUnpackError - def test_unpack_find_subprocess_exception(self, make_logger, monkeypatch): + def test_unpack_find_subprocess_exception( + self, make_logger, db_session, monkeypatch + ): """Show that, when permission change of the Tarball fails and raises an Exception it is handled successfully.""" tar = Path("/mock/A.tar.xz") @@ -606,7 +612,7 @@ def mock_run(command, _dir_path, exception, dir_p): assert exc.type == TarballModeChangeError assert rmtree_called - def test_unpack_success(self, make_logger, monkeypatch): + def test_unpack_success(self, make_logger, db_session, monkeypatch): """Test to check the unpacking functionality of the CacheManager""" tar = Path("/mock/A.tar.xz") cache = Path("/mock/.cache") @@ -1501,7 +1507,13 @@ def read(self, size: int = -1) -> bytes: assert my_calls == exp_calls def test_find( - self, selinux_enabled, server_config, make_logger, tarball, monkeypatch + self, + selinux_enabled, + server_config, + make_logger, + tarball, + monkeypatch, + db_session, ): """ Create a dataset, check the cache manager state, and test that we can find it @@ -1553,6 +1565,22 @@ def test_find( cm.unpack(md5) assert tarball.cache == controller.cache / md5 assert tarball.unpacked == controller.cache / md5 / tarball.name + assert tarball.last_ref.exists() + + audits = Audit.query(name="cache") + assert len(audits) == 2 + assert audits[0].name == "cache" + assert audits[0].operation is OperationCode.CREATE + assert audits[0].status is AuditStatus.BEGIN + assert audits[0].object_type is AuditType.DATASET + assert audits[0].object_id == tarball.resource_id + assert audits[0].object_name == tarball.name + assert audits[1].name == "cache" + assert audits[1].operation is OperationCode.CREATE + assert audits[1].status is AuditStatus.SUCCESS + assert audits[1].object_type is AuditType.DATASET + assert audits[1].object_id == tarball.resource_id + assert audits[1].object_name == tarball.name # We should be able to find the tarball even in a new cache manager # that hasn't been fully discovered. From 4d37469becac4c0131d7c55dcd78325dd854856b Mon Sep 17 00:00:00 2001 From: David Butenhof Date: Tue, 12 Sep 2023 20:25:43 -0400 Subject: [PATCH 05/14] Encapsulate and package locking Allow holding lock from unpack to stream, and conversion between `EX` and `SH` lock modes. --- lib/pbench/cli/server/tree_manage.py | 92 ++++--- lib/pbench/server/cache_manager.py | 247 ++++++++++-------- lib/pbench/server/indexing_tarballs.py | 23 +- .../test/unit/server/test_cache_manager.py | 116 +++----- .../unit/server/test_indexing_tarballs.py | 56 +++- 5 files changed, 288 insertions(+), 246 deletions(-) diff --git a/lib/pbench/cli/server/tree_manage.py b/lib/pbench/cli/server/tree_manage.py index 229154a53b..eef74f66ee 100644 --- a/lib/pbench/cli/server/tree_manage.py +++ b/lib/pbench/cli/server/tree_manage.py @@ -1,6 +1,5 @@ from datetime import datetime, timedelta, timezone import errno -import fcntl from logging import Logger import click @@ -10,7 +9,7 @@ from pbench.cli.server.options import common_options from pbench.common.logger import get_pbench_logger from pbench.server import BadConfig, OperationCode -from pbench.server.cache_manager import CacheManager +from pbench.server.cache_manager import CacheManager, LockRef from pbench.server.database.models.audit import Audit, AuditStatus, AuditType # Length of time in hours to retain unreferenced cached results data. @@ -38,18 +37,19 @@ def reclaim_cache(tree: CacheManager, logger: Logger, lifetime: float = CACHE_LI date = datetime.fromtimestamp( tarball.last_ref.stat().st_mtime, timezone.utc ) - if date < window: - logger.info( - "RECLAIM {}: last_ref {:%Y-%m-%d %H:%M:%S} is older than {:%Y-%m-%d %H:%M:%S}", - tarball.name, - date, - window, - ) - error = None - audit = None - with tarball.lock.open("wb") as lock: + if date >= window: + continue + error = None + audit = None + logger.info( + "RECLAIM {}: last_ref {:%Y-%m-%d %H:%M:%S} is older than {:%Y-%m-%d %H:%M:%S}", + tarball.name, + date, + window, + ) + try: + with LockRef(tarball.lock, exclusive=True, wait=False): try: - fcntl.lockf(lock, fcntl.LOCK_EX | fcntl.LOCK_NB) audit = Audit.create( name="reclaim", operation=OperationCode.DELETE, @@ -59,29 +59,36 @@ def reclaim_cache(tree: CacheManager, logger: Logger, lifetime: float = CACHE_LI object_id=tarball.resource_id, object_name=tarball.name, ) - try: - tarball.cache_delete() - reclaimed += 1 - except Exception as e: - error = e - finally: - fcntl.lockf(lock, fcntl.LOCK_UN) - except OSError as e: - if e.errno in (errno.EAGAIN, errno.EACCES): - logger.info( - "RECLAIM {}: skipping because cache is locked", - tarball.name, - ) - # If the cache is locked, regardless of age, then - # the last_ref timestamp is about to be updated, - # and we skip the cache for now. - continue + except Exception as e: + logger.warn( + "Unable to audit cache reclaim for {}: '{}'", + tarball.name, + e, + ) + try: + tarball.cache_delete() + reclaimed += 1 + except Exception as e: error = e - attributes = {"last_ref": f"{date:%Y-%m-%d %H:%M:%S}"} - if error: - reclaim_failed += 1 - logger.error("RECLAIM {} failed with '{}'", tarball.name, error) - attributes["error"] = str(error) + except OSError as e: + if e.errno in (errno.EAGAIN, errno.EACCES): + logger.info( + "RECLAIM {}: skipping because cache is locked", + tarball.name, + ) + # If the cache is locked, regardless of age, then + # the last_ref timestamp is about to be updated, + # and we skip the dataset this time around. + continue + error = e + except Exception as e: + error = e + attributes = {"last_ref": f"{date:%Y-%m-%d %H:%M:%S}"} + if error: + reclaim_failed += 1 + logger.error("RECLAIM {} failed with '{}'", tarball.name, error) + attributes["error"] = str(error) + if audit: Audit.create( root=audit, status=AuditStatus.FAILURE if error else AuditStatus.SUCCESS, @@ -130,16 +137,15 @@ def print_tree(tree: CacheManager): "--display", default=False, is_flag=True, help="Display the full tree on completion" ) @click.option( - "--lifetime", - default=CACHE_LIFETIME, + "--reclaim", + show_default=True, + is_flag=False, + flag_value=CACHE_LIFETIME, type=click.FLOAT, - help="Specify lifetime of cached data for --reclaim", -) -@click.option( - "--reclaim", default=False, is_flag=True, help="Reclaim stale cached data" + help="Reclaim cached data older than hours", ) @common_options -def tree_manage(context: object, display: bool, lifetime: int, reclaim: bool): +def tree_manage(context: object, display: bool, reclaim: float): """ Discover, display, and manipulate the on-disk representation of controllers and datasets. @@ -163,7 +169,7 @@ def tree_manage(context: object, display: bool, lifetime: int, reclaim: bool): if display: print_tree(cache_m) if reclaim: - reclaim_cache(cache_m, logger, lifetime) + reclaim_cache(cache_m, logger, reclaim) rv = 0 except Exception as exc: logger.exception("An error occurred discovering the file tree: {}", exc) diff --git a/lib/pbench/server/cache_manager.py b/lib/pbench/server/cache_manager.py index 3348735e11..dc2cfdc2ef 100644 --- a/lib/pbench/server/cache_manager.py +++ b/lib/pbench/server/cache_manager.py @@ -20,93 +20,80 @@ class CacheManagerError(Exception): """Base class for exceptions raised from this module.""" - def __str__(self) -> str: - return "Generic cache manager exception" + pass class BadDirpath(CacheManagerError): """A bad directory path was given.""" def __init__(self, error_msg: str): + super().__init__(error_msg) self.error_msg = error_msg - def __str__(self) -> str: - return self.error_msg - class BadFilename(CacheManagerError): """A bad tarball path was given.""" def __init__(self, path: PathLike): + super().__init__(f"The file path {path} is not a tarball") self.path = str(path) - def __str__(self) -> str: - return f"The file path {self.path!r} is not a tarball" - class CacheExtractBadPath(CacheManagerError): """Request to extract a path that's bad or not a file""" def __init__(self, tar_name: Path, path: PathLike): + super().__init__(f"Unable to extract {path} from {tar_name.name}") self.name = tar_name.name self.path = str(path) - def __str__(self) -> str: - return f"Unable to extract {self.path} from {self.name}" - class TarballNotFound(CacheManagerError): """The dataset was not found in the ARCHIVE tree.""" def __init__(self, tarball: str): + super().__init__(f"The dataset tarball named {tarball!r} is not found") self.tarball = tarball - def __str__(self) -> str: - return f"The dataset tarball named {self.tarball!r} is not found" - class DuplicateTarball(CacheManagerError): """A duplicate tarball name was detected.""" def __init__(self, tarball: str): + super().__init__(f"A dataset tarball named {tarball!r} is already present") self.tarball = tarball - def __str__(self) -> str: - return f"A dataset tarball named {self.tarball!r} is already present" - class MetadataError(CacheManagerError): """A problem was found processing a tarball's metadata.log file.""" def __init__(self, tarball: Path, error: Exception): + super().__init__( + f"A problem occurred processing metadata.log from {tarball!s}: {str(error)!r}" + ) self.tarball = tarball self.error = str(error) - def __str__(self) -> str: - return f"A problem occurred processing metadata.log from {self.tarball!s}: {self.error!r}" - class TarballUnpackError(CacheManagerError): """An error occurred trying to unpack a tarball.""" def __init__(self, tarball: Path, error: str): + super().__init__(f"An error occurred while unpacking {tarball}: {error}") self.tarball = tarball self.error = error - def __str__(self) -> str: - return f"An error occurred while unpacking {self.tarball}: {self.error}" - class TarballModeChangeError(CacheManagerError): """An error occurred trying to fix unpacked tarball permissions.""" def __init__(self, tarball: Path, error: str): + super().__init__( + f"An error occurred while changing file permissions of {tarball}: {error}" + ) self.tarball = tarball self.error = error - def __str__(self) -> str: - return f"An error occurred while changing file permissions of {self.tarball}: {self.error}" - class CacheType(Enum): FILE = auto() @@ -194,6 +181,82 @@ def make_cache_object(dir_path: Path, path: Path) -> CacheObject: ) +class LockRef: + """Keep track of a cache lock passed off to a caller""" + + def __init__(self, lock: Path, exclusive: bool = False, wait: bool = True): + """Initialize a lock reference + + The lock file is opened in "w+" mode, which is "write update": unlike + "r+", this creates the file if it doesn't already exist, but still + allows lock conversion between LOCK_EX and LOCK_SH. + + Args: + lock: the path of a lock file + exclusive: lock for exclusive access + wait: wait for lock + """ + self.lock = lock.open("w+") + self.locked = False + self.exclusive = exclusive + self.unlock = True + self.wait = wait + + def acquire(self) -> "LockRef": + """Acquire the lock""" + + cmd = fcntl.LOCK_EX if self.exclusive else fcntl.LOCK_SH + if not self.wait: + cmd |= fcntl.LOCK_NB + fcntl.lockf(self.lock, cmd) + self.locked = True + return self + + def release(self): + """Release the lock and close the lock file""" + + exception = None + try: + fcntl.lockf(self.lock, fcntl.LOCK_UN) + self.lock.close() + self.locked = False + self.exclusive = False + except Exception as e: + exception = e + finally: + # Release our reference to the lock file so that the + # object can be reclaimed. + self.lock = None + if exception: + raise exception + + def upgrade(self): + """Upgrade a shared lock to exclusive""" + if not self.exclusive: + fcntl.lockf(self.lock, fcntl.LOCK_EX) + self.exclusive = True + + def downgrade(self): + """Downgrade an exclusive lock to shared""" + if self.exclusive: + fcntl.lockf(self.lock, fcntl.LOCK_SH) + self.exclusive = False + + def keep(self) -> "LockRef": + """Tell the context manager not to unlock on exit""" + self.unlock = False + return self + + def __enter__(self) -> "LockRef": + """Enter a lock context manager by acquiring the lock""" + return self.acquire() + + def __exit__(self, *exc): + """Exit a lock context manager by releasing the lock""" + if self.unlock: + self.release() + + class Inventory: """Encapsulate the file stream and cache lock management @@ -206,7 +269,7 @@ class Inventory: def __init__( self, stream: IO[bytes], - lock: Optional[IO[bytes]] = None, + lock: Optional[LockRef] = None, subproc: Optional[subprocess.Popen] = None, ): """Construct an instance to track extracted inventory @@ -216,7 +279,8 @@ def __init__( Args: stream: the data stream of a specific tarball member - lock: the file reference for the cache lock file + lock: a cache lock reference + subproc: a Popen object to clean up on close """ self.stream = stream self.lock = lock @@ -224,6 +288,8 @@ def __init__( def close(self): """Close the byte stream and clean up the Popen object""" + + exception = None if self.subproc: try: if self.subproc.poll() is None: @@ -247,13 +313,12 @@ def close(self): self.subproc = None if self.lock: try: - fcntl.lockf(self.lock, fcntl.LOCK_UN) - self.lock.close() - finally: - # Release our reference to the lock file so that the - # object can be reclaimed. - self.lock = None + self.lock.release() + except Exception as e: + exception = e self.stream.close() + if exception: + raise exception def getbuffer(self): """Return the underlying byte buffer (used by send_file)""" @@ -337,6 +402,9 @@ def __init__(self, path: Path, resource_id: str, controller: "Controller"): # Record where cached unpacked data would live self.cache: Path = controller.cache / self.resource_id + # We need the directory to lock, so make sure it's there + self.cache.mkdir(parents=True, exist_ok=True) + # Record hierarchy of a Tar ball self.cachemap: Optional[CacheMap] = None @@ -707,30 +775,18 @@ def stream(self, path: str) -> Inventory: An Inventory object encapsulating the file stream and the cache lock. The caller is reponsible for closing the Inventory! """ - self.cache_create() - # NOTE: the cache is unlocked here. That's "probably OK" as the reclaim - # is based on the last modified timestamp of the lock file and only - # runs periodically. - # - # TODO: Can we convert the lock from EXCLUSIVE to SHARED? If so, that - # would be ideal. + with LockRef(self.lock) as lock: + if not self.unpacked: + lock.upgrade() + self.cache_create() + lock.downgrade() - lock = None - try: - lock = self.lock.open("rb", buffering=0) - fcntl.lockf(lock, fcntl.LOCK_SH) artifact: Path = self.unpacked / path - self.controller.logger.info("path {}: stat {}", artifact, artifact.exists()) if not artifact.is_file(): raise CacheExtractBadPath(self.tarball_path, path) self.last_ref.touch(exist_ok=True) - return Inventory(artifact.open("rb"), lock=lock) - except Exception as e: - if lock: - fcntl.lockf(lock, fcntl.LOCK_UN) - lock.close() - raise e + return Inventory(artifact.open("rb"), lock=lock.keep()) def get_inventory(self, path: str) -> Optional[JSONOBJECT]: """Access the file stream of a tarball member file. @@ -819,53 +875,51 @@ def cache_create(self): Unpack the tarball into a temporary cache directory named with the tarball's resource_id (MD5). - Access to the cached directory is controlled by a "lockf" file to - ensure that two processes don't unpack at the same time and that we - can't reclaim a cache while it's being used. We also track a "last_ref" - modification timestamp which is used to reclaim old caches after - inactivity. + This method must be called while holding an exclusive cache lock. """ - if not self.cache.exists(): - self.cache.mkdir(exist_ok=True) - with self.lock.open("wb", buffering=0) as lock: - fcntl.lockf(lock, fcntl.LOCK_EX) + if not self.unpacked: audit = None error = None try: - if not self.unpacked: - audit = Audit.create( - name="cache", - operation=OperationCode.CREATE, - status=AuditStatus.BEGIN, - user_name=Audit.BACKGROUND_USER, - object_type=AuditType.DATASET, - object_id=self.resource_id, - object_name=self.name, - ) - tar_command = "tar -x --no-same-owner --delay-directory-restore " - tar_command += f"--force-local --file='{str(self.tarball_path)}'" - self.subprocess_run( - tar_command, self.cache, TarballUnpackError, self.tarball_path - ) - find_command = "find . ( -type d -exec chmod ugo+rx {} + ) -o ( -type f -exec chmod ugo+r {} + )" - self.subprocess_run( - find_command, self.cache, TarballModeChangeError, self.cache - ) - self.unpacked = self.cache / self.name + audit = Audit.create( + name="cache", + operation=OperationCode.CREATE, + status=AuditStatus.BEGIN, + user_name=Audit.BACKGROUND_USER, + object_type=AuditType.DATASET, + object_id=self.resource_id, + object_name=self.name, + ) + except Exception as e: + self.controller.logger.warning( + "Unable to audit unpack for {}: '{}'", self.name, e + ) + + try: + tar_command = "tar -x --no-same-owner --delay-directory-restore " + tar_command += f"--force-local --file='{str(self.tarball_path)}'" + self.subprocess_run( + tar_command, self.cache, TarballUnpackError, self.tarball_path + ) + find_command = "find . ( -type d -exec chmod ugo+rx {} + ) -o ( -type f -exec chmod ugo+r {} + )" + self.subprocess_run( + find_command, self.cache, TarballModeChangeError, self.cache + ) + self.unpacked = self.cache / self.name + self.last_ref.touch(exist_ok=True) + self.cache_map(self.unpacked) except Exception as e: error = str(e) raise finally: - self.last_ref.touch(exist_ok=True) - fcntl.lockf(lock, fcntl.LOCK_UN) if audit: + attributes = {"error": error} if error else {} Audit.create( root=audit, status=AuditStatus.FAILURE if error else AuditStatus.SUCCESS, - attributes={"error": error}, + attributes=attributes, ) - self.cache_map(self.unpacked) def cache_delete(self): """Remove the unpacked tarball directory and all contents. @@ -1310,19 +1364,6 @@ def create(self, tarfile_path: Path) -> Tarball: self.datasets[tarball.resource_id] = tarball return tarball - def unpack(self, dataset_id: str) -> Tarball: - """Unpack a tarball into the CACHE tree - - Args: - dataset_id: Dataset resource ID - - Returns: - The tarball object - """ - tarball = self.find_dataset(dataset_id) - tarball.cache_create() - return tarball - def get_info(self, dataset_id: str, path: Path) -> dict[str, Any]: """Get information about dataset files from the cache map @@ -1356,16 +1397,6 @@ def get_inventory(self, dataset_id: str, target: str) -> Optional[JSONOBJECT]: tarball = self.find_dataset(dataset_id) return tarball.get_inventory(target) - def cache_reclaim(self, dataset_id: str): - """Remove the unpacked tarball tree. - - Args: - dataset_id: Dataset resource ID to "uncache" - """ - tarball = self.find_dataset(dataset_id) - tarball.cache_delete() - self._clean_empties(tarball.controller.name) - def delete(self, dataset_id: str): """Delete the dataset as well as unpacked artifacts. diff --git a/lib/pbench/server/indexing_tarballs.py b/lib/pbench/server/indexing_tarballs.py index 8bd1f20972..d58f72f4c0 100644 --- a/lib/pbench/server/indexing_tarballs.py +++ b/lib/pbench/server/indexing_tarballs.py @@ -15,7 +15,7 @@ UnsupportedTarballFormat, ) from pbench.server import OperationCode, tstos -from pbench.server.cache_manager import CacheManager, Tarball, TarballNotFound +from pbench.server.cache_manager import CacheManager, LockRef, Tarball from pbench.server.database.models.audit import Audit, AuditStatus from pbench.server.database.models.datasets import ( Dataset, @@ -363,20 +363,23 @@ def sighup_handler(*args): ptb = None tarobj: Optional[Tarball] = None tb_res = error_code["OK"] + lock = None try: - # Dynamically unpack the tarball for indexing. + # We need the fully unpacked cache tree to index it try: - tarobj = self.cache_manager.unpack(dataset.resource_id) - if not tarobj.unpacked: - idxctx.logger.warning( - "{} has not been unpacked", dataset - ) - continue - except TarballNotFound as e: + tarobj = self.cache_manager.find_dataset( + dataset.resource_id + ) + lock = LockRef(tarobj.lock, exclusive=True).acquire() + tarobj.cache_create() + lock.downgrade() + except Exception as e: self.sync.error( dataset, f"Unable to unpack dataset: {e!s}", ) + if lock: + lock.release() continue audit = Audit.create( @@ -510,6 +513,8 @@ def sighup_handler(*args): Audit.create( root=audit, status=doneness, attributes=attributes ) + if lock: + lock.release() try: ie_len = ie_filepath.stat().st_size except FileNotFoundError: diff --git a/lib/pbench/test/unit/server/test_cache_manager.py b/lib/pbench/test/unit/server/test_cache_manager.py index 8b7438a910..8216da917a 100644 --- a/lib/pbench/test/unit/server/test_cache_manager.py +++ b/lib/pbench/test/unit/server/test_cache_manager.py @@ -1,6 +1,4 @@ -from contextlib import contextmanager import errno -import fcntl import hashlib import io from logging import Logger @@ -474,11 +472,15 @@ def generate_test_result_tree(tmp_path: Path, dir_name: str) -> Path: # create some directories and files inside the temp directory sub_dir = tmp_path / dir_name sub_dir.mkdir(parents=True, exist_ok=True) - (sub_dir / "f1.json").touch() - (sub_dir / "metadata.log").touch() + (sub_dir / "f1.json").write_text("{'json': 'value'}", encoding="utf-8") + (sub_dir / "metadata.log").write_text( + "[pbench]\nkey = value\n", encoding="utf-8" + ) for i in range(1, 4): (sub_dir / "subdir1" / f"subdir1{i}").mkdir(parents=True, exist_ok=True) - (sub_dir / "subdir1" / "f11.txt").touch() + (sub_dir / "subdir1" / "f11.txt").write_text( + "textual\nlines\n", encoding="utf-8" + ) (sub_dir / "subdir1" / "subdir14" / "subdir141").mkdir( parents=True, exist_ok=True ) @@ -524,16 +526,6 @@ def test_unpack_tar_subprocess_exception( tar = Path("/mock/A.tar.xz") cache = Path("/mock/.cache") - locks: list[tuple[str, str]] = [] - - @contextmanager - def open(s, m, buffering=-1): - yield None - - def locker(fd, mode): - nonlocal locks - locks.append((fd, mode)) - @staticmethod def mock_run(command, _dir_path, exception, dir_p): verb = "tar" @@ -541,11 +533,10 @@ def mock_run(command, _dir_path, exception, dir_p): raise exception(dir_p, subprocess.TimeoutExpired(verb, 43)) with monkeypatch.context() as m: + m.setattr(Path, "exists", lambda self: True) m.setattr(Path, "mkdir", lambda path, parents=False, exist_ok=False: None) - m.setattr(Path, "open", open) m.setattr(Path, "touch", lambda path, exist_ok=False: None) m.setattr(Tarball, "subprocess_run", mock_run) - m.setattr("pbench.server.cache_manager.fcntl.lockf", locker) m.setattr(Tarball, "__init__", TestCacheManager.MockTarball.__init__) m.setattr(Controller, "__init__", TestCacheManager.MockController.__init__) tb = Tarball( @@ -566,16 +557,6 @@ def test_unpack_find_subprocess_exception( cache = Path("/mock/.cache") rmtree_called = True - locks: list[tuple[str, str]] = [] - - @contextmanager - def open(s, m, buffering=-1): - yield None - - def locker(fd, mode): - nonlocal locks - locks.append((fd, mode)) - def mock_rmtree(path: Path, ignore_errors=False): nonlocal rmtree_called rmtree_called = True @@ -592,10 +573,9 @@ def mock_run(command, _dir_path, exception, dir_p): assert command.startswith("tar") with monkeypatch.context() as m: + m.setattr(Path, "exists", lambda self: True) m.setattr(Path, "mkdir", lambda path, parents=False, exist_ok=False: None) - m.setattr(Path, "open", open) m.setattr(Path, "touch", lambda path, exist_ok=False: None) - m.setattr("pbench.server.cache_manager.fcntl.lockf", locker) m.setattr(Tarball, "subprocess_run", mock_run) m.setattr(shutil, "rmtree", mock_rmtree) m.setattr(Tarball, "__init__", TestCacheManager.MockTarball.__init__) @@ -618,16 +598,6 @@ def test_unpack_success(self, make_logger, db_session, monkeypatch): cache = Path("/mock/.cache") call = list() - locks: list[tuple[str, str]] = [] - - @contextmanager - def open(s, m, buffering=-1): - yield None - - def locker(fd, mode): - nonlocal locks - locks.append((fd, mode)) - def mock_run(args, **_kwargs): call.append(args[0]) @@ -646,7 +616,6 @@ def mock_resolve(_path, _strict=False): m.setattr(Path, "mkdir", lambda path, parents=False, exist_ok=False: None) m.setattr(Path, "open", open) m.setattr(Path, "touch", lambda path, exist_ok=False: None) - m.setattr("pbench.server.cache_manager.fcntl.lockf", locker) m.setattr("pbench.server.cache_manager.subprocess.run", mock_run) m.setattr(Path, "resolve", mock_resolve) m.setattr(Tarball, "__init__", TestCacheManager.MockTarball.__init__) @@ -750,7 +719,7 @@ def test_cache_map_bad_dir_path( "f1.json", None, None, - 0, + 17, CacheType.FILE, ), ( @@ -768,7 +737,7 @@ def test_cache_map_bad_dir_path( "f11.txt", None, None, - 0, + 14, CacheType.FILE, ), ( @@ -777,7 +746,7 @@ def test_cache_map_bad_dir_path( "f11.txt", None, None, - 0, + 14, CacheType.FILE, ), ( @@ -786,7 +755,7 @@ def test_cache_map_bad_dir_path( "f11.txt", None, None, - 0, + 14, CacheType.FILE, ), ( @@ -945,7 +914,7 @@ def test_cache_map_traverse_cmap( "name": "f11.txt", "resolve_path": None, "resolve_type": None, - "size": 0, + "size": 14, "type": CacheType.FILE, }, ), @@ -1013,9 +982,9 @@ def test_cache_map_get_info_cmap( @pytest.mark.parametrize( "file_path,file_pattern,exp_stream", [ - ("", "dir_name.tar.xz", io.BytesIO(b"tarball_as_a_byte_stream")), - (None, "dir_name.tar.xz", io.BytesIO(b"tarball_as_a_byte_stream")), - ("f1.json", "f1.json", io.BytesIO(b"file_as_a_byte_stream")), + ("", "dir_name.tar.xz", b"tarball_as_a_byte_stream"), + (None, "dir_name.tar.xz", b"tarball_as_a_byte_stream"), + ("f1.json", "f1.json", b"{'json': 'value'}"), ("subdir1/f12_sym", None, CacheExtractBadPath(Path("a"), "b")), ], ) @@ -1023,36 +992,22 @@ def test_get_inventory( self, make_logger, monkeypatch, tmp_path, file_path, file_pattern, exp_stream ): """Test to extract file contents/stream from a file""" - archive = tmp_path / "mock/archive" - tar = archive / "ABC/dir_name.tar.xz" - cache = tmp_path / "mock/.cache" - - locks: list[tuple[str, str]] = [] - - def locker(fd, mode): - nonlocal locks - locks.append((fd, mode)) + archive = tmp_path / "mock" / "archive" / "ABC" + archive.mkdir(parents=True, exist_ok=True) + tar = archive / "dir_name.tar.xz" + tar.write_bytes(b"tarball_as_a_byte_stream") + cache = tmp_path / "mock" / ".cache" with monkeypatch.context() as m: m.setattr(Tarball, "__init__", TestCacheManager.MockTarball.__init__) m.setattr(Controller, "__init__", TestCacheManager.MockController.__init__) - real_open = Path.open - m.setattr( - Path, - "open", - lambda s, m="rb", buffering=-1: exp_stream - if file_pattern and file_pattern in str(s) - else real_open(s, m, buffering), - ) - m.setattr("pbench.server.cache_manager.fcntl.lockf", locker) tb = Tarball( tar, "ABC", Controller(archive, cache, make_logger) ) - tb.unpacked = cache / "ABC/dir_name" - tar_dir = TestCacheManager.MockController.generate_test_result_tree( + tb.unpacked = cache / "ABC" / "dir_name" + TestCacheManager.MockController.generate_test_result_tree( cache / "ABC", "dir_name" ) - tb.cache_map(tar_dir) try: file_info = tb.get_inventory(file_path) except Exception as e: @@ -1061,17 +1016,8 @@ def locker(fd, mode): assert not isinstance(exp_stream, Exception) assert file_info["type"] is CacheType.FILE stream: Inventory = file_info["stream"] - assert stream.stream == exp_stream + assert stream.stream.read() == exp_stream stream.close() - if not file_path: - assert len(locks) == 0 - else: - assert list(i[1] for i in locks) == [ - fcntl.LOCK_EX, - fcntl.LOCK_UN, - fcntl.LOCK_SH, - fcntl.LOCK_UN, - ] def test_cm_inventory(self, monkeypatch, server_config, make_logger): """Verify the happy path of the high level get_inventory""" @@ -1562,7 +1508,7 @@ def test_find( assert exc.value.tarball == "foobar" # Unpack the dataset, creating INCOMING and RESULTS links - cm.unpack(md5) + tarball.cache_create() assert tarball.cache == controller.cache / md5 assert tarball.unpacked == controller.cache / md5 / tarball.name assert tarball.last_ref.exists() @@ -1646,14 +1592,16 @@ def test_lifecycle( assert list(cm.tarballs) == [dataset_name] assert list(cm.datasets) == [md5] + dataset = cm.find_dataset(md5) + # Now "unpack" the tarball and check that the incoming directory and # results link are set up. - cm.unpack(md5) - assert cache == cm[md5].cache + dataset.cache_create() + assert cache == dataset.cache assert cache.is_dir() assert unpack.is_dir() - assert cm.datasets[md5].unpacked == unpack + assert dataset.unpacked == unpack # Re-discover, with all the files in place, and compare newcm = CacheManager(server_config, make_logger) @@ -1682,7 +1630,7 @@ def test_lifecycle( assert tarball.unpacked == other.unpacked # Remove the unpacked tarball, and confirm the directory is removed - cm.cache_reclaim(md5) + dataset.cache_delete() assert not unpack.exists() # Now that we have all that setup, delete the dataset diff --git a/lib/pbench/test/unit/server/test_indexing_tarballs.py b/lib/pbench/test/unit/server/test_indexing_tarballs.py index 5ee642591e..9a6f14552f 100644 --- a/lib/pbench/test/unit/server/test_indexing_tarballs.py +++ b/lib/pbench/test/unit/server/test_indexing_tarballs.py @@ -265,6 +265,53 @@ def error(self, dataset: Dataset, message: str): __class__.errors[dataset.name] = message +class FakeLockRef: + def __init__(self, lock: Path, exclusive: bool = False, wait: bool = True): + """Initialize a lock reference + + The lock file is opened in "w+" mode, which is "write update": unlike + "r+", this creates the file if it doesn't already exist, but still + allows lock conversion between LOCK_EX and LOCK_SH. + + Args: + lock: the path of a lock file + exclusive: lock for exclusive access + wait: [default] wait for lock + """ + self.locked = False + self.exclusive = exclusive + self.unlock = True + + def acquire(self) -> "FakeLockRef": + self.locked = True + return self + + def release(self): + """Release the lock and close the lock file""" + self.locked = False + self.exclusive = False + + def upgrade(self): + if not self.exclusive: + self.exclusive = True + + def downgrade(self): + if self.exclusive: + self.exclusive = False + + def keep(self) -> "FakeLockRef": + """Tell the context manager not to unlock on exit""" + self.unlock = False + return self + + def __enter__(self) -> "FakeLockRef": + return self.acquire() + + def __exit__(self, *exc): + if self.unlock: + self.release() + + class FakeController: def __init__(self, path: Path, cache: Path, logger: Logger): self.name = path.name @@ -279,9 +326,13 @@ def __init__(self, path: Path, resource_id: str, controller: FakeController): self.tarball_path = path self.controller = controller self.cache = controller.cache / "ABC" + self.lock = self.cache / "lock" + self.last_ref = self.cache / "last_ref" self.unpacked = self.cache / self.name self.isolator = controller.path / resource_id - self.uncache = lambda: None + + def cache_create(self): + pass class FakeCacheManager: @@ -292,7 +343,7 @@ def __init__(self, config: PbenchServerConfig, logger: Logger): self.logger = logger self.datasets = {} - def unpack(self, resource_id: str): + def find_dataset(self, resource_id: str): controller = FakeController(Path("/archive/ABC"), Path("/.cache"), self.logger) return FakeTarball( Path(f"/archive/ABC/{resource_id}/{self.lookup[resource_id]}.tar.xz"), @@ -341,6 +392,7 @@ def mocks(monkeypatch, make_logger): m.setattr("pbench.server.indexing_tarballs.Metadata", FakeMetadata) m.setattr("pbench.server.indexing_tarballs.IndexMap", FakeIndexMap) m.setattr("pbench.server.indexing_tarballs.CacheManager", FakeCacheManager) + m.setattr("pbench.server.indexing_tarballs.LockRef", FakeLockRef) m.setattr("pbench.server.indexing_tarballs.Audit", FakeAudit) yield m FakeAudit.reset() From d82c48250ba213e85189b4f7e8987e4eb3d00073 Mon Sep 17 00:00:00 2001 From: David Butenhof Date: Wed, 13 Sep 2023 16:18:07 -0400 Subject: [PATCH 06/14] Add some test coverage --- .../test/unit/server/test_cache_manager.py | 191 +++++++++++++++++- 1 file changed, 183 insertions(+), 8 deletions(-) diff --git a/lib/pbench/test/unit/server/test_cache_manager.py b/lib/pbench/test/unit/server/test_cache_manager.py index 8216da917a..a2cdbd596c 100644 --- a/lib/pbench/test/unit/server/test_cache_manager.py +++ b/lib/pbench/test/unit/server/test_cache_manager.py @@ -1,4 +1,5 @@ import errno +import fcntl import hashlib import io from logging import Logger @@ -21,6 +22,7 @@ Controller, DuplicateTarball, Inventory, + LockRef, MetadataError, Tarball, TarballModeChangeError, @@ -980,23 +982,39 @@ def test_cache_map_get_info_cmap( assert file_info == expected_msg @pytest.mark.parametrize( - "file_path,file_pattern,exp_stream", + "file_path,is_unpacked,exp_stream", [ - ("", "dir_name.tar.xz", b"tarball_as_a_byte_stream"), - (None, "dir_name.tar.xz", b"tarball_as_a_byte_stream"), - ("f1.json", "f1.json", b"{'json': 'value'}"), - ("subdir1/f12_sym", None, CacheExtractBadPath(Path("a"), "b")), + ("", True, b"tarball_as_a_byte_stream"), + (None, True, b"tarball_as_a_byte_stream"), + ("f1.json", True, b"{'json': 'value'}"), + ("subdir1/f12_sym", False, CacheExtractBadPath(Path("a"), "b")), ], ) def test_get_inventory( - self, make_logger, monkeypatch, tmp_path, file_path, file_pattern, exp_stream + self, make_logger, monkeypatch, tmp_path, file_path, is_unpacked, exp_stream ): - """Test to extract file contents/stream from a file""" + """Test to extract file contents/stream from a file + + NOTE: we check explicitly whether Tarball.stream() chooses to unpack + the tarball, based on "is_unpacked", to be sure both cases are covered; + but be aware that accessing the tarball itself (the "" and None + file_path cases) don't ever unpack and should always have is_unpacked + set to True. (That is, we don't expect cache_create to be called in + those cases.) + """ archive = tmp_path / "mock" / "archive" / "ABC" archive.mkdir(parents=True, exist_ok=True) tar = archive / "dir_name.tar.xz" tar.write_bytes(b"tarball_as_a_byte_stream") cache = tmp_path / "mock" / ".cache" + unpacked = cache / "ABC" / "dir_name" + + create_called = False + + def fake_create(self): + nonlocal create_called + create_called = True + self.unpacked = unpacked with monkeypatch.context() as m: m.setattr(Tarball, "__init__", TestCacheManager.MockTarball.__init__) @@ -1004,7 +1022,9 @@ def test_get_inventory( tb = Tarball( tar, "ABC", Controller(archive, cache, make_logger) ) - tb.unpacked = cache / "ABC" / "dir_name" + m.setattr(Tarball, "cache_create", fake_create) + if is_unpacked: + tb.unpacked = unpacked TestCacheManager.MockController.generate_test_result_tree( cache / "ABC", "dir_name" ) @@ -1018,6 +1038,7 @@ def test_get_inventory( stream: Inventory = file_info["stream"] assert stream.stream.read() == exp_stream stream.close() + assert create_called is not is_unpacked def test_cm_inventory(self, monkeypatch, server_config, make_logger): """Verify the happy path of the high level get_inventory""" @@ -1688,3 +1709,157 @@ def test_compatibility( assert not tar1.exists() assert not tar2.exists() assert not tar1.parent.exists() + + def test_lockref(self, monkeypatch): + """Test behavior of the LockRef class""" + + locks: list[tuple] = [] + files: list[tuple] = [] + close_fail: Optional[Exception] = None + lockf_fail: Optional[Exception] = None + + def reset(): + nonlocal close_fail, lockf_fail + close_fail = None + lockf_fail = None + locks.clear() + files.clear() + + class FakeStream: + def __init__(self, file): + self.file = file + + def close(self): + files.append(("close", self.file)) + if close_fail: + raise close_fail + + def fileno(self) -> int: + return 0 + + def __eq__(self, other) -> bool: + return self.file == other.file + + def fake_lockf(fd: FakeStream, cmd: int): + locks.append((fd.file, cmd)) + if lockf_fail: + raise lockf_fail + + def fake_open(self, flags: str) -> FakeStream: + files.append(("open", self, flags)) + return FakeStream(self) + + monkeypatch.setattr("pbench.server.cache_manager.fcntl.lockf", fake_lockf) + monkeypatch.setattr(Path, "open", fake_open) + + lockfile = Path("/lock") + + # Simple shared lock + lock = LockRef(lockfile) + assert lock.lock == FakeStream(lockfile) + assert not lock.locked + assert not lock.exclusive + assert lock.unlock + assert lock.wait + assert len(locks) == 0 + assert files == [("open", lockfile, "w+")] + lock.acquire() + assert lock.locked + assert locks == [(lockfile, fcntl.LOCK_SH)] + lock.release() + assert not lock.locked + assert files == [("open", lockfile, "w+"), ("close", lockfile)] + assert locks == [(lockfile, fcntl.LOCK_SH), (lockfile, fcntl.LOCK_UN)] + + # No-wait + reset() + lock = LockRef(lockfile, wait=False) + assert files == [("open", lockfile, "w+")] + assert not lock.wait + lock.acquire() + assert locks == [(lockfile, fcntl.LOCK_SH | fcntl.LOCK_NB)] + lock.release() + assert locks == [ + (lockfile, fcntl.LOCK_SH | fcntl.LOCK_NB), + (lockfile, fcntl.LOCK_UN), + ] + assert files == [("open", lockfile, "w+"), ("close", lockfile)] + + # Exclusive + reset() + lock = LockRef(lockfile, exclusive=True) + assert files == [("open", lockfile, "w+")] + assert lock.exclusive + lock.acquire() + assert locks == [(lockfile, fcntl.LOCK_EX)] + lock.release() + assert locks == [(lockfile, fcntl.LOCK_EX), (lockfile, fcntl.LOCK_UN)] + assert files == [("open", lockfile, "w+"), ("close", lockfile)] + + # Context Manager + reset() + with LockRef(lockfile) as lock: + assert lock.locked + assert files == [("open", lockfile, "w+")] + assert locks == [(lockfile, fcntl.LOCK_SH)] + lock.upgrade() + assert locks == [(lockfile, fcntl.LOCK_SH), (lockfile, fcntl.LOCK_EX)] + # upgrade is idempotent + lock.upgrade() + assert locks == [(lockfile, fcntl.LOCK_SH), (lockfile, fcntl.LOCK_EX)] + lock.downgrade() + assert locks == [ + (lockfile, fcntl.LOCK_SH), + (lockfile, fcntl.LOCK_EX), + (lockfile, fcntl.LOCK_SH), + ] + # downgrade is idempotent + lock.downgrade() + assert locks == [ + (lockfile, fcntl.LOCK_SH), + (lockfile, fcntl.LOCK_EX), + (lockfile, fcntl.LOCK_SH), + ] + assert locks == [ + (lockfile, fcntl.LOCK_SH), + (lockfile, fcntl.LOCK_EX), + (lockfile, fcntl.LOCK_SH), + (lockfile, fcntl.LOCK_UN), + ] + assert files == [("open", lockfile, "w+"), ("close", lockfile)] + + # Context Manager with 'keep' option\ + reset() + with LockRef(lockfile) as lock: + assert lock.keep() is lock + assert not lock.unlock + assert locks == [(lockfile, fcntl.LOCK_SH)] + assert files == [("open", lockfile, "w+")] + + # Check release exception handling on close + reset() + lock = LockRef(lockfile).acquire() + assert locks == [(lockfile, fcntl.LOCK_SH)] + assert files == [("open", lockfile, "w+")] + + message = "Nobody inspects the Spanish Aquisition" + close_fail = Exception(message) + with pytest.raises(Exception, match=message): + lock.release() + assert lock.lock is None + assert files == [("open", lockfile, "w+"), ("close", lockfile)] + assert locks == [(lockfile, fcntl.LOCK_SH), (lockfile, fcntl.LOCK_UN)] + + # Check release exception handling on unlock + reset() + lock = LockRef(lockfile).acquire() + assert locks == [(lockfile, fcntl.LOCK_SH)] + assert files == [("open", lockfile, "w+")] + + message = "Nobody inspects the Spanish Aquisition" + lockf_fail = Exception(message) + with pytest.raises(Exception, match=message): + lock.release() + assert lock.lock is None + assert files == [("open", lockfile, "w+")] + assert locks == [(lockfile, fcntl.LOCK_SH), (lockfile, fcntl.LOCK_UN)] From ff7b1343887a2e51318ef39df658bff66652ffd9 Mon Sep 17 00:00:00 2001 From: David Butenhof Date: Wed, 27 Sep 2023 16:30:37 -0400 Subject: [PATCH 07/14] Separate basic LockRef from context manager --- lib/pbench/cli/server/tree_manage.py | 4 +- lib/pbench/server/cache_manager.py | 87 ++++++--- lib/pbench/server/indexing_tarballs.py | 2 +- .../test/unit/server/test_cache_manager.py | 172 +++++++++++++----- .../unit/server/test_indexing_tarballs.py | 19 +- 5 files changed, 191 insertions(+), 93 deletions(-) diff --git a/lib/pbench/cli/server/tree_manage.py b/lib/pbench/cli/server/tree_manage.py index eef74f66ee..e363b8dbde 100644 --- a/lib/pbench/cli/server/tree_manage.py +++ b/lib/pbench/cli/server/tree_manage.py @@ -9,7 +9,7 @@ from pbench.cli.server.options import common_options from pbench.common.logger import get_pbench_logger from pbench.server import BadConfig, OperationCode -from pbench.server.cache_manager import CacheManager, LockRef +from pbench.server.cache_manager import CacheManager, LockManager from pbench.server.database.models.audit import Audit, AuditStatus, AuditType # Length of time in hours to retain unreferenced cached results data. @@ -48,7 +48,7 @@ def reclaim_cache(tree: CacheManager, logger: Logger, lifetime: float = CACHE_LI window, ) try: - with LockRef(tarball.lock, exclusive=True, wait=False): + with LockManager(tarball.lock, exclusive=True, wait=False): try: audit = Audit.create( name="reclaim", diff --git a/lib/pbench/server/cache_manager.py b/lib/pbench/server/cache_manager.py index dc2cfdc2ef..34d387dc13 100644 --- a/lib/pbench/server/cache_manager.py +++ b/lib/pbench/server/cache_manager.py @@ -69,7 +69,7 @@ class MetadataError(CacheManagerError): def __init__(self, tarball: Path, error: Exception): super().__init__( - f"A problem occurred processing metadata.log from {tarball!s}: {str(error)!r}" + f"A problem occurred processing metadata.log from {tarball}: {str(error)!r}" ) self.tarball = tarball self.error = str(error) @@ -184,7 +184,7 @@ def make_cache_object(dir_path: Path, path: Path) -> CacheObject: class LockRef: """Keep track of a cache lock passed off to a caller""" - def __init__(self, lock: Path, exclusive: bool = False, wait: bool = True): + def __init__(self, lock: Path): """Initialize a lock reference The lock file is opened in "w+" mode, which is "write update": unlike @@ -193,20 +193,25 @@ def __init__(self, lock: Path, exclusive: bool = False, wait: bool = True): Args: lock: the path of a lock file - exclusive: lock for exclusive access - wait: wait for lock """ self.lock = lock.open("w+") self.locked = False - self.exclusive = exclusive - self.unlock = True - self.wait = wait + self.exclusive = False + + def acquire(self, exclusive: bool = False, wait: bool = True) -> "LockRef": + """Acquire the lock + + Args: + exclusive: lock for exclusive access + wait: wait for lock - def acquire(self) -> "LockRef": - """Acquire the lock""" + Returns: + The lockref, so this can be chained with the constructor + """ - cmd = fcntl.LOCK_EX if self.exclusive else fcntl.LOCK_SH - if not self.wait: + self.exclusive = exclusive + cmd = fcntl.LOCK_EX if exclusive else fcntl.LOCK_SH + if not wait: cmd |= fcntl.LOCK_NB fcntl.lockf(self.lock, cmd) self.locked = True @@ -215,20 +220,17 @@ def acquire(self) -> "LockRef": def release(self): """Release the lock and close the lock file""" - exception = None + if not self.locked: + return try: fcntl.lockf(self.lock, fcntl.LOCK_UN) self.lock.close() - self.locked = False - self.exclusive = False - except Exception as e: - exception = e finally: # Release our reference to the lock file so that the # object can be reclaimed. + self.locked = False + self.exclusive = False self.lock = None - if exception: - raise exception def upgrade(self): """Upgrade a shared lock to exclusive""" @@ -242,19 +244,44 @@ def downgrade(self): fcntl.lockf(self.lock, fcntl.LOCK_SH) self.exclusive = False - def keep(self) -> "LockRef": + +class LockManager: + def __init__(self, lock: Path, exclusive: bool = False, wait: bool = True): + self.lock = LockRef(lock) + self.exclusive = exclusive + self.wait = wait + self.unlock = True + + def keep(self) -> "LockManager": """Tell the context manager not to unlock on exit""" self.unlock = False return self - def __enter__(self) -> "LockRef": + def release(self): + """Release manually if necessary""" + self.lock.release() + + def upgrade(self): + """Upgrade a shared lock to exclusive""" + self.lock.upgrade() + + def downgrade(self): + """Downgrade an exclusive lock to shared""" + self.lock.downgrade() + + def __enter__(self) -> "LockManager": """Enter a lock context manager by acquiring the lock""" - return self.acquire() + self.lock.acquire(exclusive=self.exclusive, wait=self.wait) + return self def __exit__(self, *exc): - """Exit a lock context manager by releasing the lock""" + """Exit a lock context manager by releasing the lock + + This does nothing if "unlock" has been cleared, meaning we want the + lock to persist beyond the context manager scope. + """ if self.unlock: - self.release() + self.lock.release() class Inventory: @@ -771,12 +798,22 @@ def stream(self, path: str) -> Inventory: Args: path: Relative path of a regular file within a tarball. + On failure, an exception is raised and the cache lock is released; on + success, returns an Inventory object which implicitly transfers + ownership and management of the cache lock to the caller. When done + with the inventory's file stream, the caller must close the Inventory + object to release the file stream and the cache lock. + + Raises: + CacheExtractBadPath: the path does not match a regular file within + the tarball. + Returns: An Inventory object encapsulating the file stream and the cache - lock. The caller is reponsible for closing the Inventory! + lock. """ - with LockRef(self.lock) as lock: + with LockManager(self.lock) as lock: if not self.unpacked: lock.upgrade() self.cache_create() diff --git a/lib/pbench/server/indexing_tarballs.py b/lib/pbench/server/indexing_tarballs.py index d58f72f4c0..436c0dee00 100644 --- a/lib/pbench/server/indexing_tarballs.py +++ b/lib/pbench/server/indexing_tarballs.py @@ -370,7 +370,7 @@ def sighup_handler(*args): tarobj = self.cache_manager.find_dataset( dataset.resource_id ) - lock = LockRef(tarobj.lock, exclusive=True).acquire() + lock = LockRef(tarobj.lock).acquire(exclusive=True) tarobj.cache_create() lock.downgrade() except Exception as e: diff --git a/lib/pbench/test/unit/server/test_cache_manager.py b/lib/pbench/test/unit/server/test_cache_manager.py index a2cdbd596c..055e3b4602 100644 --- a/lib/pbench/test/unit/server/test_cache_manager.py +++ b/lib/pbench/test/unit/server/test_cache_manager.py @@ -22,6 +22,7 @@ Controller, DuplicateTarball, Inventory, + LockManager, LockRef, MetadataError, Tarball, @@ -547,8 +548,8 @@ def mock_run(command, _dir_path, exception, dir_p): with pytest.raises(TarballUnpackError) as exc: tb.cache_create() msg = f"An error occurred while unpacking {tar}: Command 'tar' timed out after 43 seconds" - assert str(exc.value) == msg - assert exc.type == TarballUnpackError + with pytest.raises(TarballUnpackError, match=msg): + tb.cache_create() def test_unpack_find_subprocess_exception( self, make_logger, db_session, monkeypatch @@ -586,12 +587,11 @@ def mock_run(command, _dir_path, exception, dir_p): tar, "ABC", Controller(Path("/mock/archive"), cache, make_logger) ) - with pytest.raises(TarballModeChangeError) as exc: - tb.cache_create() msg = "An error occurred while changing file permissions of " msg += f"{cache / 'ABC'}: Command 'find' timed out after 43 seconds" + with pytest.raises(TarballModeChangeError, match=msg) as exc: + tb.cache_create() assert str(exc.value) == msg - assert exc.type == TarballModeChangeError assert rmtree_called def test_unpack_success(self, make_logger, db_session, monkeypatch): @@ -616,7 +616,6 @@ def mock_resolve(_path, _strict=False): with monkeypatch.context() as m: m.setattr(Path, "mkdir", lambda path, parents=False, exist_ok=False: None) - m.setattr(Path, "open", open) m.setattr(Path, "touch", lambda path, exist_ok=False: None) m.setattr("pbench.server.cache_manager.subprocess.run", mock_run) m.setattr(Path, "resolve", mock_resolve) @@ -1264,6 +1263,7 @@ def seek(self, offset: int, _whence: int = io.SEEK_SET) -> int: stream = Inventory(MockBufferedReader()) assert stream.lock is None + assert stream.subproc is None # Test Inventory.getbuffer() calls.clear() @@ -1579,7 +1579,7 @@ def test_lifecycle( cm = CacheManager(server_config, make_logger) archive = cm.archive_root / "ABC" cache = cm.cache_root / md5 - dataset_name = source_tarball.name[:-7] + dataset_name = Dataset.stem(source_tarball) unpack = cache / dataset_name monkeypatch.setattr(Tarball, "_get_metadata", fake_get_metadata) @@ -1740,6 +1740,9 @@ def fileno(self) -> int: def __eq__(self, other) -> bool: return self.file == other.file + def __str__(self) -> str: + return f"Fake Stream with file {self.file!r}" + def fake_lockf(fd: FakeStream, cmd: int): locks.append((fd.file, cmd)) if lockf_fail: @@ -1759,8 +1762,6 @@ def fake_open(self, flags: str) -> FakeStream: assert lock.lock == FakeStream(lockfile) assert not lock.locked assert not lock.exclusive - assert lock.unlock - assert lock.wait assert len(locks) == 0 assert files == [("open", lockfile, "w+")] lock.acquire() @@ -1773,10 +1774,10 @@ def fake_open(self, flags: str) -> FakeStream: # No-wait reset() - lock = LockRef(lockfile, wait=False) + lock = LockRef(lockfile) assert files == [("open", lockfile, "w+")] - assert not lock.wait - lock.acquire() + lock.acquire(wait=False) + assert not lock.exclusive assert locks == [(lockfile, fcntl.LOCK_SH | fcntl.LOCK_NB)] lock.release() assert locks == [ @@ -1787,54 +1788,47 @@ def fake_open(self, flags: str) -> FakeStream: # Exclusive reset() - lock = LockRef(lockfile, exclusive=True) + lock = LockRef(lockfile) assert files == [("open", lockfile, "w+")] + lock.acquire(exclusive=True) assert lock.exclusive - lock.acquire() assert locks == [(lockfile, fcntl.LOCK_EX)] - lock.release() - assert locks == [(lockfile, fcntl.LOCK_EX), (lockfile, fcntl.LOCK_UN)] - assert files == [("open", lockfile, "w+"), ("close", lockfile)] - - # Context Manager - reset() - with LockRef(lockfile) as lock: - assert lock.locked - assert files == [("open", lockfile, "w+")] - assert locks == [(lockfile, fcntl.LOCK_SH)] - lock.upgrade() - assert locks == [(lockfile, fcntl.LOCK_SH), (lockfile, fcntl.LOCK_EX)] - # upgrade is idempotent - lock.upgrade() - assert locks == [(lockfile, fcntl.LOCK_SH), (lockfile, fcntl.LOCK_EX)] - lock.downgrade() - assert locks == [ - (lockfile, fcntl.LOCK_SH), - (lockfile, fcntl.LOCK_EX), - (lockfile, fcntl.LOCK_SH), - ] - # downgrade is idempotent - lock.downgrade() - assert locks == [ - (lockfile, fcntl.LOCK_SH), - (lockfile, fcntl.LOCK_EX), - (lockfile, fcntl.LOCK_SH), - ] + assert lock.locked + assert lock.exclusive + # upgrade is idempotent + lock.upgrade() + assert locks == [(lockfile, fcntl.LOCK_EX)] + assert lock.exclusive + assert lock.locked + lock.downgrade() + assert locks == [(lockfile, fcntl.LOCK_EX), (lockfile, fcntl.LOCK_SH)] + assert not lock.exclusive + assert lock.locked + # downgrade is idempotent + lock.downgrade() + assert locks == [(lockfile, fcntl.LOCK_EX), (lockfile, fcntl.LOCK_SH)] + assert not lock.exclusive + assert lock.locked + # upgrade back to exclusive + lock.upgrade() assert locks == [ + (lockfile, fcntl.LOCK_EX), (lockfile, fcntl.LOCK_SH), (lockfile, fcntl.LOCK_EX), + ] + assert lock.exclusive + assert lock.locked + # release + lock.release() + assert locks == [ + (lockfile, fcntl.LOCK_EX), (lockfile, fcntl.LOCK_SH), + (lockfile, fcntl.LOCK_EX), (lockfile, fcntl.LOCK_UN), ] assert files == [("open", lockfile, "w+"), ("close", lockfile)] - - # Context Manager with 'keep' option\ - reset() - with LockRef(lockfile) as lock: - assert lock.keep() is lock - assert not lock.unlock - assert locks == [(lockfile, fcntl.LOCK_SH)] - assert files == [("open", lockfile, "w+")] + assert not lock.locked + assert not lock.exclusive # Check release exception handling on close reset() @@ -1863,3 +1857,81 @@ def fake_open(self, flags: str) -> FakeStream: assert lock.lock is None assert files == [("open", lockfile, "w+")] assert locks == [(lockfile, fcntl.LOCK_SH), (lockfile, fcntl.LOCK_UN)] + + def test_lockmanager(self, monkeypatch): + """Test LockManager""" + + operations = [] + + class FakeLockRef: + def __init__(self, lockpath: Path): + self.lock = lockpath + + def acquire(self, **kwargs): + operations.append( + ( + "acquire", + kwargs.get("exclusive", False), + kwargs.get("wait", True), + ) + ) + + def release(self): + operations.append(("release")) + + def upgrade(self): + operations.append(("upgrade")) + + def downgrade(self): + operations.append(("downgrade")) + + def reset(): + operations.clear() + + lockfile = Path("/lock") + monkeypatch.setattr("pbench.server.cache_manager.LockRef", FakeLockRef) + # default parameters + with LockManager(lockfile) as lock: + assert lock.unlock + assert not lock.exclusive + assert lock.wait + assert operations == [("acquire", False, True)] + lock.upgrade() + assert operations == [("acquire", False, True), ("upgrade")] + lock.downgrade() + assert operations == [("acquire", False, True), ("upgrade"), ("downgrade")] + assert operations == [ + ("acquire", False, True), + ("upgrade"), + ("downgrade"), + ("release"), + ] + + # exclusive + reset() + with LockManager(lockfile, exclusive=True) as lock: + assert lock.unlock + assert lock.exclusive + assert lock.wait + assert operations == [("acquire", True, True)] + assert operations == [("acquire", True, True), ("release")] + + # no-wait + reset() + with LockManager(lockfile, wait=False) as lock: + assert lock.unlock + assert not lock.exclusive + assert not lock.wait + assert operations == [("acquire", False, False)] + assert operations == [("acquire", False, False), ("release")] + + # keep' option + reset() + with LockManager(lockfile) as lock: + assert lock.keep() is lock + still_locked = lock + assert not lock.unlock + assert operations == [("acquire", False, True)] + assert operations == [("acquire", False, True)] + still_locked.release() + assert operations == [("acquire", False, True), ("release")] diff --git a/lib/pbench/test/unit/server/test_indexing_tarballs.py b/lib/pbench/test/unit/server/test_indexing_tarballs.py index 9a6f14552f..789987d47e 100644 --- a/lib/pbench/test/unit/server/test_indexing_tarballs.py +++ b/lib/pbench/test/unit/server/test_indexing_tarballs.py @@ -266,7 +266,7 @@ def error(self, dataset: Dataset, message: str): class FakeLockRef: - def __init__(self, lock: Path, exclusive: bool = False, wait: bool = True): + def __init__(self, lock: Path): """Initialize a lock reference The lock file is opened in "w+" mode, which is "write update": unlike @@ -279,11 +279,12 @@ def __init__(self, lock: Path, exclusive: bool = False, wait: bool = True): wait: [default] wait for lock """ self.locked = False - self.exclusive = exclusive + self.exclusive = False self.unlock = True - def acquire(self) -> "FakeLockRef": + def acquire(self, exclusive: bool = False, wait: bool = True) -> "FakeLockRef": self.locked = True + self.exclusive = exclusive return self def release(self): @@ -299,18 +300,6 @@ def downgrade(self): if self.exclusive: self.exclusive = False - def keep(self) -> "FakeLockRef": - """Tell the context manager not to unlock on exit""" - self.unlock = False - return self - - def __enter__(self) -> "FakeLockRef": - return self.acquire() - - def __exit__(self, *exc): - if self.unlock: - self.release() - class FakeController: def __init__(self, path: Path, cache: Path, logger: Logger): From fb605727931533b248c4fe66a9b89786201933b1 Mon Sep 17 00:00:00 2001 From: David Butenhof Date: Wed, 27 Sep 2023 17:40:17 -0400 Subject: [PATCH 08/14] Clean up merge --- lib/pbench/test/unit/server/test_cache_manager.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/lib/pbench/test/unit/server/test_cache_manager.py b/lib/pbench/test/unit/server/test_cache_manager.py index 055e3b4602..6c04431011 100644 --- a/lib/pbench/test/unit/server/test_cache_manager.py +++ b/lib/pbench/test/unit/server/test_cache_manager.py @@ -545,8 +545,6 @@ def mock_run(command, _dir_path, exception, dir_p): tb = Tarball( tar, "ABC", Controller(Path("/mock/archive"), cache, make_logger) ) - with pytest.raises(TarballUnpackError) as exc: - tb.cache_create() msg = f"An error occurred while unpacking {tar}: Command 'tar' timed out after 43 seconds" with pytest.raises(TarballUnpackError, match=msg): tb.cache_create() @@ -1018,9 +1016,7 @@ def fake_create(self): with monkeypatch.context() as m: m.setattr(Tarball, "__init__", TestCacheManager.MockTarball.__init__) m.setattr(Controller, "__init__", TestCacheManager.MockController.__init__) - tb = Tarball( - tar, "ABC", Controller(archive, cache, make_logger) - ) + tb = Tarball(tar, "ABC", Controller(archive, cache, make_logger)) m.setattr(Tarball, "cache_create", fake_create) if is_unpacked: tb.unpacked = unpacked @@ -1689,8 +1685,8 @@ def test_compatibility( t2 = cm1[id] assert t1.name == t2.name == Dataset.stem(source_tarball) - t1.unpack() - t2.unpack() + t1.cache_create() + t2.cache_create() assert t1.unpacked != t2.unpacked assert (t1.unpacked / "metadata.log").is_file() From 78ca1b21786ef242af1457f9280ba993a26161b5 Mon Sep 17 00:00:00 2001 From: David Butenhof Date: Thu, 28 Sep 2023 11:50:03 -0400 Subject: [PATCH 09/14] Clean up some docstrings --- .../unit/server/test_indexing_tarballs.py | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/lib/pbench/test/unit/server/test_indexing_tarballs.py b/lib/pbench/test/unit/server/test_indexing_tarballs.py index 789987d47e..23e91dab6c 100644 --- a/lib/pbench/test/unit/server/test_indexing_tarballs.py +++ b/lib/pbench/test/unit/server/test_indexing_tarballs.py @@ -267,22 +267,25 @@ def error(self, dataset: Dataset, message: str): class FakeLockRef: def __init__(self, lock: Path): - """Initialize a lock reference - - The lock file is opened in "w+" mode, which is "write update": unlike - "r+", this creates the file if it doesn't already exist, but still - allows lock conversion between LOCK_EX and LOCK_SH. + """Initialize a mocked lock reference Args: lock: the path of a lock file - exclusive: lock for exclusive access - wait: [default] wait for lock """ self.locked = False self.exclusive = False self.unlock = True def acquire(self, exclusive: bool = False, wait: bool = True) -> "FakeLockRef": + """Acquire the lock + + Args: + exclusive: lock for exclusive access + wait: [default] wait for lock + + Returns: + self reference so acquire can be chained with constructor + """ self.locked = True self.exclusive = exclusive return self @@ -293,10 +296,12 @@ def release(self): self.exclusive = False def upgrade(self): + """Upgrade a shared lock to exclusive""" if not self.exclusive: self.exclusive = True def downgrade(self): + """Downgrade an exclusive lock to shared""" if self.exclusive: self.exclusive = False From e11ff4719d21b74f199517ee7320e8d7c09d6498 Mon Sep 17 00:00:00 2001 From: David Butenhof Date: Fri, 29 Sep 2023 09:41:39 -0400 Subject: [PATCH 10/14] Hack to fix functional tests I can't figure out why the default `ubi9` container configuration + EPEL is no longer finding `rsyslog-mmjsonparse`. I've found no relevant hits on searches nor any obvious workaround. For now, try changing `Pipeline.gy` to override the default `BASE_IMAGE` and use `centos:stream9` instead. --- jenkins/Pipeline.gy | 3 +++ 1 file changed, 3 insertions(+) diff --git a/jenkins/Pipeline.gy b/jenkins/Pipeline.gy index 43448e1fa8..7eff6fe7fa 100644 --- a/jenkins/Pipeline.gy +++ b/jenkins/Pipeline.gy @@ -19,6 +19,9 @@ pipeline { // otherwise we set it to the branch name (e.g., `main`). PB_IMAGE_TAG="${env.CHANGE_ID ?: env.BRANCH_NAME}" PB_SERVER_IMAGE_NAME="pbench-server" + // FIXME: ubi9 doesn't seem to have rsyslog-mmjsonparse available, so + // temporarily switch to centos:stream9 + BASE_IMAGE="quay.io/centos/centos:stream9" } stages { stage('Agent Python3.6 Check') { From f8f3bc57ec88ebd39be79e598a2eb9bec34d4f5d Mon Sep 17 00:00:00 2001 From: David Butenhof Date: Fri, 29 Sep 2023 16:43:18 -0400 Subject: [PATCH 11/14] Revert BASE_IMAGE hack... --- jenkins/Pipeline.gy | 3 --- 1 file changed, 3 deletions(-) diff --git a/jenkins/Pipeline.gy b/jenkins/Pipeline.gy index 7eff6fe7fa..43448e1fa8 100644 --- a/jenkins/Pipeline.gy +++ b/jenkins/Pipeline.gy @@ -19,9 +19,6 @@ pipeline { // otherwise we set it to the branch name (e.g., `main`). PB_IMAGE_TAG="${env.CHANGE_ID ?: env.BRANCH_NAME}" PB_SERVER_IMAGE_NAME="pbench-server" - // FIXME: ubi9 doesn't seem to have rsyslog-mmjsonparse available, so - // temporarily switch to centos:stream9 - BASE_IMAGE="quay.io/centos/centos:stream9" } stages { stage('Agent Python3.6 Check') { From ae00530b6dfe8dc3302a7945c94e9f45484dde13 Mon Sep 17 00:00:00 2001 From: David Butenhof Date: Mon, 2 Oct 2023 16:51:49 -0400 Subject: [PATCH 12/14] Some refactoring 1. Fix Inventory.close() to always close the stream. 2. Make cache load more transparent by upgrading lock if we need to unpack. --- lib/pbench/server/cache_manager.py | 61 ++++++--- lib/pbench/server/indexing_tarballs.py | 5 +- .../test/unit/server/test_cache_manager.py | 126 ++++++++++-------- .../unit/server/test_indexing_tarballs.py | 5 +- 4 files changed, 114 insertions(+), 83 deletions(-) diff --git a/lib/pbench/server/cache_manager.py b/lib/pbench/server/cache_manager.py index 34d387dc13..be6676034b 100644 --- a/lib/pbench/server/cache_manager.py +++ b/lib/pbench/server/cache_manager.py @@ -334,16 +334,22 @@ def close(self): while self.subproc.stderr.read(4096): pass self.subproc.wait(60.0) - finally: + except Exception as e: # Release our reference to the subprocess.Popen object so that the # object can be reclaimed. self.subproc = None + exception = e if self.lock: try: self.lock.release() except Exception as e: exception = e self.stream.close() + + # NOTE: if both subprocess cleanup and unlock fail with exceptions, we + # raise the latter, and the former will be ignored. In practice, that's + # not a problem as we only construct an Inventory with a subprocess + # reference for extract, which doesn't need to lock a cache directory. if exception: raise exception @@ -585,8 +591,10 @@ def create( return cls(destination, resource_id, controller) def cache_map(self, dir_path: Path): - """Builds Hierarchy structure of a Directory in a Dictionary - Format. + """Build hierarchical representation of results tree + + NOTE: this structure isn't removed when we release the cache, as the + data remains valid. Args: dir_path: root directory @@ -614,8 +622,7 @@ def cache_map(self, dir_path: Path): @staticmethod def traverse_cmap(path: Path, cachemap: CacheMap) -> CacheMapEntry: - """Sequentially traverses the cachemap to find the leaf of a - relative path reference + """Locate a path in the cache map Args: path: relative path of the subdirectory/file @@ -648,11 +655,10 @@ def traverse_cmap(path: Path, cachemap: CacheMap) -> CacheMapEntry: def get_info(self, path: Path) -> JSONOBJECT: """Returns the details of the given file/directory in dict format - NOTE: This requires a call to the cache_map method to build a map that - can be traversed. Currently, this is done only on unpack, and isn't - useful except within the `pbench-index` process. This map needs to - either be built dynamically (potentially expensive) or persisted in - SQL or perhaps Redis. + NOTE: If the cache manager doesn't already have a cache map for the + current Tarball, we'll unpack it here; however as the cache map isn't + dependent on the unpacked results tree, we immediately release the + cache lock. Args: path: path of the file/subdirectory @@ -681,6 +687,10 @@ def get_info(self, path: Path) -> JSONOBJECT: " we expect relative path to the root directory." ) + if not self.cachemap: + with LockManager(self.lock) as lock: + self.get_results(lock) + c_map = self.traverse_cmap(path, self.cachemap) children = c_map["children"] if "children" in c_map else {} fd_info = c_map["details"].__dict__.copy() @@ -704,6 +714,11 @@ def get_info(self, path: Path) -> JSONOBJECT: def extract(tarball_path: Path, path: str) -> Inventory: """Returns a file stream for a file within a tarball + NOTE: This should generally be used only within the INTAKE path before + it's practical to load the cache with a fully unpacked tree. The + extracted file is not cached and therefore each reference will repeat + the tar file extraction. + Args: tarball_path: absolute path of the tarball path: relative path within the tarball @@ -814,15 +829,9 @@ def stream(self, path: str) -> Inventory: """ with LockManager(self.lock) as lock: - if not self.unpacked: - lock.upgrade() - self.cache_create() - lock.downgrade() - - artifact: Path = self.unpacked / path + artifact: Path = self.get_results(lock) / path if not artifact.is_file(): raise CacheExtractBadPath(self.tarball_path, path) - self.last_ref.touch(exist_ok=True) return Inventory(artifact.open("rb"), lock=lock.keep()) def get_inventory(self, path: str) -> Optional[JSONOBJECT]: @@ -906,16 +915,22 @@ def subprocess_run( f"{cmd[0]} exited with status {process.returncode}: {process.stderr.strip()!r}", ) - def cache_create(self): + def get_results(self, lock: Union[LockManager, LockRef]) -> Path: """Unpack a tarball into a temporary directory tree - Unpack the tarball into a temporary cache directory named with the - tarball's resource_id (MD5). + Make sure that the dataset results are unpacked into a cache tree. The + location of the unpacked tree is in self.unpacked and is also returned + direct to the caller. + + Args: + lock: A lock reference (or context manager) in shared lock state - This method must be called while holding an exclusive cache lock. + Returns: + the root Path of the unpacked directory tree """ if not self.unpacked: + lock.upgrade() audit = None error = None try: @@ -944,7 +959,6 @@ def cache_create(self): find_command, self.cache, TarballModeChangeError, self.cache ) self.unpacked = self.cache / self.name - self.last_ref.touch(exist_ok=True) self.cache_map(self.unpacked) except Exception as e: error = str(e) @@ -957,6 +971,9 @@ def cache_create(self): status=AuditStatus.FAILURE if error else AuditStatus.SUCCESS, attributes=attributes, ) + lock.downgrade() + self.last_ref.touch(exist_ok=True) + return self.unpacked def cache_delete(self): """Remove the unpacked tarball directory and all contents. diff --git a/lib/pbench/server/indexing_tarballs.py b/lib/pbench/server/indexing_tarballs.py index 436c0dee00..a520106765 100644 --- a/lib/pbench/server/indexing_tarballs.py +++ b/lib/pbench/server/indexing_tarballs.py @@ -370,9 +370,8 @@ def sighup_handler(*args): tarobj = self.cache_manager.find_dataset( dataset.resource_id ) - lock = LockRef(tarobj.lock).acquire(exclusive=True) - tarobj.cache_create() - lock.downgrade() + lock = LockRef(tarobj.lock).acquire() + tarobj.get_results(lock) except Exception as e: self.sync.error( dataset, diff --git a/lib/pbench/test/unit/server/test_cache_manager.py b/lib/pbench/test/unit/server/test_cache_manager.py index 6c04431011..c4948aefce 100644 --- a/lib/pbench/test/unit/server/test_cache_manager.py +++ b/lib/pbench/test/unit/server/test_cache_manager.py @@ -86,6 +86,35 @@ def fake_get_metadata(_tb_path): ) +class FakeLockRef: + operations = [] + + def __init__(self, lockpath: Path): + self.lock = lockpath + + def acquire(self, **kwargs): + self.operations.append( + ( + "acquire", + kwargs.get("exclusive", False), + kwargs.get("wait", True), + ) + ) + + def release(self): + self.operations.append(("release")) + + def upgrade(self): + self.operations.append(("upgrade")) + + def downgrade(self): + self.operations.append(("downgrade")) + + @classmethod + def reset(cls): + cls.operations.clear() + + class TestCacheManager: def test_create(self, server_config, make_logger): """ @@ -547,7 +576,8 @@ def mock_run(command, _dir_path, exception, dir_p): ) msg = f"An error occurred while unpacking {tar}: Command 'tar' timed out after 43 seconds" with pytest.raises(TarballUnpackError, match=msg): - tb.cache_create() + tb.get_results(FakeLockRef(tb.lock)) + FakeLockRef.reset() def test_unpack_find_subprocess_exception( self, make_logger, db_session, monkeypatch @@ -588,9 +618,10 @@ def mock_run(command, _dir_path, exception, dir_p): msg = "An error occurred while changing file permissions of " msg += f"{cache / 'ABC'}: Command 'find' timed out after 43 seconds" with pytest.raises(TarballModeChangeError, match=msg) as exc: - tb.cache_create() + tb.get_results(FakeLockRef(tb.lock)) assert str(exc.value) == msg assert rmtree_called + FakeLockRef.reset() def test_unpack_success(self, make_logger, db_session, monkeypatch): """Test to check the unpacking functionality of the CacheManager""" @@ -622,9 +653,10 @@ def mock_resolve(_path, _strict=False): tb = Tarball( tar, "ABC", Controller(Path("/mock/archive"), cache, make_logger) ) - tb.cache_create() + tb.get_results(FakeLockRef(tb.lock)) assert call == ["tar", "find"] assert tb.unpacked == cache / "ABC" / tb.name + FakeLockRef.reset() def test_cache_map_success(self, make_logger, monkeypatch, tmp_path): """Test to build the cache map of the root directory""" @@ -996,7 +1028,7 @@ def test_get_inventory( the tarball, based on "is_unpacked", to be sure both cases are covered; but be aware that accessing the tarball itself (the "" and None file_path cases) don't ever unpack and should always have is_unpacked - set to True. (That is, we don't expect cache_create to be called in + set to True. (That is, we don't expect get_results to be called in those cases.) """ archive = tmp_path / "mock" / "archive" / "ABC" @@ -1006,18 +1038,19 @@ def test_get_inventory( cache = tmp_path / "mock" / ".cache" unpacked = cache / "ABC" / "dir_name" - create_called = False + get_results_called = False - def fake_create(self): - nonlocal create_called - create_called = True + def fake_results(self, lock: LockRef) -> Path: + nonlocal get_results_called + get_results_called = True self.unpacked = unpacked + return self.unpacked with monkeypatch.context() as m: m.setattr(Tarball, "__init__", TestCacheManager.MockTarball.__init__) m.setattr(Controller, "__init__", TestCacheManager.MockController.__init__) tb = Tarball(tar, "ABC", Controller(archive, cache, make_logger)) - m.setattr(Tarball, "cache_create", fake_create) + m.setattr(Tarball, "get_results", fake_results) if is_unpacked: tb.unpacked = unpacked TestCacheManager.MockController.generate_test_result_tree( @@ -1033,7 +1066,6 @@ def fake_create(self): stream: Inventory = file_info["stream"] assert stream.stream.read() == exp_stream stream.close() - assert create_called is not is_unpacked def test_cm_inventory(self, monkeypatch, server_config, make_logger): """Verify the happy path of the high level get_inventory""" @@ -1369,7 +1401,7 @@ def seek(self, offset: int, _whence: int = io.SEEK_SET) -> int: ), # The subprocess is still running when close() is called, stdout is # empty, there is no stderr, and the wait times out. - (None, 0, None, True, ["poll", "kill", "stdout", "wait"]), + (None, 0, None, True, ["poll", "kill", "stdout", "wait", "close"]), ), ) def test_inventory( @@ -1525,10 +1557,11 @@ def test_find( assert exc.value.tarball == "foobar" # Unpack the dataset, creating INCOMING and RESULTS links - tarball.cache_create() + tarball.get_results(FakeLockRef(tarball.lock)) assert tarball.cache == controller.cache / md5 assert tarball.unpacked == controller.cache / md5 / tarball.name assert tarball.last_ref.exists() + FakeLockRef.reset() audits = Audit.query(name="cache") assert len(audits) == 2 @@ -1613,10 +1646,11 @@ def test_lifecycle( # Now "unpack" the tarball and check that the incoming directory and # results link are set up. - dataset.cache_create() + dataset.get_results(FakeLockRef(dataset.lock)) assert cache == dataset.cache assert cache.is_dir() assert unpack.is_dir() + FakeLockRef.reset() assert dataset.unpacked == unpack @@ -1685,8 +1719,11 @@ def test_compatibility( t2 = cm1[id] assert t1.name == t2.name == Dataset.stem(source_tarball) - t1.cache_create() - t2.cache_create() + r1 = t1.get_results(FakeLockRef(t1.lock)) + assert r1 == t1.unpacked + r2 = t2.get_results(FakeLockRef(t2.lock)) + assert r2 == t2.unpacked + FakeLockRef.reset() assert t1.unpacked != t2.unpacked assert (t1.unpacked / "metadata.log").is_file() @@ -1857,33 +1894,6 @@ def fake_open(self, flags: str) -> FakeStream: def test_lockmanager(self, monkeypatch): """Test LockManager""" - operations = [] - - class FakeLockRef: - def __init__(self, lockpath: Path): - self.lock = lockpath - - def acquire(self, **kwargs): - operations.append( - ( - "acquire", - kwargs.get("exclusive", False), - kwargs.get("wait", True), - ) - ) - - def release(self): - operations.append(("release")) - - def upgrade(self): - operations.append(("upgrade")) - - def downgrade(self): - operations.append(("downgrade")) - - def reset(): - operations.clear() - lockfile = Path("/lock") monkeypatch.setattr("pbench.server.cache_manager.LockRef", FakeLockRef) # default parameters @@ -1891,12 +1901,16 @@ def reset(): assert lock.unlock assert not lock.exclusive assert lock.wait - assert operations == [("acquire", False, True)] + assert FakeLockRef.operations == [("acquire", False, True)] lock.upgrade() - assert operations == [("acquire", False, True), ("upgrade")] + assert FakeLockRef.operations == [("acquire", False, True), ("upgrade")] lock.downgrade() - assert operations == [("acquire", False, True), ("upgrade"), ("downgrade")] - assert operations == [ + assert FakeLockRef.operations == [ + ("acquire", False, True), + ("upgrade"), + ("downgrade"), + ] + assert FakeLockRef.operations == [ ("acquire", False, True), ("upgrade"), ("downgrade"), @@ -1904,30 +1918,30 @@ def reset(): ] # exclusive - reset() + FakeLockRef.reset() with LockManager(lockfile, exclusive=True) as lock: assert lock.unlock assert lock.exclusive assert lock.wait - assert operations == [("acquire", True, True)] - assert operations == [("acquire", True, True), ("release")] + assert FakeLockRef.operations == [("acquire", True, True)] + assert FakeLockRef.operations == [("acquire", True, True), ("release")] # no-wait - reset() + FakeLockRef.reset() with LockManager(lockfile, wait=False) as lock: assert lock.unlock assert not lock.exclusive assert not lock.wait - assert operations == [("acquire", False, False)] - assert operations == [("acquire", False, False), ("release")] + assert FakeLockRef.operations == [("acquire", False, False)] + assert FakeLockRef.operations == [("acquire", False, False), ("release")] # keep' option - reset() + FakeLockRef.reset() with LockManager(lockfile) as lock: assert lock.keep() is lock still_locked = lock assert not lock.unlock - assert operations == [("acquire", False, True)] - assert operations == [("acquire", False, True)] + assert FakeLockRef.operations == [("acquire", False, True)] + assert FakeLockRef.operations == [("acquire", False, True)] still_locked.release() - assert operations == [("acquire", False, True), ("release")] + assert FakeLockRef.operations == [("acquire", False, True), ("release")] diff --git a/lib/pbench/test/unit/server/test_indexing_tarballs.py b/lib/pbench/test/unit/server/test_indexing_tarballs.py index 23e91dab6c..ee96f11b86 100644 --- a/lib/pbench/test/unit/server/test_indexing_tarballs.py +++ b/lib/pbench/test/unit/server/test_indexing_tarballs.py @@ -5,7 +5,7 @@ from pathlib import Path from signal import SIGHUP import time -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Union import pytest @@ -16,6 +16,7 @@ OperationCode, PbenchServerConfig, ) +from pbench.server.cache_manager import LockManager, LockRef from pbench.server.database.models.audit import AuditStatus from pbench.server.database.models.datasets import ( Dataset, @@ -325,7 +326,7 @@ def __init__(self, path: Path, resource_id: str, controller: FakeController): self.unpacked = self.cache / self.name self.isolator = controller.path / resource_id - def cache_create(self): + def get_results(self, lock: Union[LockRef, LockManager]): pass From 82b1097ebd4a157d772bdc3af722930ae57f6d4c Mon Sep 17 00:00:00 2001 From: David Butenhof Date: Mon, 9 Oct 2023 17:40:38 -0400 Subject: [PATCH 13/14] More cleanup --- lib/pbench/cli/server/tree_manage.py | 7 +- lib/pbench/server/cache_manager.py | 19 +++- lib/pbench/server/indexing_tarballs.py | 99 +++++++++---------- .../test/unit/server/test_cache_manager.py | 29 +++--- .../unit/server/test_indexing_tarballs.py | 32 +++--- 5 files changed, 96 insertions(+), 90 deletions(-) diff --git a/lib/pbench/cli/server/tree_manage.py b/lib/pbench/cli/server/tree_manage.py index e363b8dbde..e70fb93b97 100644 --- a/lib/pbench/cli/server/tree_manage.py +++ b/lib/pbench/cli/server/tree_manage.py @@ -65,11 +65,8 @@ def reclaim_cache(tree: CacheManager, logger: Logger, lifetime: float = CACHE_LI tarball.name, e, ) - try: - tarball.cache_delete() - reclaimed += 1 - except Exception as e: - error = e + tarball.cache_delete() + reclaimed += 1 except OSError as e: if e.errno in (errno.EAGAIN, errno.EACCES): logger.info( diff --git a/lib/pbench/server/cache_manager.py b/lib/pbench/server/cache_manager.py index be6676034b..20d4fd2d02 100644 --- a/lib/pbench/server/cache_manager.py +++ b/lib/pbench/server/cache_manager.py @@ -205,6 +205,10 @@ def acquire(self, exclusive: bool = False, wait: bool = True) -> "LockRef": exclusive: lock for exclusive access wait: wait for lock + Raises: + OSError (EAGAIN or EACCES): wait=False and the lock is already + owned + Returns: The lockref, so this can be chained with the constructor """ @@ -264,13 +268,22 @@ def release(self): def upgrade(self): """Upgrade a shared lock to exclusive""" self.lock.upgrade() + self.exclusive = True def downgrade(self): """Downgrade an exclusive lock to shared""" self.lock.downgrade() + self.exclusive = False def __enter__(self) -> "LockManager": - """Enter a lock context manager by acquiring the lock""" + """Enter a lock context manager by acquiring the lock + + Raises: + OSError: self.wait is False, and the lock is already owned. + + Returns: + the LockManager object + """ self.lock.acquire(exclusive=self.exclusive, wait=self.wait) return self @@ -915,7 +928,7 @@ def subprocess_run( f"{cmd[0]} exited with status {process.returncode}: {process.stderr.strip()!r}", ) - def get_results(self, lock: Union[LockManager, LockRef]) -> Path: + def get_results(self, lock: LockManager) -> Path: """Unpack a tarball into a temporary directory tree Make sure that the dataset results are unpacked into a cache tree. The @@ -923,7 +936,7 @@ def get_results(self, lock: Union[LockManager, LockRef]) -> Path: direct to the caller. Args: - lock: A lock reference (or context manager) in shared lock state + lock: A lock context manager in shared lock state Returns: the root Path of the unpacked directory tree diff --git a/lib/pbench/server/indexing_tarballs.py b/lib/pbench/server/indexing_tarballs.py index a520106765..9f33d1c104 100644 --- a/lib/pbench/server/indexing_tarballs.py +++ b/lib/pbench/server/indexing_tarballs.py @@ -15,7 +15,7 @@ UnsupportedTarballFormat, ) from pbench.server import OperationCode, tstos -from pbench.server.cache_manager import CacheManager, LockRef, Tarball +from pbench.server.cache_manager import CacheManager, LockManager, Tarball from pbench.server.database.models.audit import Audit, AuditStatus from pbench.server.database.models.datasets import ( Dataset, @@ -363,68 +363,65 @@ def sighup_handler(*args): ptb = None tarobj: Optional[Tarball] = None tb_res = error_code["OK"] - lock = None try: # We need the fully unpacked cache tree to index it try: tarobj = self.cache_manager.find_dataset( dataset.resource_id ) - lock = LockRef(tarobj.lock).acquire() - tarobj.get_results(lock) except Exception as e: self.sync.error( dataset, - f"Unable to unpack dataset: {e!s}", + f"Unable to find dataset: {e!s}", ) - if lock: - lock.release() continue - audit = Audit.create( - operation=OperationCode.UPDATE, - name="index", - status=AuditStatus.BEGIN, - user_name=Audit.BACKGROUND_USER, - dataset=dataset, - ) + with LockManager(tarobj.lock) as lock: + tarobj.get_results(lock) + audit = Audit.create( + operation=OperationCode.UPDATE, + name="index", + status=AuditStatus.BEGIN, + user_name=Audit.BACKGROUND_USER, + dataset=dataset, + ) - # "Open" the tar ball represented by the tar ball object - idxctx.logger.debug("open tar ball") - ptb = PbenchTarBall(idxctx, dataset, tmpdir, tarobj) - - # Construct the generator for emitting all actions. - # The `idxctx` dictionary is passed along to each - # generator so that it can add its context for - # error handling to the list. - idxctx.logger.debug("generator setup") - if self.options.index_tool_data: - actions = ptb.mk_tool_data_actions() - else: - actions = ptb.make_all_actions() - - # Create a file where the pyesbulk package will - # record all indexing errors that can't/won't be - # retried. - with ie_filepath.open(mode="w") as fp: - idxctx.logger.debug("begin indexing") - try: - signal.signal(signal.SIGINT, sigint_handler) - es_res = es_index( - idxctx.es, - actions, - fp, - idxctx.logger, - idxctx._dbg, - ) - except SigIntException: - idxctx.logger.exception( - "Indexing interrupted by SIGINT, continuing to next tarball" - ) - continue - finally: - # Turn off the SIGINT handler when not indexing. - signal.signal(signal.SIGINT, signal.SIG_IGN) + # "Open" the tar ball represented by the tar ball object + idxctx.logger.debug("open tar ball") + ptb = PbenchTarBall(idxctx, dataset, tmpdir, tarobj) + + # Construct the generator for emitting all actions. + # The `idxctx` dictionary is passed along to each + # generator so that it can add its context for + # error handling to the list. + idxctx.logger.debug("generator setup") + if self.options.index_tool_data: + actions = ptb.mk_tool_data_actions() + else: + actions = ptb.make_all_actions() + + # Create a file where the pyesbulk package will + # record all indexing errors that can't/won't be + # retried. + with ie_filepath.open(mode="w") as fp: + idxctx.logger.debug("begin indexing") + try: + signal.signal(signal.SIGINT, sigint_handler) + es_res = es_index( + idxctx.es, + actions, + fp, + idxctx.logger, + idxctx._dbg, + ) + except SigIntException: + idxctx.logger.exception( + "Indexing interrupted by SIGINT, continuing to next tarball" + ) + continue + finally: + # Turn off the SIGINT handler when not indexing. + signal.signal(signal.SIGINT, signal.SIG_IGN) except UnsupportedTarballFormat as e: tb_res = self.emit_error( idxctx.logger.warning, "TB_META_ABSENT", e @@ -512,8 +509,6 @@ def sighup_handler(*args): Audit.create( root=audit, status=doneness, attributes=attributes ) - if lock: - lock.release() try: ie_len = ie_filepath.stat().st_size except FileNotFoundError: diff --git a/lib/pbench/test/unit/server/test_cache_manager.py b/lib/pbench/test/unit/server/test_cache_manager.py index c4948aefce..1e3bb0f639 100644 --- a/lib/pbench/test/unit/server/test_cache_manager.py +++ b/lib/pbench/test/unit/server/test_cache_manager.py @@ -92,14 +92,9 @@ class FakeLockRef: def __init__(self, lockpath: Path): self.lock = lockpath - def acquire(self, **kwargs): - self.operations.append( - ( - "acquire", - kwargs.get("exclusive", False), - kwargs.get("wait", True), - ) - ) + def acquire(self, exclusive: bool = False, wait: bool = True): + self.operations.append(("acquire", exclusive, wait)) + return self def release(self): self.operations.append(("release")) @@ -1721,6 +1716,7 @@ def test_compatibility( r1 = t1.get_results(FakeLockRef(t1.lock)) assert r1 == t1.unpacked + FakeLockRef.reset() r2 = t2.get_results(FakeLockRef(t2.lock)) assert r2 == t2.unpacked FakeLockRef.reset() @@ -1819,6 +1815,17 @@ def fake_open(self, flags: str) -> FakeStream: ] assert files == [("open", lockfile, "w+"), ("close", lockfile)] + # No-wait failure + reset() + lock = LockRef(lockfile) + assert files == [("open", lockfile, "w+")] + lockf_fail = OSError(errno.EAGAIN, "Loser") + with pytest.raises(OSError, match="Loser"): + lock.acquire(wait=False) + assert not lock.exclusive + assert not lock.locked + assert locks == [(lockfile, fcntl.LOCK_SH | fcntl.LOCK_NB)] + # Exclusive reset() lock = LockRef(lockfile) @@ -1935,11 +1942,11 @@ def test_lockmanager(self, monkeypatch): assert FakeLockRef.operations == [("acquire", False, False)] assert FakeLockRef.operations == [("acquire", False, False), ("release")] - # keep' option + # keep option FakeLockRef.reset() with LockManager(lockfile) as lock: - assert lock.keep() is lock - still_locked = lock + still_locked = lock.keep() + assert still_locked is lock assert not lock.unlock assert FakeLockRef.operations == [("acquire", False, True)] assert FakeLockRef.operations == [("acquire", False, True)] diff --git a/lib/pbench/test/unit/server/test_indexing_tarballs.py b/lib/pbench/test/unit/server/test_indexing_tarballs.py index ee96f11b86..3eccfc6b7e 100644 --- a/lib/pbench/test/unit/server/test_indexing_tarballs.py +++ b/lib/pbench/test/unit/server/test_indexing_tarballs.py @@ -5,7 +5,7 @@ from pathlib import Path from signal import SIGHUP import time -from typing import Any, Dict, List, Optional, Union +from typing import Any, Dict, List, Optional import pytest @@ -16,7 +16,7 @@ OperationCode, PbenchServerConfig, ) -from pbench.server.cache_manager import LockManager, LockRef +from pbench.server.cache_manager import LockManager from pbench.server.database.models.audit import AuditStatus from pbench.server.database.models.datasets import ( Dataset, @@ -266,34 +266,28 @@ def error(self, dataset: Dataset, message: str): __class__.errors[dataset.name] = message -class FakeLockRef: - def __init__(self, lock: Path): +class FakeLockManager: + def __init__(self, lock: Path, exclusive: bool = False, wait: bool = True): """Initialize a mocked lock reference Args: lock: the path of a lock file + exclusive: lock exclusively + wait: wait for lock """ - self.locked = False - self.exclusive = False - self.unlock = True + self.exclusive = exclusive + self.wait = wait - def acquire(self, exclusive: bool = False, wait: bool = True) -> "FakeLockRef": + def __enter__(self) -> "FakeLockManager": """Acquire the lock - Args: - exclusive: lock for exclusive access - wait: [default] wait for lock - Returns: - self reference so acquire can be chained with constructor + self reference for 'as' clause """ - self.locked = True - self.exclusive = exclusive return self - def release(self): + def __exit__(self, *exc): """Release the lock and close the lock file""" - self.locked = False self.exclusive = False def upgrade(self): @@ -326,7 +320,7 @@ def __init__(self, path: Path, resource_id: str, controller: FakeController): self.unpacked = self.cache / self.name self.isolator = controller.path / resource_id - def get_results(self, lock: Union[LockRef, LockManager]): + def get_results(self, lock: LockManager): pass @@ -387,7 +381,7 @@ def mocks(monkeypatch, make_logger): m.setattr("pbench.server.indexing_tarballs.Metadata", FakeMetadata) m.setattr("pbench.server.indexing_tarballs.IndexMap", FakeIndexMap) m.setattr("pbench.server.indexing_tarballs.CacheManager", FakeCacheManager) - m.setattr("pbench.server.indexing_tarballs.LockRef", FakeLockRef) + m.setattr("pbench.server.indexing_tarballs.LockManager", FakeLockManager) m.setattr("pbench.server.indexing_tarballs.Audit", FakeAudit) yield m FakeAudit.reset() From 3063126a44ec718a5ce4bd00c4cd30458fc73f5c Mon Sep 17 00:00:00 2001 From: David Butenhof Date: Tue, 10 Oct 2023 15:35:35 -0400 Subject: [PATCH 14/14] Removed unused mock variable --- lib/pbench/test/unit/server/test_cache_manager.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/lib/pbench/test/unit/server/test_cache_manager.py b/lib/pbench/test/unit/server/test_cache_manager.py index 1e3bb0f639..df3e31a45a 100644 --- a/lib/pbench/test/unit/server/test_cache_manager.py +++ b/lib/pbench/test/unit/server/test_cache_manager.py @@ -1033,11 +1033,7 @@ def test_get_inventory( cache = tmp_path / "mock" / ".cache" unpacked = cache / "ABC" / "dir_name" - get_results_called = False - def fake_results(self, lock: LockRef) -> Path: - nonlocal get_results_called - get_results_called = True self.unpacked = unpacked return self.unpacked