Skip to content

Commit

Permalink
Add coverage
Browse files Browse the repository at this point in the history
  • Loading branch information
pierre.delaunay committed Jul 8, 2024
1 parent 3ce28f7 commit 575ba9f
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 25 deletions.
13 changes: 12 additions & 1 deletion .github/workflows/tests_unit.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,15 @@ jobs:
- name: tests
run: |
source $(poetry env info -p)/bin/activate
pytest --ignore=tests/integration tests/
coverage run --source=milabench -m pytest --ignore=tests/integration tests/
coverage report -m
coverage xml
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v3
with:
file: ./coverage.xml
flags: unittests
env_vars: PLATFORM,PYTHON
name: codecov-umbrella
fail_ci_if_error: false
75 changes: 58 additions & 17 deletions milabench/commands/executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from ..alt_async import destroy, run
from ..metadata import machine_metadata
from ..structs import BenchLogEntry
from ..syslog import syslog


async def execute(pack, *args, cwd=None, env={}, external=False, **kwargs):
Expand Down Expand Up @@ -41,47 +42,87 @@ async def execute(pack, *args, cwd=None, env={}, external=False, **kwargs):
)


async def force_terminate(pack, delay):
await asyncio.sleep(delay)
async def force_terminate_now(pack, delay):
    """Destroy every process of ``pack`` that has not exited yet.

    Args:
        pack: object exposing ``processes`` (an iterable of objects with a
            ``poll()`` method) and an awaitable ``message(str)`` method.
        delay: number of seconds the process was allowed to run; it is only
            used in the message sent to the pack.
    """
    for process in pack.processes:
        # poll() returning something other than None means the process has
        # already exited, so there is nothing to terminate.
        if process.poll() is not None:
            continue

        await pack.message(
            f"Terminating process because it ran for longer than {delay} seconds."
        )
        destroy(process)


async def force_terminate(pack, delay):
    """Wait ``delay`` seconds, then destroy the processes of ``pack`` that are still running.

    Args:
        pack: pack whose processes are terminated after the wait.
        delay: number of seconds to sleep before terminating; also forwarded
            to ``force_terminate_now``.
    """
    await asyncio.sleep(delay)
    # force_terminate_now is a coroutine function: calling it without `await`
    # only creates a coroutine object and nothing would ever be terminated.
    await force_terminate_now(pack, delay)


async def execute_command(
    command, phase="run", timeout=False, timeout_delay=600, **kwargs
):
    """Execute all the commands and return the aggregated results.

    Every ``(pack, argv, kwargs)`` triple yielded by ``command.commands()`` is
    started as its own asyncio task running ``execute``.

    Args:
        command: object exposing ``packs()`` and ``commands()``.
        phase: value assigned to ``pack.phase`` for every pack returned by
            ``command.packs()``.
        timeout: when truthy, the tasks are only waited on for a bounded
            amount of time (see below); otherwise they are simply gathered.
        timeout_delay: default deadline in seconds, used for packs whose
            config has no ``max_duration`` entry.
        **kwargs: forwarded to ``execute``; they take precedence over the
            per-command kwargs.

    Returns:
        Without ``timeout``: the list produced by ``asyncio.gather``.
        With ``timeout``: a list of task objects, the finished ones first,
        followed by the tasks that had to be cancelled.

    With ``timeout`` the wait happens in up to three rounds:
        1. wait for the initial deadline, then destroy the processes of the
           tasks that are still pending;
        2. wait for a grace period, then destroy the survivors again;
        3. wait for a last grace period, then cancel whatever is left and
           send an error event to the matching pack.
    """
    grace_period = 10
    packs = {}
    pending = []

    for pack in command.packs():
        pack.phase = phase

    # Starts at timeout_delay and can only grow to the largest max_duration.
    max_delay = timeout_delay

    with process_cleaner() as warden:
        for pack, argv, _kwargs in command.commands():
            await pack.send(event="config", data=pack.config)
            await pack.send(event="meta", data=machine_metadata(pack))

            if timeout:
                delay = pack.config.get("max_duration", timeout_delay)
                max_delay = max(max_delay, delay)

            task = asyncio.create_task(execute(pack, *argv, **{**_kwargs, **kwargs}))
            # Remember which pack each task belongs to so a timed-out task can
            # be mapped back to the processes that must be terminated.
            packs[task] = pack
            pending.append(task)
            warden.extend(pack.processes)

        if not timeout:
            return await asyncio.gather(*pending)

        all_done = []

        # asyncio.wait raises ValueError when given an empty collection.
        if not pending:
            return all_done

        for wait_count in range(3):
            # wait_count == 0: wait for the initial timeout
            # wait_count == 1: wait for the grace period for the process to end
            # wait_count == 2: unacceptable
            #   1. Cancel the coroutine as the process is not responding
            #   2. Send an error to the pack
            done, pending = await asyncio.wait(pending, timeout=max_delay)
            all_done.extend(done)

            if not pending:
                # all done exit
                break

            if wait_count == 2:
                # The message is formatted here rather than through syslog's
                # own arguments: the previous "{1}" placeholder had only one
                # positional argument to draw from.
                syslog(
                    f"{len(pending)} process(es) are still alive after grace period"
                )
                syslog("Canceling the coroutines")
                for timedout in pending:
                    timedout.cancel()
                    # send() is a coroutine function; it must be awaited for
                    # the error event to actually be emitted.
                    await packs[timedout].send(
                        event="error",
                        data={
                            "type": "TimeoutError",
                            "message": "Survived after term & kill signal",
                        },
                    )
                all_done.extend(pending)
                break

            # Tasks timeout: kill the underlying process which should force
            # the task to return on the next wait.
            for timedout in pending:
                await force_terminate_now(packs[timedout], max_delay)

            # Subsequent rounds only wait for the grace period.
            max_delay = grace_period

        return all_done
