Skip to content

Commit

Permalink
Make ProfileDatabase more robust against missing events
Browse files Browse the repository at this point in the history
  • Loading branch information
LourensVeen committed Dec 17, 2023
1 parent e5df588 commit 344e485
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 17 deletions.
42 changes: 31 additions & 11 deletions libmuscle/python/libmuscle/manager/profile_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import threading
from types import TracebackType
from typing import Any, cast, Dict, List, Optional, Tuple, Type, Union
from warnings import warn


class ProfileDatabase:
Expand Down Expand Up @@ -94,24 +95,42 @@ def instance_stats(
"""
cur = self._get_cursor()
cur.execute("BEGIN TRANSACTION")
cur.execute("SELECT name FROM instances")
instances = [row[0] for row in cur.fetchall()]

cur.execute(
"SELECT instance, stop_time"
" FROM all_events"
" WHERE type = 'CONNECT'")
start_run = dict(cur.fetchall())

for name in instances:
if name not in start_run:
warn(
f'Instance {name} seems to have never registered with the'
' manager, and will be omitted from the results. Did it crash'
' on startup?')

cur.execute(
"SELECT instance, start_time"
" FROM all_events"
" WHERE type = 'SHUTDOWN_WAIT'")
stop_run = dict(cur.fetchall())

if not stop_run:
cur.execute(
"SELECT instance, start_time"
" FROM all_events"
" WHERE type = 'DEREGISTER'")
stop_run = dict(cur.fetchall())
for name in instances:
if name not in stop_run:
warn(
f'Instance {name} did not shut down cleanly, data may be'
' inaccurate or missing')

cur.execute(
"SELECT stop_time"
" FROM all_events"
" WHERE instance = ?"
" ORDER BY stop_time DESC LIMIT 1", [name])
result = cur.fetchall()
if result:
stop_run[name] = result[0][0]

cur.execute(
"SELECT instance, SUM(stop_time - start_time)"
Expand All @@ -130,20 +149,21 @@ def instance_stats(
cur.execute("COMMIT")
cur.close()

instances = list(start_run.keys())
total_times = [(stop_run[i] - start_run[i]) * 1e-9 for i in instances]
complete_instances = list(set(start_run) & set(stop_run))

total_times = [(stop_run[i] - start_run[i]) * 1e-9 for i in complete_instances]
comm_times = [
(
(comm[i] if i in comm else 0) -
(wait[i] if i in wait else 0)
) * 1e-9
for i in instances]
wait_times = [(wait[i] if i in wait else 0) * 1e-9 for i in instances]
for i in complete_instances]
wait_times = [(wait[i] if i in wait else 0) * 1e-9 for i in complete_instances]
run_times = [
t - c - w
for t, c, w in zip(total_times, comm_times, wait_times)]

return instances, run_times, comm_times, wait_times
return complete_instances, run_times, comm_times, wait_times

def resource_stats(self) -> Dict[str, Dict[str, float]]:
"""Calculate per-core statistics.
Expand Down
14 changes: 8 additions & 6 deletions libmuscle/python/libmuscle/manager/test/test_profile_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ def t(offset: int) -> ProfileTimestamp:
ProfileEvent(
ProfileEventType.SEND, t(2600), t(2900),
Port('out', Operator.O_I), None, None, 1000000, 0.0),
ProfileEvent(ProfileEventType.DEREGISTER, t(10000), t(11000))]
ProfileEvent(ProfileEventType.SHUTDOWN_WAIT, t(10000), t(11000)),
ProfileEvent(ProfileEventType.DEREGISTER, t(11000), t(11100))]

store.add_events(Reference('instance1'), e1)

Expand All @@ -73,7 +74,8 @@ def t(offset: int) -> ProfileTimestamp:
ProfileEvent(
ProfileEventType.RECEIVE_WAIT, t(2600), t(2870),
Port('in', Operator.O_I), None, None, 1000000, 0.0),
ProfileEvent(ProfileEventType.DEREGISTER, t(10000), t(11000))]
ProfileEvent(ProfileEventType.SHUTDOWN_WAIT, t(10000), t(11000)),
ProfileEvent(ProfileEventType.DEREGISTER, t(11000), t(11100))]

store.add_events(Reference('instance2'), e2)

Expand Down Expand Up @@ -126,12 +128,12 @@ def test_resource_stats(db_file):
def test_time_taken(db_file):
with ProfileDatabase(db_file) as db:
assert 1000.0 == db.time_taken(etype='REGISTER', instance='instance1')
assert 1000.0 == db.time_taken(etype='DEREGISTER')
assert 11000.0 == db.time_taken(
assert 100.0 == db.time_taken(etype='DEREGISTER')
assert 11100.0 == db.time_taken(
etype='REGISTER', instance='instance1', etype2='DEREGISTER')
assert 9000.0 == db.time_taken(
assert 10000.0 == db.time_taken(
etype='REGISTER', instance='instance1', time='stop',
etype2='DEREGISTER', time2='start')
assert 200.0 == db.time_taken(etype='SEND')
assert 2000.0 == db.time_taken(etype='DEREGISTER', aggregate='sum')
assert 200.0 == db.time_taken(etype='DEREGISTER', aggregate='sum')
assert 600.0 == db.time_taken(etype='SEND', aggregate='sum')

0 comments on commit 344e485

Please sign in to comment.