Quick cache manager
PBENCH-1249

On large datasets, our direct tarball extraction method can time out the API
call. Unlike a long intake, there is no persistent artifact, so a retry will
always time out as well. This applies to any `get_inventory` call, and
therefore to the `/inventory`, `/visualize`, and `/compare` APIs; and given
the central importance of those APIs for our Server 1.0 story, that's not an
acceptable failure mode.

This PR mitigates that problem with a "compromise" partial cache manager,
leveraging the existing `unpack` method but adding a file lock to manage
shared access. The idea is that any consumer of tarball contents (including
the indexer) will unpack the entire tarball, but leave a "last reference"
timestamp. A periodic timer service will check the cache unpack timestamps,
and delete the unpack directories which aren't currently locked and which
haven't been referenced for longer than a set time period.
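
The reclaim service itself is not in this commit (see the note below); the
following is a minimal sketch of the intended behavior, assuming each cache
directory contains the `lock` and `last_ref` files described above and using a
made-up idle threshold:

```python
import fcntl
import shutil
import time
from pathlib import Path

CACHE_ROOT = Path("/srv/pbench/.cache")  # assumed cache root
MAX_IDLE_SECONDS = 4 * 60 * 60           # hypothetical reclaim threshold


def reclaim_stale_caches(
    cache_root: Path = CACHE_ROOT, max_idle: float = MAX_IDLE_SECONDS
):
    """Sketch of the periodic reclaimer described above (not in this commit).

    Delete unpacked cache contents that aren't locked and whose "last_ref"
    timestamp is older than the idle threshold.
    """
    now = time.time()
    for cache_dir in cache_root.iterdir():
        if not cache_dir.is_dir():
            continue
        lock_file = cache_dir / "lock"
        last_ref = cache_dir / "last_ref"
        if not lock_file.exists():
            continue  # never unpacked, nothing to reclaim
        if last_ref.exists() and now - last_ref.stat().st_mtime < max_idle:
            continue  # referenced too recently
        with lock_file.open("wb") as lock:
            try:
                # A non-blocking exclusive lock fails while any reader holds
                # the shared lock, so an in-use cache is never deleted.
                fcntl.lockf(lock, fcntl.LOCK_EX | fcntl.LOCK_NB)
            except OSError:
                continue
            try:
                for entry in cache_dir.iterdir():
                    if entry == lock_file:
                        continue  # keep the lock file we're holding
                    if entry.is_dir():
                        shutil.rmtree(entry, ignore_errors=True)
                    else:
                        entry.unlink()
            finally:
                fcntl.lockf(lock, fcntl.LOCK_UN)
```

The non-blocking exclusive lock is what prevents the reclaimer from removing
an unpack directory while a reader still holds the shared lock on it.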

__NOTE__: I'm posting this as a draft mostly to gather coverage data after a
lot of drift in the cache manager unit tests, and to determine whether more
work is necessary.
The "last reference" and reclaim mechanism isn't yet implemented, though that
should be the "easy part" now that I've got the server code working.
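
For context, a consumer of the new locking flow would use the cache manager
roughly as follows. This is an illustrative sketch only: the `config` and
`logger` arguments are whatever the server normally passes to `CacheManager`,
the resource ID and member path are placeholders, and it assumes `Inventory`
exposes `read()` as its byte-stream docstring suggests:

```python
from pbench.server.cache_manager import CacheManager


def read_member(config, logger, resource_id: str, member: str) -> bytes:
    """Hypothetical consumer: read one tarball member through the cache."""
    cache_m = CacheManager(config, logger)
    info = cache_m.get_inventory(resource_id, member)
    stream = info["stream"]  # an Inventory holding the shared cache lock
    try:
        # First access unpacks the whole tarball and touches "last_ref".
        return stream.read()
    finally:
        # Closing the Inventory releases the shared lock, so a future
        # reclaimer may eventually expire the unpacked cache.
        stream.close()
```

In the APIs themselves the `close()` happens in Flask's response handling, as
the new `Inventory` docstring notes, but the lock lifetime is the same: held
from `get_inventory` until the stream is closed.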
dbutenhof committed Sep 7, 2023
1 parent ee4e37d commit 1fe5660
Showing 5 changed files with 191 additions and 95 deletions.
9 changes: 7 additions & 2 deletions lib/pbench/server/api/resources/datasets_inventory.py
@@ -19,7 +19,12 @@
ParamType,
Schema,
)
from pbench.server.cache_manager import CacheManager, CacheType, TarballNotFound
from pbench.server.cache_manager import (
CacheExtractBadPath,
CacheManager,
CacheType,
TarballNotFound,
)


class DatasetsInventory(ApiBase):
@@ -63,7 +68,7 @@ def _get(
cache_m = CacheManager(self.config, current_app.logger)
try:
file_info = cache_m.get_inventory(dataset.resource_id, target)
except TarballNotFound as e:
except (TarballNotFound, CacheExtractBadPath) as e:
raise APIAbort(HTTPStatus.NOT_FOUND, str(e))

if file_info["type"] != CacheType.FILE:
138 changes: 83 additions & 55 deletions lib/pbench/server/cache_manager.py
@@ -1,6 +1,7 @@
from collections import deque
from dataclasses import dataclass
from enum import auto, Enum
import fcntl
from logging import Logger
from pathlib import Path
import shlex
@@ -193,25 +194,32 @@ def make_cache_object(dir_path: Path, path: Path) -> CacheObject:


class Inventory:
"""Encapsulate the file stream and subprocess.Popen object
"""Encapsulate the file stream and cache lock management
This encapsulation allows cleaner downstream handling, so that we can close
both the extracted file stream and the Popen object itself when we're done.
This eliminates interference with later operations.
the extracted file stream and unlock the dataset cache once the file
reference is complete. In APIs, the Inventory is closed by the Flask
infrastructure after response handling finishes.
"""

def __init__(self, stream: IO[bytes], subproc: Optional[subprocess.Popen] = None):
def __init__(
self,
stream: IO[bytes],
lock: Optional[IO[bytes]] = None,
subproc: Optional[subprocess.Popen] = None,
):
"""Construct an instance to track extracted inventory
This encapsulates many byte stream operations so that it can be used
as if it were a byte stream.
Args:
stream: the data stream of a specific tarball member
subproc: the subprocess producing the stream, if any
lock: the file reference for the cache lock file
"""
self.subproc = subproc
self.stream = stream
self.lock = lock
self.subproc = subproc

def close(self):
"""Close the byte stream and clean up the Popen object"""
@@ -236,11 +244,14 @@ def close(self):
# Release our reference to the subprocess.Popen object so that the
# object can be reclaimed.
self.subproc = None

# Explicitly close the stream, in case there never was an associated
# subprocess. (If there was an associated subprocess, the streams are
# now empty, and we'll assume that they are closed when the Popen
# object is reclaimed.)
if self.lock:
try:
fcntl.lockf(self.lock, fcntl.LOCK_UN)
self.lock.close()
finally:
# Release our reference to the lock file so that the
# object can be reclaimed.
self.lock = None
self.stream.close()

def getbuffer(self):
@@ -261,7 +272,7 @@ def seek(self, *args, **kwargs) -> int:

def __repr__(self) -> str:
"""Return a string representation"""
return f"<Stream {self.stream} from {self.subproc}>"
return f"<Stream {self.stream} from {self.lock.name if self.lock else self.subproc if self.subproc else None}>"

def __iter__(self):
"""Allow iterating through lines in the buffer"""
@@ -628,7 +639,7 @@ def extract(tarball_path: Path, path: str) -> Inventory:
# containing the extracted file to our caller, and return the Popen
# object so that we can clean it up when the Inventory object is closed.
if not tarproc.returncode:
return Inventory(tarproc.stdout, tarproc)
return Inventory(tarproc.stdout, subproc=tarproc)

# The tar command was invoked successfully (otherwise, the Popen()
# constructor would have raised an exception), but it exited with
@@ -646,6 +657,41 @@ def extract(tarball_path: Path, path: str) -> Inventory:
tarball_path, f"Unexpected error from {tar_path}: {error_text!r}"
)

def stream(self, path: str) -> Inventory:
"""Return a cached inventory file as a binary stream
Args:
path: Relative path of a regular file within a tarball.
Returns:
An Inventory object encapsulating the file stream and the cache
lock. The caller is responsible for closing the Inventory!
"""
self.unpack()

# NOTE: the cache is unlocked here. That's "probably OK" as the reclaim
# is based on the last modified timestamp of the lock file and only
# runs periodically.
#
# TODO: Can we convert the lock from EXCLUSIVE to SHARED? If so, that
# would be ideal.

lock = None
try:
lock = (self.cache / "lock").open("rb", buffering=0)
fcntl.lockf(lock, fcntl.LOCK_SH)
artifact: Path = self.unpacked / path
self.controller.logger.info("path {}: stat {}", artifact, artifact.exists())
if not artifact.is_file():
raise CacheExtractBadPath(self.tarball_path, path)
(self.cache / "last_ref").touch(exist_ok=True)
return Inventory(artifact.open("rb"), lock=lock)
except Exception as e:
if lock:
fcntl.lockf(lock, fcntl.LOCK_UN)
lock.close()
raise e

def get_inventory(self, path: str) -> Optional[JSONOBJECT]:
"""Access the file stream of a tarball member file.
@@ -659,12 +705,11 @@ def get_inventory(self, path: str) -> Optional[JSONOBJECT]:
info = {
"name": self.name,
"type": CacheType.FILE,
"stream": Inventory(self.tarball_path.open("rb"), None),
"stream": Inventory(self.tarball_path.open("rb")),
}
else:
file_path = f"{self.name}/{path}"
stream = Tarball.extract(self.tarball_path, file_path)
info = {"name": file_path, "type": CacheType.FILE, "stream": stream}
stream = self.stream(path)
info = {"name": path, "type": CacheType.FILE, "stream": stream}

return info

@@ -747,23 +792,25 @@ def unpack(self):
here and pass to the indexer (/srv/pbench/.cache/<resource_id>) and
the actual unpacked root (/srv/pbench/.cache/<resource_id>/<name>).
"""
self.cache.mkdir(parents=True)

try:
tar_command = "tar -x --no-same-owner --delay-directory-restore "
tar_command += f"--force-local --file='{str(self.tarball_path)}'"
self.subprocess_run(
tar_command, self.cache, TarballUnpackError, self.tarball_path
)

find_command = "find . ( -type d -exec chmod ugo+rx {} + ) -o ( -type f -exec chmod ugo+r {} + )"
self.subprocess_run(
find_command, self.cache, TarballModeChangeError, self.cache
)
except Exception:
shutil.rmtree(self.cache, ignore_errors=True)
raise
self.unpacked = self.cache / self.name
if not self.cache.exists():
self.cache.mkdir(exist_ok=True)
with (self.cache / "lock").open("wb", buffering=0) as lock:
fcntl.lockf(lock, fcntl.LOCK_EX)
try:
if not self.unpacked:
tar_command = "tar -x --no-same-owner --delay-directory-restore "
tar_command += f"--force-local --file='{str(self.tarball_path)}'"
self.subprocess_run(
tar_command, self.cache, TarballUnpackError, self.tarball_path
)
find_command = "find . ( -type d -exec chmod ugo+rx {} + ) -o ( -type f -exec chmod ugo+r {} + )"
self.subprocess_run(
find_command, self.cache, TarballModeChangeError, self.cache
)
self.unpacked = self.cache / self.name
finally:
fcntl.lockf(lock, fcntl.LOCK_UN)
self.cache_map(self.unpacked)

def uncache(self):
@@ -901,24 +948,6 @@ def create_tarball(self, tarfile_path: Path) -> Tarball:
self.tarballs[tarball.name] = tarball
return tarball

def unpack(self, dataset_id: str):
"""Unpack a tarball into a temporary cache directory.
Args:
dataset_id: Resource ID of the dataset to unpack
"""
tarball = self.datasets[dataset_id]
tarball.unpack()

def uncache(self, dataset_id: str):
"""Remove the cached unpack directory.
Args:
dataset_id: Resource ID of dataset to remove
"""
tarball = self.datasets[dataset_id]
tarball.uncache()

def delete(self, dataset_id: str):
"""Delete a dataset tarball and remove it from the controller
@@ -1209,7 +1238,7 @@ def unpack(self, dataset_id: str) -> Tarball:
The tarball object
"""
tarball = self.find_dataset(dataset_id)
tarball.controller.unpack(dataset_id)
tarball.unpack()
return tarball

def get_info(self, dataset_id: str, path: Path) -> dict[str, Any]:
Expand Down Expand Up @@ -1252,9 +1281,8 @@ def uncache(self, dataset_id: str):
dataset_id: Dataset resource ID to "uncache"
"""
tarball = self.find_dataset(dataset_id)
controller = tarball.controller
controller.uncache(dataset_id)
self._clean_empties(controller.name)
tarball.uncache()
self._clean_empties(tarball.controller.name)

def delete(self, dataset_id: str):
"""Delete the dataset as well as unpacked artifacts.