MPI parallel implementation of extrapolation within Q error estimate (#349)

* Minor refinements to problem classes and more efficient sweepers

* Added ESDIRK strategy

* Resilience in GSSDC

* Forgot some files

* Added multiple inheritance to MPI sweeper
brownbaerchen authored Aug 25, 2023
1 parent a44c178 commit a46d631
Showing 4 changed files with 330 additions and 98 deletions.
@@ -448,6 +448,11 @@ def setup(self, controller, params, description, **kwargs):
from pySDC.implementations.convergence_controller_classes.check_convergence import CheckConvergence

num_nodes = description['sweeper_params']['num_nodes']
self.comm = description['sweeper_params'].get('comm', None)
if self.comm:
from mpi4py import MPI

self.sum = MPI.SUM

self.check_convergence = CheckConvergence.check_convergence

@@ -498,10 +503,25 @@ def post_iteration_processing(self, controller, S, **kwargs):
works with types imex_mesh and mesh"
)

u_ex = lvl.prob.dtype_u(lvl.prob.init, val=0.0)
for i in range(self.params.n):
u_ex += self.coeff.u[i] * lvl.u[i] + self.coeff.f[i] * f[i]
# compute the extrapolated solution as a weighted sum
if self.comm:
idx = (self.comm.rank + 1) % self.comm.size
sendbuf = self.coeff.u[idx] * lvl.u[idx] + self.coeff.f[idx] * lvl.f[idx]
u_ex = lvl.prob.dtype_u(lvl.prob.init, val=0.0) if self.comm.rank == self.comm.size - 1 else None
self.comm.Reduce(sendbuf, u_ex, op=self.sum, root=self.comm.size - 1)
else:
u_ex = lvl.prob.dtype_u(lvl.prob.init, val=0.0)
for i in range(self.params.n):
u_ex += self.coeff.u[i] * lvl.u[i] + self.coeff.f[i] * f[i]

# store the error
lvl.status.error_extrapolation_estimate = abs(u_ex - lvl.u[-1]) * self.coeff.prefactor
if self.comm:
error = (
abs(u_ex - lvl.u[self.comm.rank + 1]) * self.coeff.prefactor
if self.comm.rank == self.comm.size - 1
else None
)
lvl.status.error_extrapolation_estimate = self.comm.bcast(error, root=self.comm.size - 1)
else:
lvl.status.error_extrapolation_estimate = abs(u_ex - lvl.u[-1]) * self.coeff.prefactor
return None
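For readers unfamiliar with the pattern in the new `self.comm` branch above, here is a minimal, self-contained sketch of the same Reduce-then-bcast idea, with plain numpy arrays standing in for pySDC's `dtype_u` and made-up coefficients (run with e.g. `mpiexec -n 3`). It only illustrates the communication pattern, not the project code itself.

```python
# Sketch only: each rank owns one term of a weighted sum; the sum is reduced
# onto the last rank, which forms a scalar error estimate and broadcasts it.
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank, size = comm.rank, comm.size

# this rank's contribution (coefficients and values are made up for the sketch)
local_term = (1.0 / (rank + 1)) * np.full(4, rank + 1.0)

# accumulate the weighted sum on the last rank only (recvbuf is None elsewhere)
u_ex = np.zeros(4) if rank == size - 1 else None
comm.Reduce(local_term, u_ex, op=MPI.SUM, root=size - 1)

# the last rank turns the sum into an error estimate and shares it with all ranks
u_end = np.full(4, 2.0)  # stand-in for lvl.u[-1]
error = float(np.max(np.abs(u_ex - u_end))) if rank == size - 1 else None
error = comm.bcast(error, root=size - 1)
print(f'rank {rank}: error estimate = {error}')
```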
193 changes: 103 additions & 90 deletions pySDC/implementations/sweeper_classes/generic_implicit_MPI.py
@@ -1,107 +1,41 @@
from mpi4py import MPI

from pySDC.implementations.sweeper_classes.generic_implicit import generic_implicit
from pySDC.core.Sweeper import sweeper
import logging


class generic_implicit_MPI(sweeper):
class SweeperMPI(sweeper):
"""
Generic implicit sweeper, expecting lower triangular matrix type as input
Attributes:
QI: lower triangular matrix
MPI-based sweeper where each rank administers one collocation node. Sweepers are adapted to MPI via multiple inheritance.
See for example the `generic_implicit_MPI` sweeper, which has a class definition:
```
class generic_implicit_MPI(SweeperMPI, generic_implicit):
```
This means it inherits from both `SweeperMPI` and `generic_implicit`. The hierarchy works such that functions are first
called from `SweeperMPI` and then from `generic_implicit`. For instance, in the `__init__` function, the `SweeperMPI`
class adds a communicator and nothing else; the `generic_implicit` class then adds a preconditioner, and so on.
Note that `self.params` is overwritten in the second call to the `__init__` of the core `sweeper` class; however,
`SweeperMPI` adds its parameters to the `params` dictionary, so they are passed on again when `generic_implicit` is
initialized.
"""

def __init__(self, params):
"""
Initialization routine for the custom sweeper
self.logger = logging.getLogger('sweeper')

Args:
params: parameters for the sweeper
"""

if 'QI' not in params:
params['QI'] = 'IE'

# call parent's initialization routine
if 'comm' not in params.keys():
params['comm'] = MPI.COMM_WORLD
self.logger.debug('Using MPI.COMM_WORLD for the communicator because none was supplied in the params.')
super().__init__(params)

# get QI matrix
self.QI = self.get_Qdelta_implicit(self.coll, qd_type=self.params.QI)

self.rank = self.params.comm.Get_rank()

def integrate(self):
"""
Integrates the right-hand side
Returns:
list of dtype_u: containing the integral as values
"""

# get current level and problem description
L = self.level
P = L.prob

me = P.dtype_u(P.init, val=0.0)
for m in range(self.coll.num_nodes):
if m == self.rank:
self.params.comm.Reduce(
L.dt * self.coll.Qmat[m + 1, self.rank + 1] * L.f[self.rank + 1], me, root=m, op=MPI.SUM
)
else:
self.params.comm.Reduce(
L.dt * self.coll.Qmat[m + 1, self.rank + 1] * L.f[self.rank + 1], None, root=m, op=MPI.SUM
)

return me

def update_nodes(self):
"""
Update the u- and f-values at the collocation nodes -> corresponds to a single sweep over all nodes
Returns:
None
"""

# get current level and problem description
L = self.level
P = L.prob

# only if the level has been touched before
assert L.status.unlocked

# get number of collocation nodes for easier access

# gather all terms which are known already (e.g. from the previous iteration)
# this corresponds to u0 + QF(u^k) - QdF(u^k) + tau

# get QF(u^k)
rhs = self.integrate()

rhs -= L.dt * self.QI[self.rank + 1, self.rank + 1] * L.f[self.rank + 1]

# add initial value
rhs += L.u[0]
# add tau if associated
if L.tau[self.rank] is not None:
rhs += L.tau[self.rank]

# build rhs, consisting of the known values from above and new values from previous nodes (at k+1)

# implicit solve with prefactor stemming from the diagonal of Qd
L.u[self.rank + 1] = P.solve_system(
rhs,
L.dt * self.QI[self.rank + 1, self.rank + 1],
L.u[self.rank + 1],
L.time + L.dt * self.coll.nodes[self.rank],
)
# update function values
L.f[self.rank + 1] = P.eval_f(L.u[self.rank + 1], L.time + L.dt * self.coll.nodes[self.rank])

# indicate presence of new values at this level
L.status.updated = True

return None
if self.params.comm.size != self.coll.num_nodes:
raise NotImplementedError(
f'The communicator in the {type(self).__name__} sweeper needs to have one rank for each collocation node as of now! That means we need a communicator of size {self.coll.num_nodes}, but got size {self.params.comm.size}.'
)

def compute_end_point(self):
"""
@@ -191,3 +125,82 @@ def predict(self):
# indicate that this level is now ready for sweeps
L.status.unlocked = True
L.status.updated = True


class generic_implicit_MPI(SweeperMPI, generic_implicit):
"""
Generic implicit sweeper parallelized across the nodes.
Please supply a communicator as `comm` to the parameters!
Attributes:
rank (int): MPI rank
"""

def integrate(self):
"""
Integrates the right-hand side
Returns:
list of dtype_u: containing the integral as values
"""

# get current level and problem description
L = self.level
P = L.prob

me = P.dtype_u(P.init, val=0.0)
for m in range(self.coll.num_nodes):
recvBuf = me if m == self.rank else None
self.params.comm.Reduce(
L.dt * self.coll.Qmat[m + 1, self.rank + 1] * L.f[self.rank + 1], recvBuf, root=m, op=MPI.SUM
)

return me

def update_nodes(self):
"""
Update the u- and f-values at the collocation nodes -> corresponds to a single sweep over all nodes
Returns:
None
"""

# get current level and problem description
L = self.level
P = L.prob

# only if the level has been touched before
assert L.status.unlocked

# get number of collocation nodes for easier access

# gather all terms which are known already (e.g. from the previous iteration)
# this corresponds to u0 + QF(u^k) - QdF(u^k) + tau

# get QF(u^k)
rhs = self.integrate()

rhs -= L.dt * self.QI[self.rank + 1, self.rank + 1] * L.f[self.rank + 1]

# add initial value
rhs += L.u[0]
# add tau if associated
if L.tau[self.rank] is not None:
rhs += L.tau[self.rank]

# build rhs, consisting of the known values from above and new values from previous nodes (at k+1)

# implicit solve with prefactor stemming from the diagonal of Qd
L.u[self.rank + 1] = P.solve_system(
rhs,
L.dt * self.QI[self.rank + 1, self.rank + 1],
L.u[self.rank + 1],
L.time + L.dt * self.coll.nodes[self.rank],
)
# update function values
L.f[self.rank + 1] = P.eval_f(L.u[self.rank + 1], L.time + L.dt * self.coll.nodes[self.rank])

# indicate presence of new values at this level
L.status.updated = True

return None
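To see what the node-parallel `integrate` above is doing, the following standalone sketch computes row `m` of the matrix-vector product `Q @ f` on rank `m` with one `Reduce` per row, mirroring the communication layout of the loop over `m` in the sweeper. Assumptions: one rank per node, a random stand-in for `self.coll.Qmat`, and numpy arrays instead of `dtype_u`.

```python
# Sketch only: distributed Q @ f, where rank r stores f at node r and
# row m of the product is accumulated on rank m via MPI Reduce.
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank, num_nodes = comm.rank, comm.size

rng = np.random.default_rng(seed=42)       # same seed -> identical Q on all ranks
Q = rng.random((num_nodes, num_nodes))     # stand-in for dt * self.coll.Qmat
f_local = np.full(3, float(rank + 1))      # stand-in for L.f at this rank's node

me = np.zeros(3)                           # receives (Q @ f)[rank] on this rank
for m in range(num_nodes):
    recvbuf = me if m == rank else None
    comm.Reduce(Q[m, rank] * f_local, recvbuf, op=MPI.SUM, root=m)

print(f'rank {rank}: my row of Q @ f = {me}')
```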
16 changes: 12 additions & 4 deletions pySDC/projects/Resilience/extrapolation_within_Q.py
@@ -12,7 +12,7 @@
from pySDC.projects.Resilience.vdp import run_vdp


def multiple_runs(prob, dts, num_nodes, quad_type='RADAU-RIGHT'):
def multiple_runs(prob, dts, num_nodes, quad_type='RADAU-RIGHT', QI='LU', useMPI=False):
"""
Make multiple runs of a specific problem and record vital error information
@@ -32,6 +32,14 @@ def multiple_runs(prob, dts, num_nodes, quad_type='RADAU-RIGHT'):
description['sweeper_params'] = {'num_nodes': num_nodes, 'quad_type': quad_type}
description['convergence_controllers'] = {EstimateExtrapolationErrorWithinQ: {}}

if useMPI:
from pySDC.implementations.sweeper_classes.generic_implicit_MPI import generic_implicit_MPI, MPI

description['sweeper_class'] = generic_implicit_MPI
description['sweeper_params']['comm'] = MPI.COMM_WORLD.Split(MPI.COMM_WORLD.rank < num_nodes)
if MPI.COMM_WORLD.rank > num_nodes:
return None

if prob.__name__ == 'run_advection':
description['problem_params'] = {'order': 6, 'stencil_type': 'center'}

@@ -90,7 +98,7 @@ def plot_and_compute_order(ax, res, num_nodes, coll_order):
ax.legend(frameon=False)


def check_order(ax, prob, dts, num_nodes, quad_type):
def check_order(ax, prob, dts, num_nodes, quad_type, **kwargs):
"""
Check the order by calling `multiple_runs` and then `plot_and_compute_order`.
@@ -101,15 +109,15 @@ def check_order(ax, prob, dts, num_nodes, quad_type):
num_nodes (int): Number of nodes
quad_type (str): Type of nodes
"""
res, coll_order = multiple_runs(prob, dts, num_nodes, quad_type)
res, coll_order = multiple_runs(prob, dts, num_nodes, quad_type, **kwargs)
plot_and_compute_order(ax, res, num_nodes, coll_order)


def main():
fig, ax = plt.subplots()
num_nodes = 3
quad_type = 'RADAU-RIGHT'
check_order(ax, run_advection, [5e-1, 1e-1, 5e-2, 1e-2], num_nodes, quad_type)
check_order(ax, run_advection, [5e-1, 1e-1, 5e-2, 1e-2], num_nodes, quad_type, QI='MIN', useMPI=True)
plt.show()


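The `useMPI` branch in `multiple_runs` splits `MPI.COMM_WORLD` so that only the first `num_nodes` ranks drive the node-parallel sweeper. A rough standalone sketch of that splitting pattern (the rank cutoff and the `Free` call are illustrative choices, not taken from the project code) could look like this:

```python
# Sketch only: split COMM_WORLD so ranks 0..num_nodes-1 share one communicator
# for the node-parallel sweeper, while surplus ranks opt out.
from mpi4py import MPI

num_nodes = 3
world = MPI.COMM_WORLD

# color 1 for participating ranks, color 0 for the rest
comm = world.Split(world.rank < num_nodes)

if world.rank >= num_nodes:
    comm.Free()  # surplus ranks do not take part in the run
else:
    print(f'world rank {world.rank} -> sweeper rank {comm.rank} of {comm.size}')
```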