From 314223cdc96da1ecddd62bb8c9801a45674abbe8 Mon Sep 17 00:00:00 2001 From: Thomas Baumann <39156931+brownbaerchen@users.noreply.github.com> Date: Sat, 2 Sep 2023 11:43:52 +0200 Subject: [PATCH] More meta data in statistics (#351) * Custom metadata in statistics * Fixes --- pySDC/core/Hooks.py | 69 ++++---- pySDC/core/Sweeper.py | 4 + pySDC/helpers/stats_helper.py | 24 +-- pySDC/implementations/hooks/default_hook.py | 20 ++- .../hooks/log_embedded_error_estimate.py | 1 + pySDC/implementations/hooks/log_errors.py | 5 + .../hooks/log_extrapolated_error_estimate.py | 1 + pySDC/implementations/hooks/log_restarts.py | 1 + pySDC/implementations/hooks/log_work.py | 1 + .../sweeper_classes/generic_implicit_MPI.py | 12 +- pySDC/projects/Resilience/hook.py | 9 - pySDC/tests/test_hooks/test_entry_class.py | 158 ++++++++++++++++++ 12 files changed, 238 insertions(+), 67 deletions(-) create mode 100644 pySDC/tests/test_hooks/test_entry_class.py diff --git a/pySDC/core/Hooks.py b/pySDC/core/Hooks.py index 0f501fc1e0..ef606f05ef 100644 --- a/pySDC/core/Hooks.py +++ b/pySDC/core/Hooks.py @@ -2,7 +2,18 @@ from collections import namedtuple -Entry = namedtuple('Entry', ['process', 'time', 'level', 'iter', 'sweep', 'type', 'num_restarts']) +# metadata with defaults +meta_data = { + 'process': None, + 'process_sweeper': None, + 'time': None, + 'level': None, + 'iter': None, + 'sweep': None, + 'type': None, + 'num_restarts': None, +} +Entry = namedtuple('Entry', meta_data.keys()) # noinspection PyUnusedLocal,PyShadowingBuiltins,PyShadowingNames @@ -18,9 +29,12 @@ class hooks(object): logger: logger instance for output __num_restarts (int): number of restarts of the current step __stats (dict): dictionary for gathering the statistics of a run - __entry (namedtuple): statistics entry containing all information to identify the value + entry (namedtuple): statistics entry containing all information to identify the value """ + entry = Entry + meta_data = meta_data + def __init__(self): """ Initialization routine @@ -31,52 +45,39 @@ def __init__(self): # create statistics and entry elements self.__stats = {} - self.__entry = Entry - def add_to_stats(self, process, time, level, iter, sweep, type, value): + def add_to_stats(self, value, **kwargs): """ - Routine to add data to the statistics dict + Routine to add data to the statistics dict. Please supply the metadata as keyword arguments in accordance with + the entry class. Args: - process: the current process recording this data - time (float): the current simulation time - level (int): the current level index - iter (int): the current iteration count - sweep (int): the current sweep count - type (str): string to describe the type of value value: the actual data """ # create named tuple for the key and add to dict - self.__stats[ - self.__entry( - process=process, - time=time, - level=level, - iter=iter, - sweep=sweep, - type=type, - num_restarts=self.__num_restarts, - ) - ] = value - - def increment_stats(self, process, time, level, iter, sweep, type, value, initialize=None): + meta = { + **self.meta_data, + **kwargs, + 'num_restarts': self.__num_restarts, + } + self.__stats[self.entry(**meta)] = value + + def increment_stats(self, value, initialize=None, **kwargs): """ Routine to increment data to the statistics dict. If the data is not yet created, it will be initialized to - initialize if applicable and to value otherwise + initialize if applicable and to value otherwise. Please supply metadata as keyword arguments in accordance with + the entry class. Args: - process: the current process recording this data - time (float): the current simulation time - level (int): the current level index - iter (int): the current iteration count - sweep (int): the current sweep count - type (str): string to describe the type of value value: the actual data initialize: if supplied and data does not exist already, this will be used over value """ - key = self.__entry( - process=process, time=time, level=level, iter=iter, sweep=sweep, type=type, num_restarts=self.__num_restarts - ) + meta = { + **meta_data, + **kwargs, + 'num_restarts': self.__num_restarts, + } + key = self.entry(**meta) if key in self.__stats.keys(): self.__stats[key] += value elif initialize is not None: diff --git a/pySDC/core/Sweeper.py b/pySDC/core/Sweeper.py index 6333cf2b6b..5eb54b11b8 100644 --- a/pySDC/core/Sweeper.py +++ b/pySDC/core/Sweeper.py @@ -455,3 +455,7 @@ def level(self, L): """ assert isinstance(L, level) self.__level = L + + @property + def rank(self): + return 0 diff --git a/pySDC/helpers/stats_helper.py b/pySDC/helpers/stats_helper.py index 45e40469cf..5453a92b3e 100644 --- a/pySDC/helpers/stats_helper.py +++ b/pySDC/helpers/stats_helper.py @@ -1,19 +1,12 @@ import numpy as np -def filter_stats( - stats, process=None, time=None, level=None, iter=None, type=None, recomputed=None, num_restarts=None, comm=None -): +def filter_stats(stats, comm=None, recomputed=None, **kwargs): """ - Helper function to extract data from the dictionary of statistics + Helper function to extract data from the dictionary of statistics. Please supply metadata as keyword arguments. Args: stats (dict): raw statistics from a controller run - process (int): process number - time (float): the requested simulation time - level (int): the requested level index - iter (int): the requested iteration count - type (str): string to describe the requested type of value recomputed (bool): filter recomputed values from stats if set to anything other than None comm (mpi4py.MPI.Intracomm): Communicator (or None if not applicable) @@ -21,17 +14,8 @@ def filter_stats( dict: dictionary containing only the entries corresponding to the filter """ result = {} - for k, v in stats.items(): - # get data if key matches the filter (if specified) - if ( - (k.time == time or time is None) - and (k.process == process or process is None) - and (k.level == level or level is None) - and (k.iter == iter or iter is None) - and (k.type == type or type is None) - and (k.num_restarts == num_restarts or num_restarts is None) - ): + if all([k._asdict().get(k2, None) == v2 for k2, v2 in kwargs.items() if v2 is not None] + [True]): result[k] = v if comm is not None: @@ -55,7 +39,7 @@ def filter_stats( ] # delete values that were recorded at times that shouldn't be recorded because we performed a different step after the restart - if type != '_recomputed': + if kwargs.get('type', None) != '_recomputed': other_restarted_steps = [ key for key, val in filter_stats(stats, type='_recomputed', recomputed=False, comm=comm).items() if val ] diff --git a/pySDC/implementations/hooks/default_hook.py b/pySDC/implementations/hooks/default_hook.py index f997b2a6cf..0944539c60 100644 --- a/pySDC/implementations/hooks/default_hook.py +++ b/pySDC/implementations/hooks/default_hook.py @@ -144,6 +144,7 @@ def post_comm(self, step, level_number, add_to_stats=False): self.add_to_stats( process=step.status.slot, + process_sweeper=L.sweep.rank, time=L.time, level=L.level_index, iter=step.status.iter, @@ -179,6 +180,7 @@ def post_sweep(self, step, level_number): self.add_to_stats( process=step.status.slot, + process_sweeper=L.sweep.rank, time=L.time, level=L.level_index, iter=step.status.iter, @@ -188,6 +190,7 @@ def post_sweep(self, step, level_number): ) self.add_to_stats( process=step.status.slot, + process_sweeper=L.sweep.rank, time=L.time, level=L.level_index, iter=step.status.iter, @@ -211,6 +214,7 @@ def post_iteration(self, step, level_number): self.add_to_stats( process=step.status.slot, + process_sweeper=L.sweep.rank, time=L.time, level=-1, iter=step.status.iter, @@ -220,6 +224,7 @@ def post_iteration(self, step, level_number): ) self.add_to_stats( process=step.status.slot, + process_sweeper=L.sweep.rank, time=L.time, level=L.level_index, iter=step.status.iter, @@ -243,6 +248,7 @@ def post_step(self, step, level_number): self.add_to_stats( process=step.status.slot, + process_sweeper=L.sweep.rank, time=L.time, level=L.level_index, iter=step.status.iter, @@ -252,6 +258,7 @@ def post_step(self, step, level_number): ) self.add_to_stats( process=step.status.slot, + process_sweeper=L.sweep.rank, time=L.time, level=-1, iter=step.status.iter, @@ -261,6 +268,7 @@ def post_step(self, step, level_number): ) self.add_to_stats( process=step.status.slot, + process_sweeper=L.sweep.rank, time=L.time, level=L.level_index, iter=-1, @@ -272,7 +280,14 @@ def post_step(self, step, level_number): # record the recomputed quantities at weird positions to make sure there is only one value for each step for t in [L.time, L.time + L.dt]: self.add_to_stats( - process=-1, time=t, level=-1, iter=-1, sweep=-1, type='_recomputed', value=step.status.get('restart') + process=-1, + time=t, + level=-1, + iter=-1, + sweep=-1, + type='_recomputed', + value=step.status.get('restart'), + process_sweeper=-1, ) def post_predict(self, step, level_number): @@ -290,6 +305,7 @@ def post_predict(self, step, level_number): self.add_to_stats( process=step.status.slot, + process_sweeper=L.sweep.rank, time=L.time, level=L.level_index, iter=step.status.iter, @@ -313,6 +329,7 @@ def post_run(self, step, level_number): self.add_to_stats( process=step.status.slot, + process_sweeper=L.sweep.rank, time=L.time, level=L.level_index, iter=step.status.iter, @@ -335,6 +352,7 @@ def post_setup(self, step, level_number): self.add_to_stats( process=-1, + process_sweeper=-1, time=-1, level=-1, iter=-1, diff --git a/pySDC/implementations/hooks/log_embedded_error_estimate.py b/pySDC/implementations/hooks/log_embedded_error_estimate.py index d4a01edfa5..1a9c7c5863 100644 --- a/pySDC/implementations/hooks/log_embedded_error_estimate.py +++ b/pySDC/implementations/hooks/log_embedded_error_estimate.py @@ -18,6 +18,7 @@ def log_error(self, step, level_number, appendix=''): value = L.status.error_embedded_estimate self.add_to_stats( process=step.status.slot, + process_sweeper=L.sweep.rank, time=L.time + L.dt, level=L.level_index, iter=iter, diff --git a/pySDC/implementations/hooks/log_errors.py b/pySDC/implementations/hooks/log_errors.py index 6edf5d8908..8d19112f28 100644 --- a/pySDC/implementations/hooks/log_errors.py +++ b/pySDC/implementations/hooks/log_errors.py @@ -32,6 +32,7 @@ def log_global_error(self, step, level_number, suffix=''): self.add_to_stats( process=step.status.slot, + process_sweeper=L.sweep.rank, time=L.time + L.dt, level=L.level_index, iter=step.status.iter, @@ -41,6 +42,7 @@ def log_global_error(self, step, level_number, suffix=''): ) self.add_to_stats( process=step.status.slot, + process_sweeper=L.sweep.rank, time=L.time + L.dt, level=L.level_index, iter=step.status.iter, @@ -69,6 +71,7 @@ def log_local_error(self, step, level_number, suffix=''): self.add_to_stats( process=step.status.slot, + process_sweeper=L.sweep.rank, time=L.time + L.dt, level=L.level_index, iter=step.status.iter, @@ -176,6 +179,7 @@ def post_run(self, step, level_number): self.add_to_stats( process=step.status.slot, + process_sweeper=L.sweep.rank, time=self.t_last_solution, level=L.level_index, iter=step.status.iter, @@ -185,6 +189,7 @@ def post_run(self, step, level_number): ) self.add_to_stats( process=step.status.slot, + process_sweeper=L.sweep.rank, time=self.t_last_solution, level=L.level_index, iter=step.status.iter, diff --git a/pySDC/implementations/hooks/log_extrapolated_error_estimate.py b/pySDC/implementations/hooks/log_extrapolated_error_estimate.py index 1530db9e18..7e2975653c 100644 --- a/pySDC/implementations/hooks/log_extrapolated_error_estimate.py +++ b/pySDC/implementations/hooks/log_extrapolated_error_estimate.py @@ -24,6 +24,7 @@ def post_step(self, step, level_number): self.add_to_stats( process=step.status.slot, + process_sweeper=L.sweep.rank, time=L.time + L.dt, level=L.level_index, iter=step.status.iter, diff --git a/pySDC/implementations/hooks/log_restarts.py b/pySDC/implementations/hooks/log_restarts.py index cb251e84dc..6a13e36b84 100644 --- a/pySDC/implementations/hooks/log_restarts.py +++ b/pySDC/implementations/hooks/log_restarts.py @@ -20,6 +20,7 @@ def post_step(self, step, level_number): self.add_to_stats( process=step.status.slot, + process_sweeper=L.sweep.rank, time=L.time, level=L.level_index, iter=step.status.iter, diff --git a/pySDC/implementations/hooks/log_work.py b/pySDC/implementations/hooks/log_work.py index af50530bb1..7afac7ce9f 100644 --- a/pySDC/implementations/hooks/log_work.py +++ b/pySDC/implementations/hooks/log_work.py @@ -45,6 +45,7 @@ def post_step(self, step, level_number): for key in self.__work_last_step[step.status.slot][level_number].keys(): self.add_to_stats( process=step.status.slot, + process_sweeper=L.sweep.rank, time=L.time + L.dt, level=L.level_index, iter=step.status.iter, diff --git a/pySDC/implementations/sweeper_classes/generic_implicit_MPI.py b/pySDC/implementations/sweeper_classes/generic_implicit_MPI.py index d3aeb38dd8..a4bfd85fcd 100644 --- a/pySDC/implementations/sweeper_classes/generic_implicit_MPI.py +++ b/pySDC/implementations/sweeper_classes/generic_implicit_MPI.py @@ -30,13 +30,19 @@ def __init__(self, params): self.logger.debug('Using MPI.COMM_WORLD for the communicator because none was supplied in the params.') super().__init__(params) - self.rank = self.params.comm.Get_rank() - if self.params.comm.size != self.coll.num_nodes: raise NotImplementedError( - f'The communicator in the {type(self).__name__} sweeper needs to have one rank for each node as of now! That means we need {self.coll.num_nodes} nodes, but got {self.params.comm.size} nodes.' + f'The communicator in the {type(self).__name__} sweeper needs to have one rank for each node as of now! That means we need {self.coll.num_nodes} nodes, but got {self.params.comm.size} processes.' ) + @property + def comm(self): + return self.params.comm + + @property + def rank(self): + return self.comm.rank + def compute_end_point(self): """ Compute u at the right point of the interval diff --git a/pySDC/projects/Resilience/hook.py b/pySDC/projects/Resilience/hook.py index 129e2cee11..eb654995c3 100644 --- a/pySDC/projects/Resilience/hook.py +++ b/pySDC/projects/Resilience/hook.py @@ -30,15 +30,6 @@ def post_step(self, step, level_number): L = step.levels[level_number] - self.add_to_stats( - process=step.status.slot, - time=L.time, - level=L.level_index, - iter=step.status.iter, - sweep=L.status.sweep, - type='restart', - value=int(step.status.get('restart')), - ) # add the following with two names because I use both in different projects -.- self.increment_stats( process=step.status.slot, diff --git a/pySDC/tests/test_hooks/test_entry_class.py b/pySDC/tests/test_hooks/test_entry_class.py new file mode 100644 index 0000000000..f2f4171e44 --- /dev/null +++ b/pySDC/tests/test_hooks/test_entry_class.py @@ -0,0 +1,158 @@ +import pytest + +from pySDC.core.Hooks import hooks, meta_data, namedtuple + +# In case the starship needs manual override of the reentry sequence, we set a code for unlocking manual controls. +# Because humans may go crazy, or worse, deflect to the enemy when they enter space, we can't tell them the code, or how +# the flight is progressing for that matter. Hence, the weather data on convection in the atmosphere is locked with the +# same code. +convection_meta_data = { + **meta_data, + 'unlock_manual_controls': None, +} +Entry = namedtuple('Entry', convection_meta_data.keys()) + + +class convection_hook(hooks): + meta_data = convection_meta_data + entry = Entry + starship = 'vostok' + + def post_step(self, step, level_number): + """ + Log the amount of convection, but lock it with a special code that we will definitely not tell Yuri... + + Args: + step (pySDC.Step.step): the current step + level_number (int): the current level number + + Returns: + None + """ + L = step.levels[level_number] + self.add_to_stats( + process=step.status.slot, + time=L.time + L.dt, + level=L.level_index, + iter=step.status.iter, + sweep=L.status.sweep, + type='convection', + value=L.uend[0], + unlock_manual_controls=125 if self.starship == 'vostok' else None, + ) + + +def win_space_race(useMPI): + from pySDC.implementations.hooks.log_work import LogWork + from pySDC.implementations.sweeper_classes.generic_implicit import generic_implicit + from pySDC.implementations.problem_classes.Lorenz import LorenzAttractor + from pySDC.helpers.stats_helper import get_sorted + + num_steps = 4 + + # initialize level parameters + level_params = {} + level_params['dt'] = 1e-2 + level_params['restol'] = -1 + + # initialize sweeper parameters + sweeper_params = {} + sweeper_params['quad_type'] = 'RADAU-RIGHT' + sweeper_params['num_nodes'] = 1 + sweeper_params['QI'] = 'IE' + + problem_params = {} + + # initialize step parameters + step_params = {} + step_params['maxiter'] = 1 + + # initialize controller parameters + controller_params = {} + controller_params['logger_level'] = 30 + controller_params['hook_class'] = convection_hook + controller_params['mssdc_jac'] = False + + # fill description dictionary for easy step instantiation + description = {} + description['problem_class'] = LorenzAttractor + description['problem_params'] = problem_params + description['sweeper_class'] = generic_implicit + description['sweeper_params'] = sweeper_params + description['level_params'] = level_params + description['step_params'] = step_params + + # set time parameters + t0 = 0.0 + + # instantiate controller + if useMPI: + from mpi4py import MPI + from pySDC.implementations.controller_classes.controller_MPI import controller_MPI + + comm = MPI.COMM_WORLD + + controller = controller_MPI(controller_params=controller_params, description=description, comm=comm) + P = controller.S.levels[0].prob + else: + from pySDC.implementations.controller_classes.controller_nonMPI import controller_nonMPI + + comm = None + controller = controller_nonMPI( + num_procs=num_steps, controller_params=controller_params, description=description + ) + P = controller.MS[0].levels[0].prob + uinit = P.u_exact(t0) + + uend, stats = controller.run(u0=uinit, t0=t0, Tend=num_steps * level_params['dt']) + + from pySDC.helpers.stats_helper import get_list_of_types + + expected = -1 + for code in [None, 0, 125]: + for type in ['residual_post_step', 'convection']: + res = get_sorted(stats, type=type, unlock_manual_controls=code, comm=comm) + if type == 'residual_post_step': + expected = num_steps if code is None else 0 + if type == 'convection': + expected = num_steps if code in [None, 125] else 0 # hmmm... security doesn't seem too good... + + assert ( + len(res) == expected + ), f'Unexpected number of entries in stats for type {type} and code {code}! Got {len(res)}, but expected {expected}.' + + return None + + +@pytest.mark.base +def test_entry_class(): + win_space_race(False) + + +@pytest.mark.mpi4py +def test_entry_class_MPI(): + import os + import subprocess + + num_procs = 4 + + # Set python path once + my_env = os.environ.copy() + my_env['PYTHONPATH'] = '../../..:.' + my_env['COVERAGE_PROCESS_START'] = 'pyproject.toml' + + cmd = f"mpirun -np {num_procs} python {__file__}".split() + + p = subprocess.Popen(cmd, env=my_env, cwd=".") + + p.wait() + assert p.returncode == 0, 'ERROR: did not get return code 0, got %s with %2i processes' % ( + p.returncode, + num_procs, + ) + + +if __name__ == "__main__": + import sys + + win_space_race(True)