Skip to content

Commit

Permalink
Fix CPU default
Browse files Browse the repository at this point in the history
  • Loading branch information
pierre.delaunay committed Jun 26, 2024
1 parent 78bfdfc commit f044a57
Show file tree
Hide file tree
Showing 19 changed files with 132 additions and 82 deletions.
4 changes: 2 additions & 2 deletions benchmate/benchmate/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,15 @@ def monitor_fn():
}
for gpu in get_gpu_info()["gpus"].values()
}
return {"task": "train", "gpudata": data, "time": time.time(), "units": "s"}
return {"task": "train", "gpudata": data, "time": time.time()}

monitor = CustomMonitor(0.5, monitor_fn)

def log(data):
nonlocal monitor

if data_file is not None:
data["t"] = time.time()
data["time"] = time.time()
print(json.dumps(data), file=data_file)

while not monitor.results.empty():
Expand Down
6 changes: 3 additions & 3 deletions milabench/_version.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""This file is generated, do not modify"""

__tag__ = "v0.1.0-30-g94b27a71"
__commit__ = "94b27a71145d3ba754a2713aeca60e5a28be4bc5"
__date__ = "2024-06-25 13:49:52 -0400"
__tag__ = "v0.1.0-32-ge9e52501"
__commit__ = "e9e52501ad92d2ee2dac97e66f601a0458404986"
__date__ = "2024-06-26 02:37:50 -0400"
36 changes: 18 additions & 18 deletions milabench/alt_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from collections import deque
from functools import wraps

from benchmate.warden import process_cleaner, destroy
from benchmate.warden import destroy
from voir.proc import run as voir_run


Expand Down Expand Up @@ -166,29 +166,29 @@ async def wrapped(*args, **kwargs):
return wrapped




@feedback_runner
def run(argv, setsid=None, process_accumulator=None, info={}, **kwargs):
with process_cleaner() as processes:
if setsid:
kwargs["preexec_fn"] = os.setsid

mx = voir_run(argv, info=info, **kwargs, timeout=0)
processes.add_process(*mx.processes)
if setsid:
kwargs["preexec_fn"] = os.setsid

if process_accumulator is not None:
process_accumulator.extend(mx.processes)
mx = voir_run(argv, info=info, **kwargs, timeout=0)

if process_accumulator is not None:
process_accumulator.extend(mx.processes)

if setsid:
for proc in mx.processes:
proc.did_setsid = True
if setsid:
for proc in mx.processes:
proc.did_setsid = True

loop = asyncio.get_running_loop()
loop._multiplexers.append(mx)
loop = asyncio.get_running_loop()
loop._multiplexers.append(mx)

for entry in mx:
if entry and entry.event == "stop":
destroy(*mx.processes)
yield entry
for entry in mx:
if entry and entry.event == "stop":
destroy(*mx.processes)
yield entry


def proceed(coro):
Expand Down
2 changes: 1 addition & 1 deletion milabench/cli/compare.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,6 @@ def cli_compare(args=None):

for run in runs:
all_data = _read_reports(run.path)
run.summary = make_summary(all_data.values())
run.summary = make_summary(all_data)

compare(runs, args.last, args.metric, args.stat)
2 changes: 1 addition & 1 deletion milabench/cli/gather.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def gather_cli(args=None):
data = []
for run in runs:
reports = _read_reports(run)
summary = make_summary(reports.values(), query=query)
summary = make_summary(reports, query=query)
df = make_dataframe(summary, None, None, query=query)

name = run.split("/")[-1]
Expand Down
2 changes: 1 addition & 1 deletion milabench/cli/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def cli_report(args=None):
reports = None
if args.runs:
reports = _read_reports(*args.runs)
summary = make_summary(reports.values())
summary = make_summary(reports)

if args.config:
from milabench.common import arguments as multipack_args
Expand Down
2 changes: 1 addition & 1 deletion milabench/cli/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ def cli_run(args=None):
reports = _read_reports(*runs)
assert len(reports) != 0, "No reports found"

summary = make_summary(reports.values())
summary = make_summary(reports)
assert len(summary) != 0, "No summaries"

make_report(
Expand Down
2 changes: 1 addition & 1 deletion milabench/cli/summary.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def cli_summary(args=None):
args = arguments()

all_data = _read_reports(*args.runs)
summary = make_summary(all_data.values())
summary = make_summary(all_data)

if args.out is not None:
with open(args.out, "w") as file:
Expand Down
32 changes: 18 additions & 14 deletions milabench/commands/executors.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import asyncio
import os

from benchmate.warden import process_cleaner

from ..alt_async import destroy, run
from ..metadata import machine_metadata
from ..structs import BenchLogEntry
Expand Down Expand Up @@ -60,21 +62,23 @@ async def execute_command(
pack.phase = phase

timeout_tasks = []
for pack, argv, _kwargs in command.commands():
await pack.send(event="config", data=pack.config)
await pack.send(event="meta", data=machine_metadata(pack))
with process_cleaner() as warden:
for pack, argv, _kwargs in command.commands():
await pack.send(event="config", data=pack.config)
await pack.send(event="meta", data=machine_metadata(pack))

fut = execute(pack, *argv, **{**_kwargs, **kwargs})
coro.append(fut)
fut = execute(pack, *argv, **{**_kwargs, **kwargs})
coro.append(fut)
warden.add_process(*pack.processes)

if timeout:
delay = pack.config.get("max_duration", timeout_delay)
timeout_task = asyncio.create_task(force_terminate(pack, delay))
timeout_tasks.append(timeout_task)
if timeout:
delay = pack.config.get("max_duration", timeout_delay)
timeout_task = asyncio.create_task(force_terminate(pack, delay))
timeout_tasks.append(timeout_task)

results = await asyncio.gather(*coro)
results = await asyncio.gather(*coro)

if timeout:
for task in timeout_tasks:
task.cancel()
return results
if timeout:
for task in timeout_tasks:
task.cancel()
return results
21 changes: 13 additions & 8 deletions milabench/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,10 @@ def _parse_report(pth):
traceback.print_exc()
bad_lines += 1

if good_lines == 0:
if good_lines == 0 and bad_lines == 0:
print(f"Empty file {pth}")

if good_lines == 0 and bad_lines > 0:
print(f"Unknow format for file {pth}")

return data
Expand All @@ -300,12 +303,14 @@ def _read_reports(*runs):
def _error_report(reports):
    """Collect the stdout/stderr lines of every run that did not succeed.

    reports: mapping of run identifier -> list of report entries (dicts).
    Returns a dict mapping each failed run's identifier to the subset of its
    entries that carry "#stdout" or "#stderr" keys.

    Best effort: a run whose data cannot be aggregated (malformed or
    incomplete report) is skipped rather than aborting the whole report.
    """
    out = {}
    for r, data in reports.items():
        try:
            agg = aggregate(data)
            (success,) = agg["data"]["success"]
            if not success:
                out[r] = [line for line in data if "#stdout" in line or "#stderr" in line]
        except Exception:
            # Malformed run data (aggregate returning None, missing keys,
            # multiple success entries) — ignore this run, keep the others.
            pass

    return out


Expand Down Expand Up @@ -362,7 +367,7 @@ def _short_make_report(runs, config):

if runs:
reports = _read_reports(*runs)
summary = make_summary(reports.values())
summary = make_summary(reports)

if config:
config = _get_multipack(CommonArguments(config), return_config=True)
Expand Down
13 changes: 9 additions & 4 deletions milabench/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,20 @@


config_global = contextvars.ContextVar("config", default=None)
execution_count = contextvars.ContextVar("count", default=0)
execution_count = (0, 0)


def set_run_count(total):
execution_count.set(total)
def set_run_count(total_run, total_bench):
    """Record the planned totals for this session as (run count, bench count)."""
    global execution_count
    execution_count = total_run, total_bench


def get_run_count():
    """Return the total number of planned runs recorded by set_run_count()."""
    # execution_count is a (total_run, total_bench) tuple; index it directly.
    # The previous ContextVar-style `.get()` call would fail on a tuple.
    return execution_count[0]


def get_bench_count():
    """Return the total number of planned benchmarks recorded by set_run_count()."""
    _, bench_total = execution_count
    return bench_total


def get_base_folder():
Expand Down
23 changes: 15 additions & 8 deletions milabench/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,15 @@ def log(self, entry):
self.file(entry).write(f"{j}\n")


def new_progress_bar():
    """Build a Progress widget showing a bar plus a (completed/total) counter.

    The task id is stashed on the widget as ``_task`` so callers can update
    the single tracked task later.
    """
    bar = Progress(
        BarColumn(),
        TextColumn("({task.completed}/{task.total})"),
    )
    bar._task = bar.add_task("progress")
    return bar


class DashFormatter(BaseLogger):
def __init__(self):
self.panel = Panel("")
Expand All @@ -230,21 +239,15 @@ def __init__(self):
self.early_stop = {}
# Limit the number of rows to avoid too much cluttering
# This is a soft limit, it only prunes finished runs
self.max_rows = 8
self.max_rows = 2
self.prune_delay = 60
self.current = 0

def _get_global_progress_bar(self):
    """Return the dashboard-wide progress bar, creating and caching it on first use.

    The bar is stored under the reserved "GLOBAL" row key; it is seeded with
    the current completion count and the total run count.
    """
    progress = self.rows.get("GLOBAL")
    if progress is not None:
        return progress["progress"]

    # First access: build the bar and register it as the "GLOBAL" row.
    progress = new_progress_bar()
    progress.update(progress._task, completed=self.current, total=get_run_count())
    self.rows["GLOBAL"] = {"progress": progress}
    return progress
Expand All @@ -267,12 +270,16 @@ def refresh(self):
self.live.update(self.make_table())

def start(self):
    """Begin live rendering of the dashboard."""
    # NOTE(review): presumably seeds the global progress display at 0 so it
    # appears from the first refresh — confirm against _update_global.
    self._update_global(0)
    self.live.__enter__()

def end(self):
    """Stop live rendering of the dashboard (exit the Live context cleanly)."""
    self.live.__exit__(None, None, None)

def __call__(self, entry):
if get_run_count():
self._get_global_progress_bar()

event = entry.event
data = entry.data
tag = entry.tag
Expand Down
18 changes: 11 additions & 7 deletions milabench/multi.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,9 +187,8 @@ async def do_run(self, repeat=1):
return

assert is_main_local(setup), "Running benchmarks only works on the main node"

set_run_count(await self.count_runs(repeat))

await self.count_runs(repeat)

for index in range(repeat):
for pack in self.packs.values():
try:
Expand Down Expand Up @@ -280,16 +279,21 @@ async def do_pin(
)

async def count_runs(self, repeat):
    """Count the planned runs and benchmarks over ``repeat`` repetitions.

    A PerGPU execution plan counts one run per device; any other plan
    counts a single run.  Packs the system cannot run are skipped.
    Publishes both totals via set_run_count() and returns the run total.
    """
    total_run = 0
    total_bench = 0
    for index in range(repeat):
        for pack in self.packs.values():
            if not await is_system_capable(pack):
                continue

            exec_plan = make_execution_plan(pack, index, repeat)

            total_bench += 1

            if isinstance(exec_plan, PerGPU):
                total_run += len(exec_plan.devices)
            else:
                total_run += 1

    set_run_count(total_run, total_bench)
    return total_run
11 changes: 8 additions & 3 deletions milabench/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,13 +208,18 @@ def _report_pergpu(entries, measure="50"):

def make_dataframe(summary, compare=None, weights=None, query=None):
if weights is not None:
# We overriden the config
# We've overridden the config
required = weights.keys()

for key in required:
if key not in summary:
print(f"Missing benchmark {key}")
summary[key] = {"name": key, "n": 0, "successes": 0, "failures": 0}
summary[key] = {
"name": key,
"n": 0,
"successes": 0,
"failures": 0,
"enabled": weights[key]["enabled"]
}

if weights is None:
weights = dict()
Expand Down
14 changes: 12 additions & 2 deletions milabench/summary.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
import numpy as np

from .utils import error_guard
from .syslog import syslog


@error_guard(None)
def aggregate(run_data):
"""Group all the data inside a dictionary of lists"""
omnibus = defaultdict(list)
Expand Down Expand Up @@ -228,8 +228,18 @@ def _summarize(group, query=tuple([])) -> Summary:


def make_summary(runs, query=tuple([])) -> dict[str, Summary]:
    """Aggregate, classify, merge and summarize a mapping of runs.

    runs: mapping of run name -> run data (list of report entries).
    query: optional metric query forwarded to _summarize.
    Returns a dict of benchmark name -> Summary.

    Runs that fail to aggregate are skipped with a syslog message instead
    of aborting the whole summary.
    """
    aggs = []
    for name, run in runs.items():
        try:
            if agg := aggregate(run):
                aggs.append(agg)
        except AssertionError:
            syslog("Ignoring run {0}: it looks like it did not finish successfully", name)
        except Exception as err:
            syslog("Ignoring run {0}: because of exception: {1}", name, err)

    classified = _classify(aggs)
    merged = {name: _merge(runs) for name, runs in classified.items()}
    summarized = {name: _summarize(agg, query) for name, agg in merged.items()}
    return summarized

Loading

0 comments on commit f044a57

Please sign in to comment.