Skip to content

Commit

Permalink
Add locking for install & prepare in case bench is installed in shared location
Browse files Browse the repository at this point in the history
  • Loading branch information
Pierre Delaunay committed Jun 19, 2024
1 parent 8cf1efa commit 75bc326
Show file tree
Hide file tree
Showing 4 changed files with 213 additions and 64 deletions.
6 changes: 6 additions & 0 deletions milabench/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@
config_global = contextvars.ContextVar("config", default=None)



def get_base_directory():
    """Return the ``base`` entry from the ``dirs`` section of the active config.

    The config is whatever value the ``config_global`` context variable
    currently holds.
    """
    dirs = config_global.get()["dirs"]
    return dirs["base"]


def relative_to(pth, cwd):
pth = XPath(pth).expanduser()
if not pth.is_absolute():
Expand Down
15 changes: 12 additions & 3 deletions milabench/multi.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
from copy import deepcopy

from voir.instruments.gpu import get_gpu_info
from filelock import FileLock

from .capability import is_system_capable
from .commands import NJobs, PerGPU
from .config import get_base_directory
from .fs import XPath
from .pack import Package
from .remote import (
Expand All @@ -24,6 +26,12 @@
planning_methods = {}


def phase_lock(phase):
    """Build the file lock that guards a single phase.

    The lock file is ``<base>/extra/<phase>.lock``, where ``<base>`` is the
    value returned by ``get_base_directory()``.

    Args:
        phase: Name of the phase; used as the stem of the lock file name.

    Returns:
        A ``FileLock`` on that path, created with ``timeout=3600 * 2``
        (two hours, given that filelock timeouts are expressed in seconds).
        The lock is only constructed here; the caller acquires it.
    """
    lock_path = XPath(get_base_directory()) / "extra" / f"{phase}.lock"
    # parents=True: the base directory may not exist yet, and without it
    # mkdir would raise FileNotFoundError instead of creating the chain.
    # NOTE(review): assumes XPath.mkdir has pathlib.Path.mkdir semantics.
    lock_path.parent.mkdir(parents=True, exist_ok=True)
    return FileLock(lock_path, timeout=3600 * 2)


async def aprint(pack, msg):
    """Send ``msg`` through ``pack.send`` as a ``line`` event on the ``stdout`` pipe."""
    payload = {"event": "line", "data": msg, "pipe": "stdout"}
    await pack.send(**payload)

Expand Down Expand Up @@ -135,7 +143,8 @@ async def do_install(self):
remote_task = asyncio.create_task(remote_plan.execute())

# do the installation step
await self.do_phase("install", remote_task, "checked_install")
with phase_lock("install"):
await self.do_phase("install", remote_task, "checked_install")

async def do_prepare(self):
setup = self.setup_pack()
Expand All @@ -145,14 +154,14 @@ async def do_prepare(self):
remote_plan = milabench_remote_prepare(setup, run_for="main")
remote_task = asyncio.create_task(remote_plan.execute())
await asyncio.wait([remote_task])

return

elif is_main_local(setup) and is_multinode(setup):
remote_plan = milabench_remote_prepare(setup, run_for="worker")
remote_task = asyncio.create_task(remote_plan.execute())

await self.do_phase("prepare", remote_task, "prepare")
with phase_lock("prepare"):
await self.do_phase("prepare", remote_task, "prepare")

async def do_run(self, repeat=1):
setup = self.setup_pack()
Expand Down
Loading

0 comments on commit 75bc326

Please sign in to comment.