Skip to content

Commit

Permalink
Separate basic LockRef from context manager
Browse files Browse the repository at this point in the history
  • Loading branch information
dbutenhof committed Sep 27, 2023
1 parent f461896 commit cff2e5c
Show file tree
Hide file tree
Showing 5 changed files with 191 additions and 93 deletions.
4 changes: 2 additions & 2 deletions lib/pbench/cli/server/tree_manage.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from pbench.cli.server.options import common_options
from pbench.common.logger import get_pbench_logger
from pbench.server import BadConfig, OperationCode
from pbench.server.cache_manager import CacheManager, LockRef
from pbench.server.cache_manager import CacheManager, LockManager
from pbench.server.database.models.audit import Audit, AuditStatus, AuditType

# Length of time in hours to retain unreferenced cached results data.
Expand Down Expand Up @@ -48,7 +48,7 @@ def reclaim_cache(tree: CacheManager, logger: Logger, lifetime: float = CACHE_LI
window,
)
try:
with LockRef(tarball.lock, exclusive=True, wait=False):
with LockManager(tarball.lock, exclusive=True, wait=False):
try:
audit = Audit.create(
name="reclaim",
Expand Down
87 changes: 62 additions & 25 deletions lib/pbench/server/cache_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class MetadataError(CacheManagerError):

def __init__(self, tarball: Path, error: Exception):
super().__init__(
f"A problem occurred processing metadata.log from {tarball!s}: {str(error)!r}"
f"A problem occurred processing metadata.log from {tarball}: {str(error)!r}"
)
self.tarball = tarball
self.error = str(error)
Expand Down Expand Up @@ -184,7 +184,7 @@ def make_cache_object(dir_path: Path, path: Path) -> CacheObject:
class LockRef:
"""Keep track of a cache lock passed off to a caller"""

def __init__(self, lock: Path, exclusive: bool = False, wait: bool = True):
def __init__(self, lock: Path):
"""Initialize a lock reference
The lock file is opened in "w+" mode, which is "write update": unlike
Expand All @@ -193,20 +193,25 @@ def __init__(self, lock: Path, exclusive: bool = False, wait: bool = True):
Args:
lock: the path of a lock file
exclusive: lock for exclusive access
wait: wait for lock
"""
self.lock = lock.open("w+")
self.locked = False
self.exclusive = exclusive
self.unlock = True
self.wait = wait
self.exclusive = False

def acquire(self, exclusive: bool = False, wait: bool = True) -> "LockRef":
"""Acquire the lock
Args:
exclusive: lock for exclusive access
wait: wait for lock
def acquire(self) -> "LockRef":
"""Acquire the lock"""
Returns:
The lockref, so this can be chained with the constructor
"""

cmd = fcntl.LOCK_EX if self.exclusive else fcntl.LOCK_SH
if not self.wait:
self.exclusive = exclusive
cmd = fcntl.LOCK_EX if exclusive else fcntl.LOCK_SH
if not wait:
cmd |= fcntl.LOCK_NB
fcntl.lockf(self.lock, cmd)
self.locked = True
Expand All @@ -215,20 +220,17 @@ def acquire(self) -> "LockRef":
def release(self):
"""Release the lock and close the lock file"""

exception = None
if not self.locked:
return
try:
fcntl.lockf(self.lock, fcntl.LOCK_UN)
self.lock.close()
self.locked = False
self.exclusive = False
except Exception as e:
exception = e
finally:
# Release our reference to the lock file so that the
# object can be reclaimed.
self.locked = False
self.exclusive = False
self.lock = None
if exception:
raise exception

def upgrade(self):
"""Upgrade a shared lock to exclusive"""
Expand All @@ -242,19 +244,44 @@ def downgrade(self):
fcntl.lockf(self.lock, fcntl.LOCK_SH)
self.exclusive = False

def keep(self) -> "LockRef":

class LockManager:
def __init__(self, lock: Path, exclusive: bool = False, wait: bool = True):
self.lock = LockRef(lock)
self.exclusive = exclusive
self.wait = wait
self.unlock = True

def keep(self) -> "LockManager":
"""Tell the context manager not to unlock on exit"""
self.unlock = False
return self

def __enter__(self) -> "LockRef":
def release(self):
"""Release manually if necessary"""
self.lock.release()

def upgrade(self):
"""Upgrade a shared lock to exclusive"""
self.lock.upgrade()

def downgrade(self):
"""Downgrade an exclusive lock to shared"""
self.lock.downgrade()

def __enter__(self) -> "LockManager":
"""Enter a lock context manager by acquiring the lock"""
return self.acquire()
self.lock.acquire(exclusive=self.exclusive, wait=self.wait)
return self

def __exit__(self, *exc):
"""Exit a lock context manager by releasing the lock"""
"""Exit a lock context manager by releasing the lock
This does nothing if "unlock" has been cleared, meaning we want the
lock to persist beyond the context manager scope.
"""
if self.unlock:
self.release()
self.lock.release()


class Inventory:
Expand Down Expand Up @@ -771,12 +798,22 @@ def stream(self, path: str) -> Inventory:
Args:
path: Relative path of a regular file within a tarball.
On failure, an exception is raised and the cache lock is released; on
success, returns an Inventory object which implicitly transfers
ownership and management of the cache lock to the caller. When done
with the inventory's file stream, the caller must close the Inventory
object to release the file stream and the cache lock.
Raises:
CacheExtractBadPath: the path does not match a regular file within
the tarball.
Returns:
An Inventory object encapsulating the file stream and the cache
lock. The caller is reponsible for closing the Inventory!
lock.
"""

with LockRef(self.lock) as lock:
with LockManager(self.lock) as lock:
if not self.unpacked:
lock.upgrade()
self.cache_create()
Expand Down
2 changes: 1 addition & 1 deletion lib/pbench/server/indexing_tarballs.py
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ def sighup_handler(*args):
tarobj = self.cache_manager.find_dataset(
dataset.resource_id
)
lock = LockRef(tarobj.lock, exclusive=True).acquire()
lock = LockRef(tarobj.lock).acquire(exclusive=True)
tarobj.cache_create()
lock.downgrade()
except Exception as e:
Expand Down
Loading

0 comments on commit cff2e5c

Please sign in to comment.