From 344e485459567e96b752ac8d79f00a3130d010c3 Mon Sep 17 00:00:00 2001 From: Lourens Veen Date: Sun, 17 Dec 2023 17:50:50 +0100 Subject: [PATCH] Make ProfileDatabase more robust against missing events --- .../libmuscle/manager/profile_database.py | 42 ++++++++++++++----- .../manager/test/test_profile_database.py | 14 ++++--- 2 files changed, 39 insertions(+), 17 deletions(-) diff --git a/libmuscle/python/libmuscle/manager/profile_database.py b/libmuscle/python/libmuscle/manager/profile_database.py index a1c7a535..af6ef578 100644 --- a/libmuscle/python/libmuscle/manager/profile_database.py +++ b/libmuscle/python/libmuscle/manager/profile_database.py @@ -4,6 +4,7 @@ import threading from types import TracebackType from typing import Any, cast, Dict, List, Optional, Tuple, Type, Union +from warnings import warn class ProfileDatabase: @@ -94,24 +95,42 @@ def instance_stats( """ cur = self._get_cursor() cur.execute("BEGIN TRANSACTION") + cur.execute("SELECT name FROM instances") + instances = [row[0] for row in cur.fetchall()] + cur.execute( "SELECT instance, stop_time" " FROM all_events" " WHERE type = 'CONNECT'") start_run = dict(cur.fetchall()) + for name in instances: + if name not in start_run: + warn( + f'Instance {name} seems to have never registered with the' + ' manager, and will be omitted from the results. Did it crash' + ' on startup?') + cur.execute( "SELECT instance, start_time" " FROM all_events" " WHERE type = 'SHUTDOWN_WAIT'") stop_run = dict(cur.fetchall()) - if not stop_run: - cur.execute( - "SELECT instance, start_time" - " FROM all_events" - " WHERE type = 'DEREGISTER'") - stop_run = dict(cur.fetchall()) + for name in instances: + if name not in stop_run: + warn( + f'Instance {name} did not shut down cleanly, data may be' + ' inaccurate or missing') + + cur.execute( + "SELECT stop_time" + " FROM all_events" + " WHERE instance = ?" + " ORDER BY stop_time DESC LIMIT 1", [name]) + result = cur.fetchall() + if result: + stop_run[name] = result[0][0] cur.execute( "SELECT instance, SUM(stop_time - start_time)" @@ -130,20 +149,21 @@ def instance_stats( cur.execute("COMMIT") cur.close() - instances = list(start_run.keys()) - total_times = [(stop_run[i] - start_run[i]) * 1e-9 for i in instances] + complete_instances = list(set(start_run) & set(stop_run)) + + total_times = [(stop_run[i] - start_run[i]) * 1e-9 for i in complete_instances] comm_times = [ ( (comm[i] if i in comm else 0) - (wait[i] if i in wait else 0) ) * 1e-9 - for i in instances] - wait_times = [(wait[i] if i in wait else 0) * 1e-9 for i in instances] + for i in complete_instances] + wait_times = [(wait[i] if i in wait else 0) * 1e-9 for i in complete_instances] run_times = [ t - c - w for t, c, w in zip(total_times, comm_times, wait_times)] - return instances, run_times, comm_times, wait_times + return complete_instances, run_times, comm_times, wait_times def resource_stats(self) -> Dict[str, Dict[str, float]]: """Calculate per-core statistics. diff --git a/libmuscle/python/libmuscle/manager/test/test_profile_database.py b/libmuscle/python/libmuscle/manager/test/test_profile_database.py index b5e12896..2d6d472c 100644 --- a/libmuscle/python/libmuscle/manager/test/test_profile_database.py +++ b/libmuscle/python/libmuscle/manager/test/test_profile_database.py @@ -48,7 +48,8 @@ def t(offset: int) -> ProfileTimestamp: ProfileEvent( ProfileEventType.SEND, t(2600), t(2900), Port('out', Operator.O_I), None, None, 1000000, 0.0), - ProfileEvent(ProfileEventType.DEREGISTER, t(10000), t(11000))] + ProfileEvent(ProfileEventType.SHUTDOWN_WAIT, t(10000), t(11000)), + ProfileEvent(ProfileEventType.DEREGISTER, t(11000), t(11100))] store.add_events(Reference('instance1'), e1) @@ -73,7 +74,8 @@ def t(offset: int) -> ProfileTimestamp: ProfileEvent( ProfileEventType.RECEIVE_WAIT, t(2600), t(2870), Port('in', Operator.O_I), None, None, 1000000, 0.0), - ProfileEvent(ProfileEventType.DEREGISTER, t(10000), t(11000))] + ProfileEvent(ProfileEventType.SHUTDOWN_WAIT, t(10000), t(11000)), + ProfileEvent(ProfileEventType.DEREGISTER, t(11000), t(11100))] store.add_events(Reference('instance2'), e2) @@ -126,12 +128,12 @@ def test_resource_stats(db_file): def test_time_taken(db_file): with ProfileDatabase(db_file) as db: assert 1000.0 == db.time_taken(etype='REGISTER', instance='instance1') - assert 1000.0 == db.time_taken(etype='DEREGISTER') - assert 11000.0 == db.time_taken( + assert 100.0 == db.time_taken(etype='DEREGISTER') + assert 11100.0 == db.time_taken( etype='REGISTER', instance='instance1', etype2='DEREGISTER') - assert 9000.0 == db.time_taken( + assert 10000.0 == db.time_taken( etype='REGISTER', instance='instance1', time='stop', etype2='DEREGISTER', time2='start') assert 200.0 == db.time_taken(etype='SEND') - assert 2000.0 == db.time_taken(etype='DEREGISTER', aggregate='sum') + assert 200.0 == db.time_taken(etype='DEREGISTER', aggregate='sum') assert 600.0 == db.time_taken(etype='SEND', aggregate='sum')