Skip to content

Commit

Permalink
MPI parallel adaptive collocation (#350)
Browse files Browse the repository at this point in the history
* MPI parallel version of adaptive collocation

* Compute end point in `generic_implicit_MPI` sweeper

* Cleanup

* More cleanup
  • Loading branch information
brownbaerchen authored Aug 30, 2023
1 parent a46d631 commit 1a51834
Show file tree
Hide file tree
Showing 10 changed files with 413 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,39 @@ def setup(self, controller, params, description, **kwargs):

defaults['num_colls'] = max([defaults['num_colls'], len(params[key])])

self.comm = description['sweeper_params'].get('comm', None)
if self.comm:
from mpi4py import MPI

self.MPI_SUM = MPI.SUM

return {**defaults, **super().setup(controller, params, description, **kwargs)}

def matmul(self, A, b):
"""
Matrix vector multiplication, possibly MPI parallel.
The parallel implementation performs a reduce operation in every row of the matrix. While communicating the
entire vector once could reduce the number of communications, this way we never need to store the entire vector
on any specific rank.
Args:
A (2d np.ndarray): Matrix
b (list): Vector
Returns:
List: Axb
"""
if self.comm:
res = [A[i, 0] * b[0] if b[i] is not None else None for i in range(A.shape[0])]
buf = b[0] * 0.0
for i in range(1, A.shape[1]):
self.comm.Reduce(A[i, self.comm.rank + 1] * b[self.comm.rank + 1], buf, op=self.MPI_SUM, root=i - 1)
if i == self.comm.rank + 1:
res[i] += buf
return res
else:
return A @ b

def switch_sweeper(self, S):
"""
Update to the next sweeper in line.
Expand All @@ -105,7 +136,7 @@ def switch_sweeper(self, S):
P = L.prob

# store solution of current level which will be interpolated to new level
u_old = [me.flatten() for me in L.u]
u_old = [me.flatten() if me is not None else me for me in L.u]
nodes_old = L.sweep.coll.nodes.copy()

# change sweeper
Expand All @@ -120,17 +151,19 @@ def switch_sweeper(self, S):
nodes_new = L.sweep.coll.nodes.copy()
interpolator = LagrangeApproximation(points=np.append(0, nodes_old))

u_inter = interpolator.getInterpolationMatrix(np.append(0, nodes_new)) @ u_old
u_inter = self.matmul(interpolator.getInterpolationMatrix(np.append(0, nodes_new)), u_old)

# assign the interpolated values to the nodes in the level
for i in range(0, len(u_inter)):
me = P.dtype_u(P.init)
me[:] = np.reshape(u_inter[i], P.init[0])
L.u[i] = me
if u_inter[i] is not None:
me = P.dtype_u(P.init)
me[:] = np.reshape(u_inter[i], P.init[0])
L.u[i] = me

# reevaluate rhs
for i in range(L.sweep.coll.num_nodes + 1):
L.f[i] = L.prob.eval_f(L.u[i], L.time)
if L.u[i] is not None:
L.f[i] = L.prob.eval_f(L.u[i], L.time)

# log the new parameters
self.log(f'Switching to collocation {self.status.active_coll + 1} of {self.params.num_colls}', S, level=20)
Expand Down
90 changes: 90 additions & 0 deletions pySDC/implementations/problem_classes/polynomial_test_problem.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import numpy as np

from pySDC.core.Problem import ptype
from pySDC.implementations.datatype_classes.mesh import mesh


class polynomial_testequation(ptype):
"""
Dummy problem for tests only! In particular, the `solve_system` function just returns the exact solution instead of
solving an appropriate system. This class is indented to be used for tests of operations that are exact on polynomials.
"""

dtype_u = mesh
dtype_f = mesh

def __init__(self, degree=1, seed=26266):
"""Initialization routine"""

# invoke super init, passing number of dofs, dtype_u and dtype_f
super().__init__(init=(1, None, np.dtype('float64')))

self.rng = np.random.RandomState(seed=seed)
self.poly = np.polynomial.Polynomial(self.rng.rand(degree))
self._makeAttributeAndRegister('degree', 'seed', localVars=locals(), readOnly=True)

def eval_f(self, u, t):
"""
Derivative of the polynomial.
Parameters
----------
u : dtype_u
Current values of the numerical solution.
t : float
Current time of the numerical solution is computed.
Returns
-------
f : dtype_f
The right-hand side of the problem.
"""

f = self.dtype_f(self.init)
f[:] = self.poly.deriv(m=1)(t)
return f

def solve_system(self, rhs, factor, u0, t):
"""
Just return the exact solution...
Parameters
----------
rhs : dtype_f
Right-hand side for the linear system.
factor : float
Abbrev. for the local stepsize (or any other factor required).
u0 : dtype_u
Initial guess for the iterative solver.
t : float
Current time (e.g. for time-dependent BCs).
Returns
-------
me : dtype_u
The solution as mesh.
"""

return self.u_exact(t)

def u_exact(self, t, **kwargs):
"""
Evaluate the polynomial.
Parameters
----------
t : float
Time of the exact solution.
u_init : pySDC.problem.testequation0d.dtype_u
Initial solution.
t_init : float
The initial time.
Returns
-------
me : dtype_u
The exact solution.
"""
me = self.dtype_u(self.init)
me[:] = self.poly(t)
return me
5 changes: 1 addition & 4 deletions pySDC/implementations/sweeper_classes/generic_implicit.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def __init__(self, params):
params['QI'] = 'IE'

# call parent's initialization routine
super(generic_implicit, self).__init__(params)
super().__init__(params)

# get QI matrix
self.QI = self.get_Qdelta_implicit(self.coll, qd_type=self.params.QI)
Expand All @@ -34,7 +34,6 @@ def integrate(self):
list of dtype_u: containing the integral as values
"""

# get current level and problem description
L = self.level
P = L.prob

Expand All @@ -57,7 +56,6 @@ def update_nodes(self):
None
"""

# get current level and problem description
L = self.level
P = L.prob

Expand Down Expand Up @@ -112,7 +110,6 @@ def compute_end_point(self):
None
"""

# get current level and problem description
L = self.level
P = L.prob

Expand Down
32 changes: 27 additions & 5 deletions pySDC/implementations/sweeper_classes/generic_implicit_MPI.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ def compute_end_point(self):
None
"""

# get current level and problem description
L = self.level
P = L.prob
L.uend = P.dtype_u(P.init, val=0.0)
Expand All @@ -69,7 +68,6 @@ def compute_residual(self, stage=None):
stage (str): The current stage of the step the level belongs to
"""

# get current level and problem description
L = self.level

# Check if we want to skip the residual computation to gain performance
Expand Down Expand Up @@ -108,7 +106,6 @@ def predict(self):
and evaluates the RHS of the ODE there
"""

# get current level and problem description
L = self.level
P = L.prob

Expand Down Expand Up @@ -144,7 +141,6 @@ def integrate(self):
list of dtype_u: containing the integral as values
"""

# get current level and problem description
L = self.level
P = L.prob

Expand All @@ -165,7 +161,6 @@ def update_nodes(self):
None
"""

# get current level and problem description
L = self.level
P = L.prob

Expand Down Expand Up @@ -204,3 +199,30 @@ def update_nodes(self):
L.status.updated = True

return None

def compute_end_point(self):
"""
Compute u at the right point of the interval
The value uend computed here is a full evaluation of the Picard formulation unless do_full_update==False
Returns:
None
"""

L = self.level
P = L.prob
L.uend = P.dtype_u(P.init, val=0.0)

# check if Mth node is equal to right point and do_coll_update is false, perform a simple copy
if self.coll.right_is_node and not self.params.do_coll_update:
super().compute_end_point()
else:
L.uend = P.dtype_u(L.u[0])
self.params.comm.Allreduce(L.dt * self.coll.weights[self.rank] * L.f[self.rank + 1], L.uend, op=MPI.SUM)
L.uend += L.u[0]

# add up tau correction of the full interval (last entry)
if L.tau[-1] is not None:
L.uend += L.tau[-1]
return None
Loading

0 comments on commit 1a51834

Please sign in to comment.