From a46d63178507ee6d090ff4a8f2e73feefe73edaa Mon Sep 17 00:00:00 2001 From: Thomas Baumann <39156931+brownbaerchen@users.noreply.github.com> Date: Fri, 25 Aug 2023 11:30:56 +0200 Subject: [PATCH] MPI parallel implementation of extrapolation within Q error estimate (#349) * Minor refinements to problem classes and better efficient sweepers * Added ESDIRK strategy * Resilience in GSSDC * Forgot some files * Added multiple inheritance to MPI sweeper --- .../estimate_extrapolation_error.py | 28 ++- .../sweeper_classes/generic_implicit_MPI.py | 193 ++++++++++-------- .../Resilience/extrapolation_within_Q.py | 16 +- .../test_extrapolation_within_Q.py | 191 +++++++++++++++++ 4 files changed, 330 insertions(+), 98 deletions(-) create mode 100644 pySDC/tests/test_convergence_controllers/test_extrapolation_within_Q.py diff --git a/pySDC/implementations/convergence_controller_classes/estimate_extrapolation_error.py b/pySDC/implementations/convergence_controller_classes/estimate_extrapolation_error.py index 236172fc2c..c7bba5fb5e 100644 --- a/pySDC/implementations/convergence_controller_classes/estimate_extrapolation_error.py +++ b/pySDC/implementations/convergence_controller_classes/estimate_extrapolation_error.py @@ -448,6 +448,11 @@ def setup(self, controller, params, description, **kwargs): from pySDC.implementations.convergence_controller_classes.check_convergence import CheckConvergence num_nodes = description['sweeper_params']['num_nodes'] + self.comm = description['sweeper_params'].get('comm', None) + if self.comm: + from mpi4py import MPI + + self.sum = MPI.SUM self.check_convergence = CheckConvergence.check_convergence @@ -498,10 +503,25 @@ def post_iteration_processing(self, controller, S, **kwargs): works with types imex_mesh and mesh" ) - u_ex = lvl.prob.dtype_u(lvl.prob.init, val=0.0) - for i in range(self.params.n): - u_ex += self.coeff.u[i] * lvl.u[i] + self.coeff.f[i] * f[i] + # compute the error with the weighted sum + if self.comm: + idx = (self.comm.rank + 1) % self.comm.size + sendbuf = self.coeff.u[idx] * lvl.u[idx] + self.coeff.f[idx] * lvl.f[idx] + u_ex = lvl.prob.dtype_u(lvl.prob.init, val=0.0) if self.comm.rank == self.comm.size - 1 else None + self.comm.Reduce(sendbuf, u_ex, op=self.sum, root=self.comm.size - 1) + else: + u_ex = lvl.prob.dtype_u(lvl.prob.init, val=0.0) + for i in range(self.params.n): + u_ex += self.coeff.u[i] * lvl.u[i] + self.coeff.f[i] * f[i] # store the error - lvl.status.error_extrapolation_estimate = abs(u_ex - lvl.u[-1]) * self.coeff.prefactor + if self.comm: + error = ( + abs(u_ex - lvl.u[self.comm.rank + 1]) * self.coeff.prefactor + if self.comm.rank == self.comm.size - 1 + else None + ) + lvl.status.error_extrapolation_estimate = self.comm.bcast(error, root=self.comm.size - 1) + else: + lvl.status.error_extrapolation_estimate = abs(u_ex - lvl.u[-1]) * self.coeff.prefactor return None diff --git a/pySDC/implementations/sweeper_classes/generic_implicit_MPI.py b/pySDC/implementations/sweeper_classes/generic_implicit_MPI.py index e60e75731f..4a661cc9e2 100644 --- a/pySDC/implementations/sweeper_classes/generic_implicit_MPI.py +++ b/pySDC/implementations/sweeper_classes/generic_implicit_MPI.py @@ -1,107 +1,41 @@ from mpi4py import MPI +from pySDC.implementations.sweeper_classes.generic_implicit import generic_implicit from pySDC.core.Sweeper import sweeper +import logging -class generic_implicit_MPI(sweeper): +class SweeperMPI(sweeper): """ - Generic implicit sweeper, expecting lower triangular matrix type as input - - Attributes: - QI: lower 
triangular matrix
+    MPI based sweeper where each rank administers one collocation node. Sweepers are adapted to MPI by means of
+    multiple inheritance. See for example the `generic_implicit_MPI` sweeper, which has a class definition:
+
+    ```
+    class generic_implicit_MPI(SweeperMPI, generic_implicit):
+    ```
+
+    This means it inherits both from `SweeperMPI` and `generic_implicit`. The hierarchy works such that functions are
+    first called from `SweeperMPI` and then from `generic_implicit`. For instance, in the `__init__` function, the
+    `SweeperMPI` class adds a communicator and nothing else, while the `generic_implicit` class adds a preconditioner
+    and so on. Note that `self.params` is overwritten in the second call to the `__init__` of the core `sweeper` class,
+    but this is harmless: the `SweeperMPI` class adds its parameters to the `params` dictionary, so they are added
+    again when `generic_implicit` is initialized.
     """
 
     def __init__(self, params):
-        """
-        Initialization routine for the custom sweeper
+        self.logger = logging.getLogger('sweeper')
 
-        Args:
-            params: parameters for the sweeper
-        """
-
-        if 'QI' not in params:
-            params['QI'] = 'IE'
-
-        # call parent's initialization routine
+        if 'comm' not in params.keys():
+            params['comm'] = MPI.COMM_WORLD
+            self.logger.debug('Using MPI.COMM_WORLD for the communicator because none was supplied in the params.')
         super().__init__(params)
 
-        # get QI matrix
-        self.QI = self.get_Qdelta_implicit(self.coll, qd_type=self.params.QI)
-
         self.rank = self.params.comm.Get_rank()
 
-    def integrate(self):
-        """
-        Integrates the right-hand side
-
-        Returns:
-            list of dtype_u: containing the integral as values
-        """
-
-        # get current level and problem description
-        L = self.level
-        P = L.prob
-
-        me = P.dtype_u(P.init, val=0.0)
-        for m in range(self.coll.num_nodes):
-            if m == self.rank:
-                self.params.comm.Reduce(
-                    L.dt * self.coll.Qmat[m + 1, self.rank + 1] * L.f[self.rank + 1], me, root=m, op=MPI.SUM
-                )
-            else:
-                self.params.comm.Reduce(
-                    L.dt * self.coll.Qmat[m + 1, self.rank + 1] * L.f[self.rank + 1], None, root=m, op=MPI.SUM
-                )
-
-        return me
-
-    def update_nodes(self):
-        """
-        Update the u- and f-values at the collocation nodes -> corresponds to a single sweep over all nodes
-
-        Returns:
-            None
-        """
-
-        # get current level and problem description
-        L = self.level
-        P = L.prob
-
-        # only if the level has been touched before
-        assert L.status.unlocked
-
-        # get number of collocation nodes for easier access
-
-        # gather all terms which are known already (e.g. 
from the previous iteration) - # this corresponds to u0 + QF(u^k) - QdF(u^k) + tau - - # get QF(u^k) - rhs = self.integrate() - - rhs -= L.dt * self.QI[self.rank + 1, self.rank + 1] * L.f[self.rank + 1] - - # add initial value - rhs += L.u[0] - # add tau if associated - if L.tau[self.rank] is not None: - rhs += L.tau[self.rank] - - # build rhs, consisting of the known values from above and new values from previous nodes (at k+1) - - # implicit solve with prefactor stemming from the diagonal of Qd - L.u[self.rank + 1] = P.solve_system( - rhs, - L.dt * self.QI[self.rank + 1, self.rank + 1], - L.u[self.rank + 1], - L.time + L.dt * self.coll.nodes[self.rank], - ) - # update function values - L.f[self.rank + 1] = P.eval_f(L.u[self.rank + 1], L.time + L.dt * self.coll.nodes[self.rank]) - - # indicate presence of new values at this level - L.status.updated = True - - return None + if self.params.comm.size != self.coll.num_nodes: + raise NotImplementedError( + f'The communicator in the {type(self).__name__} sweeper needs to have one rank for each node as of now! That means we need {self.coll.num_nodes} nodes, but got {self.params.comm.size} nodes.' + ) def compute_end_point(self): """ @@ -191,3 +125,82 @@ def predict(self): # indicate that this level is now ready for sweeps L.status.unlocked = True L.status.updated = True + + +class generic_implicit_MPI(SweeperMPI, generic_implicit): + """ + Generic implicit sweeper parallelized across the nodes. + Please supply a communicator as `comm` to the parameters! + + Attributes: + rank (int): MPI rank + """ + + def integrate(self): + """ + Integrates the right-hand side + + Returns: + list of dtype_u: containing the integral as values + """ + + # get current level and problem description + L = self.level + P = L.prob + + me = P.dtype_u(P.init, val=0.0) + for m in range(self.coll.num_nodes): + recvBuf = me if m == self.rank else None + self.params.comm.Reduce( + L.dt * self.coll.Qmat[m + 1, self.rank + 1] * L.f[self.rank + 1], recvBuf, root=m, op=MPI.SUM + ) + + return me + + def update_nodes(self): + """ + Update the u- and f-values at the collocation nodes -> corresponds to a single sweep over all nodes + + Returns: + None + """ + + # get current level and problem description + L = self.level + P = L.prob + + # only if the level has been touched before + assert L.status.unlocked + + # get number of collocation nodes for easier access + + # gather all terms which are known already (e.g. 
from the previous iteration)
+        # this corresponds to u0 + QF(u^k) - QdF(u^k) + tau
+
+        # get QF(u^k)
+        rhs = self.integrate()
+
+        rhs -= L.dt * self.QI[self.rank + 1, self.rank + 1] * L.f[self.rank + 1]
+
+        # add initial value
+        rhs += L.u[0]
+        # add tau if associated
+        if L.tau[self.rank] is not None:
+            rhs += L.tau[self.rank]
+
+        # build rhs, consisting of the known values from above and new values from previous nodes (at k+1)
+
+        # implicit solve with prefactor stemming from the diagonal of Qd
+        L.u[self.rank + 1] = P.solve_system(
+            rhs,
+            L.dt * self.QI[self.rank + 1, self.rank + 1],
+            L.u[self.rank + 1],
+            L.time + L.dt * self.coll.nodes[self.rank],
+        )
+        # update function values
+        L.f[self.rank + 1] = P.eval_f(L.u[self.rank + 1], L.time + L.dt * self.coll.nodes[self.rank])
+
+        # indicate presence of new values at this level
+        L.status.updated = True
+
+        return None
diff --git a/pySDC/projects/Resilience/extrapolation_within_Q.py b/pySDC/projects/Resilience/extrapolation_within_Q.py
index 506d31e69a..31b45b0eac 100644
--- a/pySDC/projects/Resilience/extrapolation_within_Q.py
+++ b/pySDC/projects/Resilience/extrapolation_within_Q.py
@@ -12,7 +12,7 @@
 from pySDC.projects.Resilience.vdp import run_vdp
 
 
-def multiple_runs(prob, dts, num_nodes, quad_type='RADAU-RIGHT'):
+def multiple_runs(prob, dts, num_nodes, quad_type='RADAU-RIGHT', QI='LU', useMPI=False):
     """
     Make multiple runs of a specific problem and record vital error information
 
@@ -32,6 +32,14 @@
     description['sweeper_params'] = {'num_nodes': num_nodes, 'quad_type': quad_type}
     description['convergence_controllers'] = {EstimateExtrapolationErrorWithinQ: {}}
 
+    if useMPI:
+        from pySDC.implementations.sweeper_classes.generic_implicit_MPI import generic_implicit_MPI, MPI
+
+        description['sweeper_class'] = generic_implicit_MPI
+        description['sweeper_params']['comm'] = MPI.COMM_WORLD.Split(MPI.COMM_WORLD.rank < num_nodes)
+        if MPI.COMM_WORLD.rank >= num_nodes:
+            return None
+
     if prob.__name__ == 'run_advection':
         description['problem_params'] = {'order': 6, 'stencil_type': 'center'}
 
@@ -90,7 +98,7 @@ def plot_and_compute_order(ax, res, num_nodes, coll_order):
     ax.legend(frameon=False)
 
 
-def check_order(ax, prob, dts, num_nodes, quad_type):
+def check_order(ax, prob, dts, num_nodes, quad_type, **kwargs):
     """
     Check the order by calling `multiple_runs` and then `plot_and_compute_order`.
 
@@ -101,7 +109,7 @@ def check_order(ax, prob, dts, num_nodes, quad_type): num_nodes (int): Number of nodes quad_type (str): Type of nodes """ - res, coll_order = multiple_runs(prob, dts, num_nodes, quad_type) + res, coll_order = multiple_runs(prob, dts, num_nodes, quad_type, **kwargs) plot_and_compute_order(ax, res, num_nodes, coll_order) @@ -109,7 +117,7 @@ def main(): fig, ax = plt.subplots() num_nodes = 3 quad_type = 'RADAU-RIGHT' - check_order(ax, run_advection, [5e-1, 1e-1, 5e-2, 1e-2], num_nodes, quad_type) + check_order(ax, run_advection, [5e-1, 1e-1, 5e-2, 1e-2], num_nodes, quad_type, QI='MIN', useMPI=True) plt.show() diff --git a/pySDC/tests/test_convergence_controllers/test_extrapolation_within_Q.py b/pySDC/tests/test_convergence_controllers/test_extrapolation_within_Q.py new file mode 100644 index 0000000000..73a3297d3d --- /dev/null +++ b/pySDC/tests/test_convergence_controllers/test_extrapolation_within_Q.py @@ -0,0 +1,191 @@ +import pytest + + +def single_run(dt, Tend, num_nodes, quad_type, QI, useMPI): + """ + Runs a single advection problem with certain parameters + + Args: + dt (float): Step size + Tend (float): Final time + num_nodes (int): Number of nodes + quad_type (str): Type of quadrature + QI (str): Preconditioner + useMPI (bool): Whether or not to use MPI + + Returns: + (dict): Stats object generated during the run + (pySDC.Controller.controller): Controller used in the run + """ + from pySDC.implementations.problem_classes.AdvectionEquation_ND_FD import advectionNd + from pySDC.implementations.controller_classes.controller_nonMPI import controller_nonMPI + from pySDC.implementations.hooks.log_errors import LogGlobalErrorPostStep + from pySDC.implementations.convergence_controller_classes.estimate_extrapolation_error import ( + EstimateExtrapolationErrorWithinQ, + ) + + if useMPI: + from pySDC.implementations.sweeper_classes.generic_implicit_MPI import generic_implicit_MPI as sweeper_class + from mpi4py import MPI + + comm = MPI.COMM_WORLD + else: + from pySDC.implementations.sweeper_classes.generic_implicit import generic_implicit as sweeper_class + + comm = None + + # initialize level parameters + level_params = {} + level_params['dt'] = dt + level_params['restol'] = 1e-10 + + # initialize sweeper parameters + sweeper_params = {} + sweeper_params['quad_type'] = quad_type + sweeper_params['num_nodes'] = num_nodes + sweeper_params['QI'] = QI + sweeper_params['comm'] = comm + + problem_params = {'freq': 2, 'nvars': 2**9, 'c': 1.0, 'stencil_type': 'center', 'order': 6, 'bc': 'periodic'} + + # initialize step parameters + step_params = {} + step_params['maxiter'] = 99 + + # initialize controller parameters + controller_params = {} + controller_params['logger_level'] = 30 + controller_params['hook_class'] = LogGlobalErrorPostStep + controller_params['mssdc_jac'] = False + + # fill description dictionary for easy step instantiation + description = {} + description['problem_class'] = advectionNd + description['problem_params'] = problem_params + description['sweeper_class'] = sweeper_class + description['sweeper_params'] = sweeper_params + description['level_params'] = level_params + description['step_params'] = step_params + description['convergence_controllers'] = {EstimateExtrapolationErrorWithinQ: {}} + + # set time parameters + t0 = 0.0 + + # instantiate controller + controller = controller_nonMPI(num_procs=1, controller_params=controller_params, description=description) + + # get initial values on finest level + P = controller.MS[0].levels[0].prob + uinit = 
P.u_exact(t0) + + # call main function to get things done... + uend, stats = controller.run(u0=uinit, t0=t0, Tend=Tend) + return stats, controller + + +def multiple_runs(dts, **kwargs): + """ + Make multiple runs of a specific problem and record vital error information + + Args: + dts (list): The step sizes to run with + num_nodes (int): Number of nodes + quad_type (str): Type of nodes + + Returns: + dict: Errors for multiple runs + int: Order of the collocation problem + """ + from pySDC.helpers.stats_helper import get_sorted + + res = {} + + for dt in dts: + stats, controller = single_run(Tend=5.0 * dt, dt=dt, **kwargs) + + res[dt] = {} + res[dt]['e_loc'] = max([me[1] for me in get_sorted(stats, type='e_global_post_step')]) + res[dt]['e_ex'] = max([me[1] for me in get_sorted(stats, type='error_extrapolation_estimate')]) + + coll_order = controller.MS[0].levels[0].sweep.coll.order + return res, coll_order + + +def check_order(dts, **kwargs): + """ + Check the order by calling `multiple_runs` and then `plot_and_compute_order`. + + Args: + dts (list): The step sizes to run with + num_nodes (int): Number of nodes + quad_type (str): Type of nodes + """ + import numpy as np + + res, coll_order = multiple_runs(dts, **kwargs) + dts = np.array(list(res.keys())) + keys = list(res[dts[0]].keys()) + + expected_order = { + 'e_loc': coll_order + 1, + 'e_ex': kwargs['num_nodes'] + 1, + } + + for key in keys: + errors = np.array([res[dt][key] for dt in dts]) + + mask = np.logical_and(errors < 1e-0, errors > 1e-10) + order = np.log(errors[mask][1:] / errors[mask][:-1]) / np.log(dts[mask][1:] / dts[mask][:-1]) + + assert np.isclose( + np.mean(order), expected_order[key], atol=0.5 + ), f'Expected order {expected_order[key]} for {key}, but got {np.mean(order):.2e}!' + + +@pytest.mark.base +@pytest.mark.parametrize('num_nodes', [2, 3]) +@pytest.mark.parametrize('quad_type', ['RADAU-RIGHT', 'GAUSS']) +def test_extrapolation_within_Q(num_nodes, quad_type): + kwargs = { + 'num_nodes': num_nodes, + 'quad_type': quad_type, + 'useMPI': False, + 'QI': 'MIN', + } + check_order([5e-1, 1e-1, 8e-2, 5e-2], **kwargs) + + +@pytest.mark.mpi4py +@pytest.mark.parametrize('num_nodes', [2, 3]) +@pytest.mark.parametrize('quad_type', ['RADAU-RIGHT']) +def test_extrapolation_within_Q_MPI(num_nodes, quad_type): + import subprocess + import os + + # Set python path once + my_env = os.environ.copy() + my_env['PYTHONPATH'] = '../../..:.' + my_env['COVERAGE_PROCESS_START'] = 'pyproject.toml' + + cmd = f"mpirun -np {num_nodes} python {__file__} {num_nodes} {quad_type}".split() + + p = subprocess.Popen(cmd, env=my_env, cwd=".") + + p.wait() + assert p.returncode == 0, 'ERROR: did not get return code 0, got %s with %2i processes' % ( + p.returncode, + num_nodes, + ) + + +if __name__ == "__main__": + import sys + + if len(sys.argv) > 1: + kwargs = { + 'num_nodes': int(sys.argv[1]), + 'quad_type': sys.argv[2], + 'useMPI': True, + 'QI': 'MIN', + } + check_order([5e-1, 1e-1, 8e-2, 5e-2], **kwargs)
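
The core communication pattern of this patch is the same in `integrate` and in the extrapolation error estimate: each rank owns one collocation node, forms its weighted contribution locally, the contributions are combined with `MPI.Reduce` on a chosen root rank, and the result is shared with `bcast`. The following standalone sketch illustrates that pattern with mpi4py; it uses plain numpy arrays instead of pySDC data types, made-up variable names, and an arbitrary norm as the "estimate", so it is an illustration of the technique rather than pySDC code.

```python
# Standalone illustration of the node-parallel weighted sum used in this patch.
# All names and data here are made up; only the communication pattern mirrors the patch.
# Run with, e.g.: mpirun -np 3 python reduce_sketch.py   (the file name is arbitrary)
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank, size = comm.rank, comm.size

# Extrapolation weights: one per node, identical on every rank (fixed seed).
weights = np.random.default_rng(0).random(size)
# Per-node data: each rank owns the solution at "its" collocation node (rank-dependent seed).
u_node = np.random.default_rng(rank).random(8)

# Every rank sends its weighted contribution; only the root allocates a receive buffer.
sendbuf = weights[rank] * u_node
recvbuf = np.empty_like(sendbuf) if rank == size - 1 else None
comm.Reduce(sendbuf, recvbuf, op=MPI.SUM, root=size - 1)

# The root condenses the reduced vector into a scalar and broadcasts it to all ranks,
# analogous to how the error estimate is broadcast in the convergence controller.
estimate = float(np.max(np.abs(recvbuf))) if rank == size - 1 else None
estimate = comm.bcast(estimate, root=size - 1)
print(f'rank {rank}: estimate = {estimate:.6f}')
```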
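
For readers who find the multiple-inheritance setup of `SweeperMPI` and `generic_implicit_MPI` described in the docstring above hard to follow, here is a minimal, self-contained sketch of the underlying Python mechanism. The class names are simplified stand-ins, not the actual pySDC classes, and the real `__init__` methods do considerably more than shown here.

```python
# Simplified stand-ins for the pySDC classes, illustrating the method resolution order (MRO)
# that makes the MPI mixin's __init__ run before the implicit sweeper's, as described above.
class CoreSweeper:
    """Plays the role of the core `sweeper` class: it stores the parameters."""

    def __init__(self, params):
        self.params = dict(params)


class MPISweeperMixin(CoreSweeper):
    """Plays the role of `SweeperMPI`: it supplies a communicator, then hands off along the MRO."""

    def __init__(self, params):
        params.setdefault('comm', 'MPI.COMM_WORLD')  # placeholder for a real communicator
        super().__init__(params)


class ImplicitSweeper(CoreSweeper):
    """Plays the role of `generic_implicit`: it supplies a preconditioner, then hands off."""

    def __init__(self, params):
        params.setdefault('QI', 'IE')
        super().__init__(params)


class ImplicitSweeperMPI(MPISweeperMixin, ImplicitSweeper):
    """Plays the role of `generic_implicit_MPI`: both parent __init__ methods run, MPI one first."""


if __name__ == '__main__':
    sweeper = ImplicitSweeperMPI({'num_nodes': 3})
    print([cls.__name__ for cls in ImplicitSweeperMPI.__mro__])  # mixin precedes ImplicitSweeper
    print(sweeper.params)  # contains num_nodes, comm and QI
```

The order of the base classes in the class definition is what determines this ordering, which is why `generic_implicit_MPI(SweeperMPI, generic_implicit)` lists the MPI mixin first.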