17 changes: 12 additions & 5 deletions milabench/sizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,10 +213,12 @@ def scale_argv(pack, argv):
sizer = batch_sizer()

system = system_global.get()

capacity = system.get("gpu", dict()).get("capacity")

return sizer.argv(pack, capacity, argv)

if system:
capacity = system.get("gpu", dict()).get("capacity")
return sizer.argv(pack, capacity, argv)
else:
return argv


class MemoryUsageExtractor(ValidationLayer):
Expand Down Expand Up @@ -313,7 +315,12 @@ def report(self, *args):


def new_argument_resolver(pack):
context = deepcopy(system_global.get())
system_config = system_global.get()
if system_config is None:
system_config = {}

context = deepcopy(system_config)

arch = context.get("arch", "cpu")

if hasattr(pack, "config"):
Expand Down
4 changes: 2 additions & 2 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ pytest-cov = "^3.0.0"
Sphinx = "^4.5.0"
sphinx-rtd-theme = "^1.0.0"
pytest-regressions = "^2.4.2"
coverage = "^7.5.4"

[build-system]
requires = ["poetry-core>=1.0.0"]
Expand Down
61 changes: 61 additions & 0 deletions tests/test_executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,3 +266,64 @@ def test_void_executor():

for _ in proceed(plan.execute()):
pass


def await_now(function):
    """Run the awaitable ``function`` to completion and return its result.

    Args:
        function: awaitable (e.g. a coroutine object) to run.

    Returns:
        Whatever the awaitable returns.
    """
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        # get_event_loop() raises when the current thread has no event loop;
        # create one and install it so the call still succeeds.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    return loop.run_until_complete(function)


from argparse import Namespace

class MockPack:
    """Minimal stand-in for a pack, exposing only what the tests below use."""

    # Read-only configuration shared by every instance.
    config = {
        "max_duration": 1,
        "name": "Mock",
    }

    dirs = Namespace(**{
        "code": os.getcwd()
    })

    def __init__(self):
        # One list per instance: a class-level list would be shared by every
        # MockPack, so entries added through one pack would be visible from
        # all the others.
        self.processes = []

    def full_env(self, *args, **kwargs):
        """Return an empty environment, whatever the arguments."""
        return {}

    async def send(self, **kwargs):
        """Print the event instead of forwarding it anywhere."""
        print(kwargs)

    async def message(self, msg):
        """Print the message instead of forwarding it anywhere."""
        print(msg)


class Commands:
    """Fake command object yielding a single ``sleep`` invocation."""

    def __init__(self, time) -> None:
        # Duration handed to `sleep`, converted to a string in commands().
        self.time = time

    def packs(self):
        """Return the packs to prepare; this fake has none."""
        return list()

    def commands(self):
        """Yield one ``(pack, argv, kwargs)`` triple running ``sleep <time>``."""
        argv = ["sleep", str(self.time)]
        yield MockPack(), argv, {}


def test_execute_command_timeout():
    """Run a 10 second ``sleep`` with a 1 second timeout and print every message.

    The test makes no assertion; it only checks that the run completes.
    """
    from milabench.commands.executors import execute_command

    pending = execute_command(Commands(10), timeout=True, timeout_delay=1)

    for event in proceed(pending):
        print(event)



def test_execute_command():
    """Run ``sleep 0`` with the timeout enabled and check the messages produced."""
    from milabench.commands.executors import execute_command

    future = execute_command(Commands(0), timeout=True, timeout_delay=1)
    messages = list(proceed(future))

    # Two messages are expected, the last one carrying a zero return code.
    assert len(messages) == 2
    assert messages[-1].data["return_code"] == 0

0 comments on commit 575ba9f

Please sign in to comment.