Skip to content

Commit

Permalink
Some refactoring
Browse files Browse the repository at this point in the history
1. Fix Inventory.close() to always close the stream.
2. Make cache load more transparent by upgrading lock if we need to unpack.
  • Loading branch information
dbutenhof committed Oct 2, 2023
1 parent 918bb0e commit 73d9688
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 83 deletions.
61 changes: 39 additions & 22 deletions lib/pbench/server/cache_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,16 +334,22 @@ def close(self):
while self.subproc.stderr.read(4096):
pass
self.subproc.wait(60.0)
finally:
except Exception as e:
# Release our reference to the subprocess.Popen object so that the
# object can be reclaimed.
self.subproc = None
exception = e
if self.lock:
try:
self.lock.release()
except Exception as e:
exception = e
self.stream.close()

# NOTE: if both subprocess cleanup and unlock fail with exceptions, we
# raise the latter, and the former will be ignored. In practice, that's
# not a problem as we only construct an Inventory with a subprocess
# reference for extract, which doesn't need to lock a cache directory.
if exception:
raise exception

Expand Down Expand Up @@ -585,8 +591,10 @@ def create(
return cls(destination, resource_id, controller)

def cache_map(self, dir_path: Path):
"""Builds Hierarchy structure of a Directory in a Dictionary
Format.
"""Build hierarchical representation of results tree
NOTE: this structure isn't removed when we release the cache, as the
data remains valid.
Args:
dir_path: root directory
Expand Down Expand Up @@ -614,8 +622,7 @@ def cache_map(self, dir_path: Path):

@staticmethod
def traverse_cmap(path: Path, cachemap: CacheMap) -> CacheMapEntry:
"""Sequentially traverses the cachemap to find the leaf of a
relative path reference
"""Locate a path in the cache map
Args:
path: relative path of the subdirectory/file
Expand Down Expand Up @@ -648,11 +655,10 @@ def traverse_cmap(path: Path, cachemap: CacheMap) -> CacheMapEntry:
def get_info(self, path: Path) -> JSONOBJECT:
"""Returns the details of the given file/directory in dict format
NOTE: This requires a call to the cache_map method to build a map that
can be traversed. Currently, this is done only on unpack, and isn't
useful except within the `pbench-index` process. This map needs to
either be built dynamically (potentially expensive) or persisted in
SQL or perhaps Redis.
NOTE: If the cache manager doesn't already have a cache map for the
current Tarball, we'll unpack it here; however as the cache map isn't
dependent on the unpacked results tree, we immediately release the
cache lock.
Args:
path: path of the file/subdirectory
Expand Down Expand Up @@ -681,6 +687,10 @@ def get_info(self, path: Path) -> JSONOBJECT:
" we expect relative path to the root directory."
)

if not self.cachemap:
with LockManager(self.lock) as lock:
self.get_results(lock)

c_map = self.traverse_cmap(path, self.cachemap)
children = c_map["children"] if "children" in c_map else {}
fd_info = c_map["details"].__dict__.copy()
Expand All @@ -704,6 +714,11 @@ def get_info(self, path: Path) -> JSONOBJECT:
def extract(tarball_path: Path, path: str) -> Inventory:
"""Returns a file stream for a file within a tarball
NOTE: This should generally be used only within the INTAKE path before
it's practical to load the cache with a fully unpacked tree. The
extracted file is not cached and therefore each reference will repeat
the tar file extraction.
Args:
tarball_path: absolute path of the tarball
path: relative path within the tarball
Expand Down Expand Up @@ -814,15 +829,9 @@ def stream(self, path: str) -> Inventory:
"""

with LockManager(self.lock) as lock:
if not self.unpacked:
lock.upgrade()
self.cache_create()
lock.downgrade()

artifact: Path = self.unpacked / path
artifact: Path = self.get_results(lock) / path
if not artifact.is_file():
raise CacheExtractBadPath(self.tarball_path, path)
self.last_ref.touch(exist_ok=True)
return Inventory(artifact.open("rb"), lock=lock.keep())

def get_inventory(self, path: str) -> Optional[JSONOBJECT]:
Expand Down Expand Up @@ -906,16 +915,22 @@ def subprocess_run(
f"{cmd[0]} exited with status {process.returncode}: {process.stderr.strip()!r}",
)

def cache_create(self):
def get_results(self, lock: Union[LockManager, LockRef]) -> Path:
"""Unpack a tarball into a temporary directory tree
Unpack the tarball into a temporary cache directory named with the
tarball's resource_id (MD5).
Make sure that the dataset results are unpacked into a cache tree. The
location of the unpacked tree is in self.unpacked and is also returned
direct to the caller.
Args:
lock: A lock reference (or context manager) in shared lock state
This method must be called while holding an exclusive cache lock.
Returns:
the root Path of the unpacked directory tree
"""

if not self.unpacked:
lock.upgrade()
audit = None
error = None
try:
Expand Down Expand Up @@ -944,7 +959,6 @@ def cache_create(self):
find_command, self.cache, TarballModeChangeError, self.cache
)
self.unpacked = self.cache / self.name
self.last_ref.touch(exist_ok=True)
self.cache_map(self.unpacked)
except Exception as e:
error = str(e)
Expand All @@ -957,6 +971,9 @@ def cache_create(self):
status=AuditStatus.FAILURE if error else AuditStatus.SUCCESS,
attributes=attributes,
)
lock.downgrade()
self.last_ref.touch(exist_ok=True)
return self.unpacked

def cache_delete(self):
"""Remove the unpacked tarball directory and all contents.
Expand Down
5 changes: 2 additions & 3 deletions lib/pbench/server/indexing_tarballs.py
Original file line number Diff line number Diff line change
Expand Up @@ -370,9 +370,8 @@ def sighup_handler(*args):
tarobj = self.cache_manager.find_dataset(
dataset.resource_id
)
lock = LockRef(tarobj.lock).acquire(exclusive=True)
tarobj.cache_create()
lock.downgrade()
lock = LockRef(tarobj.lock).acquire()
tarobj.get_results(lock)
except Exception as e:
self.sync.error(
dataset,
Expand Down
Loading

0 comments on commit 73d9688

Please sign in to comment.