
Update collectionsoa.py to allow user defined MPI partitioning #1414

Merged · 25 commits · Sep 2, 2023

Commits
1831f4e
Update collectionsoa.py to allow user defined MPI partitioning
JamiePringle Aug 21, 2023
3d0077a
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 21, 2023
4d5873d
Merge branch 'master' into userPartitionMPI
JamiePringle Aug 21, 2023
68262fe
Update parcels/collection/collectionsoa.py
JamiePringle Aug 21, 2023
2f368f5
Update parcels/collection/collectionsoa.py
JamiePringle Aug 21, 2023
71bfbd7
Update parcels/collection/collectionsoa.py
JamiePringle Aug 23, 2023
fbb2e97
Update parcels/collection/collectionsoa.py
JamiePringle Aug 23, 2023
17469b3
define partitionFunction at particleSet creation
JamiePringle Aug 23, 2023
554713c
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 23, 2023
2ca72ea
Update particle set to specify partitionFunction
JamiePringle Aug 23, 2023
e610cd7
Cleaning up partition_function code
erikvansebille Aug 24, 2023
6fc5554
Fixing pre-commit/pydocstyle issues
erikvansebille Aug 24, 2023
4926595
Fixing another pydocstyle issue...
erikvansebille Aug 24, 2023
4269dbd
Cleaning up partition_function for MPI
erikvansebille Aug 24, 2023
fa94df6
Adding unit test for partition_function
erikvansebille Aug 24, 2023
a21c9b6
Fixing flake8 errors
erikvansebille Aug 24, 2023
e148126
Temporary fix for AoS
erikvansebille Aug 24, 2023
4b133df
Merge branch 'master' into userPartitionMPI
erikvansebille Aug 25, 2023
1349655
Merge branch 'master' into userPartitionMPI
erikvansebille Aug 25, 2023
ec04293
Updating MPI test to also run on linux if mpi4py does not exist
erikvansebille Aug 28, 2023
cb91a2e
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Aug 28, 2023
356f1f7
Fixing flake8 error
erikvansebille Aug 28, 2023
165c0b9
Added short tutorial section on partition functions to tutorial_MPI
erikvansebille Sep 2, 2023
78948a4
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 2, 2023
22b10ae
Update conf.py to ignore binder.org for linkchecking
erikvansebille Sep 2, 2023
1 change: 1 addition & 0 deletions docs/conf.py
@@ -75,6 +75,7 @@
# To monitor
r"http://marine.copernicus.eu/", # 2023-06-07 Site non-responsive
r"https://www\.nodc\.noaa\.gov/", # 2023-06-23 Site non-responsive
r"https://mybinder\.org/", # 2023-09-02 Site non-responsive
]

# The version info for the project you're documenting, acts as replacement for
47 changes: 46 additions & 1 deletion docs/examples/documentation_MPI.ipynb
@@ -44,6 +44,51 @@
"Note that in principle this means that all MPI processors need access to the full `FieldSet`, which can be Gigabytes in size for large global datasets. Therefore, efficient parallelisation only works if at the same time we also chunk the `FieldSet` into smaller domains.\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Optimising the partitioning of the particles with a user-defined partition_function"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Parcels uses so-called *partition functions* to assign particles to processors. By default, Parcels uses a `scikit-learn` `KMeans` clustering algorithm to partition the particles. However, you can also define your own partition function, which can be useful if you know more about the distribution of particles in your domain than the `KMeans` algorithm does.\n",
"\n",
"To define your own partition function, you need to define a function that takes an array of (lon, lat) coordinates as input and returns an array of integers defining the MPI processor that each particle should be assigned to. See the example below:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def simple_partition_function(coords, mpi_size=1):\n",
" \"\"\"A very simple partition function that assigns particles to processors\"\"\"\n",
" return np.array(\n",
" [int(i) for i in np.linspace(0, mpi_size, coords.shape[0], endpoint=False)]\n",
" )"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"To add the partition function to your script, you need to add it to the ParticleSet constructor:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"pset = ParticleSet(..., partition_function=simple_partition_function)"
]
},
{
"attachments": {},
"cell_type": "markdown",
@@ -121,7 +166,7 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"For small projects, the above instructions are sufficient. If your project is large, then it is helpful to combine the `proc*` directories into a single zarr dataset and to optimize the chunking for your analysis. What is \"large\"? If you find yourself running out of memory while doing your analyses, saving the results, or sorting the dataset, or if reading the data is taking longer than you can tolerate, your problem is \"large.\" Another rule of thumb is if the size of your output directory is 1/3 or more of the memory of your machine, your problem is large. Chunking and combining the `proc*` data in order to speed up analysis is discussed [in the documentation on runs with large output](https://docs.oceanparcels.org/en/latest/examples/documentation_LargeRunsOutput.html).\n"
"For small projects, the above instructions are sufficient. If your project is large, then it is helpful to combine the `proc*` directories into a single zarr dataset and to optimise the chunking for your analysis. What is \"large\"? If you find yourself running out of memory while doing your analyses, saving the results, or sorting the dataset, or if reading the data is taking longer than you can tolerate, your problem is \"large.\" Another rule of thumb is if the size of your output directory is 1/3 or more of the memory of your machine, your problem is large. Chunking and combining the `proc*` data in order to speed up analysis is discussed [in the documentation on runs with large output](https://docs.oceanparcels.org/en/latest/examples/documentation_LargeRunsOutput.html).\n"
]
},
{
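The partition function added to the notebook above can be exercised standalone, outside of any MPI run. The sketch below mirrors that function; the coordinate values are illustrative, and `np.linspace(0, mpi_size, n, endpoint=False)` truncated to `int` simply slices the particles into `mpi_size` contiguous, near-equal groups in input order:

```python
import numpy as np


def simple_partition_function(coords, mpi_size=1):
    """Assign particles to MPI ranks by evenly slicing the input order.

    coords is an (n_particles, 2) array of [lon, lat] rows; the return
    value is an integer array of rank labels in the range [0, mpi_size).
    """
    return np.array(
        [int(i) for i in np.linspace(0, mpi_size, coords.shape[0], endpoint=False)]
    )


# Standalone check of the label distribution (12 particles, 4 ranks):
coords = np.column_stack([np.linspace(0, 10, 12), np.full(12, 50.0)])
labels = simple_partition_function(coords, mpi_size=4)
print(labels)  # → [0 0 0 1 1 1 2 2 2 3 3 3]
```

Each rank receives three consecutive particles here; an uneven particle count simply leaves some ranks with one extra particle.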
22 changes: 18 additions & 4 deletions docs/examples/example_stommel.py
@@ -86,8 +86,14 @@
particle.delete()


def simple_partition_function(coords, mpi_size=1):
"""A very simple partition function that assigns particles to processors (for MPI testing purposes)"""
return np.array([int(i) for i in np.linspace(0, mpi_size, coords.shape[0], endpoint=False)])


def stommel_example(npart=1, mode='jit', verbose=False, method=AdvectionRK4, grid_type='A',
outfile="StommelParticle.zarr", repeatdt=None, maxage=None, write_fields=True, pset_mode='soa'):
outfile="StommelParticle.zarr", repeatdt=None, maxage=None, write_fields=True,
custom_partition_function=False, pset_mode='soa'):
timer.fieldset = timer.Timer('FieldSet', parent=timer.stommel)
fieldset = stommel_fieldset(grid_type=grid_type)
if write_fields:
@@ -105,8 +111,13 @@
p_start = Variable('p_start', dtype=np.float32, initial=fieldset.P)
age = Variable('age', dtype=np.float32, initial=0.)

pset = pset_type[pset_mode]['pset'].from_line(fieldset, size=npart, pclass=MyParticle, repeatdt=repeatdt,
start=(10e3, 5000e3), finish=(100e3, 5000e3), time=0)
if custom_partition_function:
pset = pset_type[pset_mode]['pset'].from_line(fieldset, size=npart, pclass=MyParticle, repeatdt=repeatdt,
start=(10e3, 5000e3), finish=(100e3, 5000e3), time=0,
partition_function=simple_partition_function)
else:
pset = pset_type[pset_mode]['pset'].from_line(fieldset, size=npart, pclass=MyParticle, repeatdt=repeatdt,
start=(10e3, 5000e3), finish=(100e3, 5000e3), time=0)

if verbose:
print(f"Initial particle positions:\n{pset}")
@@ -174,12 +185,15 @@
help='max age of the particles (after which particles are deleted)')
p.add_argument('-wf', '--write_fields', default=True,
help='Write the hydrodynamic fields to NetCDF')
p.add_argument('-cpf', '--custom_partition_function', default=False,
help='Use a custom partition_function (for MPI testing purposes)')
args = p.parse_args(args)

timer.args.stop()
timer.stommel = timer.Timer('Stommel', parent=timer.root)
stommel_example(args.particles, mode=args.mode, verbose=args.verbose, method=method[args.method],
outfile=args.outfile, repeatdt=args.repeatdt, maxage=args.maxage, write_fields=args.write_fields)
outfile=args.outfile, repeatdt=args.repeatdt, maxage=args.maxage, write_fields=args.write_fields,
custom_partition_function=args.custom_partition_function)
timer.stommel.stop()
timer.root.stop()
timer.root.print_tree()
7 changes: 5 additions & 2 deletions parcels/collection/collectionaos.py
@@ -59,8 +59,9 @@ def __init__(self, pclass, lon, lat, depth, time, lonlatdepth_dtype, pid_orig, p
self._pu_indicators = convert_to_flat_array(partitions)

for kwvar in kwargs:
assert lon.size == kwargs[kwvar].size, (
f'{kwvar} and positions (lon, lat, depth) do nott have the same lengths.')
if kwvar not in ['partition_function']:
assert lon.size == kwargs[kwvar].size, (
f'{kwvar} and positions (lon, lat, depth) do nott have the same lengths.')

offset = np.max(pid) if (pid is not None) and len(pid) > 0 else -1
if MPI:
@@ -124,6 +125,8 @@
for kwvar in kwargs:
if isinstance(kwvar, Field):
continue
if kwvar in ['partition_function']:
continue
if not hasattr(self._data[i], kwvar):
raise RuntimeError(f'Particle class does not have Variable {kwvar}')
setattr(self._data[i], kwvar, kwargs[kwvar][i])
49 changes: 36 additions & 13 deletions parcels/collection/collectionsoa.py
@@ -12,7 +12,6 @@
)
from parcels.field import Field
from parcels.particle import JITParticle, ScipyParticle # noqa
from parcels.tools.converters import convert_to_flat_array
from parcels.tools.loggers import logger
from parcels.tools.statuscodes import NotTestedError

@@ -27,9 +26,38 @@
KMeans = None


def partitionParticlesMPI_default(coords, mpi_size=1):
"""This function takes the coordinates of the particle starting
positions and returns which MPI process will process each
particle.

Input:

coords: numpy array with rows of [lon, lat] so that
coords.shape[0] is the number of particles and coords.shape[1] is 2.

mpi_size=1: the number of MPI processes.

Output:

mpiProcs: an integer array with values from 0 to mpi_size-1
specifying which MPI job will run which particles. len(mpiProcs)
must equal coords.shape[0]
"""
if KMeans:
kmeans = KMeans(n_clusters=mpi_size, random_state=0).fit(coords)
mpiProcs = kmeans.labels_
else: # assigning random labels if no KMeans (see https://github.com/OceanParcels/parcels/issues/1261)
logger.warning_once('sklearn needs to be available if MPI is installed. '
'See https://docs.oceanparcels.org/en/latest/installation.html#installation-for-developers for more information')
mpiProcs = np.randint(0, mpi_size, size=coords.shape[0])

return mpiProcs


class ParticleCollectionSOA(ParticleCollection):

def __init__(self, pclass, lon, lat, depth, time, lonlatdepth_dtype, pid_orig, partitions=None, ngrid=1, **kwargs):
def __init__(self, pclass, lon, lat, depth, time, lonlatdepth_dtype, pid_orig, ngrid=1, **kwargs):
"""
Parameters
----------
Expand All @@ -51,9 +79,10 @@
assert lon.size == time.size, (
'time and positions (lon, lat, depth) don''t have the same lengths.')

# If partitions is false, the partitions are already initialised
if partitions is not None and partitions is not False:
self._pu_indicators = convert_to_flat_array(partitions)
# If a partitioning function for MPI runs has been passed into the
# particle creation with the "partition_function" kwarg, retrieve it here.
# If it has not, assign the default function, partitionParticlesMPI_default()
partition_function = kwargs.pop('partition_function', partitionParticlesMPI_default)

for kwvar in kwargs:
assert lon.size == kwargs[kwvar].size, (
@@ -69,17 +98,11 @@
raise RuntimeError('Cannot initialise with fewer particles than MPI processors')

if mpi_size > 1:
if partitions is not False:
if partition_function is not False:
if (self._pu_indicators is None) or (len(self._pu_indicators) != len(lon)):
if mpi_rank == 0:
coords = np.vstack((lon, lat)).transpose()
if KMeans:
kmeans = KMeans(n_clusters=mpi_size, random_state=0).fit(coords)
self._pu_indicators = kmeans.labels_
else: # assigning random labels if no KMeans (see https://github.com/OceanParcels/parcels/issues/1261)
logger.warning_once('sklearn needs to be available if MPI is installed. '
'See https://docs.oceanparcels.org/en/latest/installation.html#installation-for-developers for more information')
self._pu_indicators = np.randint(0, mpi_size, size=len(lon))
self._pu_indicators = partition_function(coords, mpi_size=mpi_size)
else:
self._pu_indicators = None
self._pu_indicators = mpi_comm.bcast(self._pu_indicators, root=0)
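The default behaviour this PR factors out into `partitionParticlesMPI_default` — KMeans clustering of particle start positions into one group per rank, with a random fallback when scikit-learn is missing — can be sketched as a standalone function. This is a paraphrased sketch, not the merged code: the function name differs, and the fallback here spells out `np.random.randint`.

```python
import numpy as np

try:
    from sklearn.cluster import KMeans
except ImportError:  # scikit-learn is an optional dependency
    KMeans = None


def partition_particles_default(coords, mpi_size=1):
    """Cluster particle start positions into one spatial group per MPI rank.

    coords: (n_particles, 2) array of [lon, lat] rows.
    Returns an integer array of rank labels in the range [0, mpi_size).
    """
    if KMeans is not None:
        # Nearby particles get the same rank, which keeps field access local
        return KMeans(n_clusters=mpi_size, random_state=0).fit(coords).labels_
    # Without scikit-learn, fall back to a random assignment
    return np.random.randint(0, mpi_size, size=coords.shape[0])


rng = np.random.default_rng(0)
coords = rng.uniform(low=[-10, 40], high=[10, 60], size=(100, 2))
labels = partition_particles_default(coords, mpi_size=4)
print(labels.shape)  # → (100,)
```

Clustering by position is a sensible default because particles on the same rank then share the same chunks of the `FieldSet`; a user-supplied `partition_function` with the same signature replaces exactly this step.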
6 changes: 3 additions & 3 deletions parcels/particleset/baseparticleset.py
@@ -141,7 +141,7 @@ def from_list(cls, fieldset, pclass, lon, lat, depth=None, time=None, repeatdt=N
return cls(fieldset=fieldset, pclass=pclass, lon=lon, lat=lat, depth=depth, time=time, repeatdt=repeatdt, lonlatdepth_dtype=lonlatdepth_dtype, **kwargs)

@classmethod
def from_line(cls, fieldset, pclass, start, finish, size, depth=None, time=None, repeatdt=None, lonlatdepth_dtype=None):
def from_line(cls, fieldset, pclass, start, finish, size, depth=None, time=None, repeatdt=None, lonlatdepth_dtype=None, **kwargs):
"""Create a particleset in the shape of a line (according to a cartesian grid).

Initialise the ParticleSet from start/finish coordinates with equidistant spacing
@@ -175,7 +175,7 @@ def from_line(cls, fieldset, pclass, start, finish, size, depth=None, time=None,
lat = np.linspace(start[1], finish[1], size)
if type(depth) in [int, float]:
depth = [depth] * size
return cls(fieldset=fieldset, pclass=pclass, lon=lon, lat=lat, depth=depth, time=time, repeatdt=repeatdt, lonlatdepth_dtype=lonlatdepth_dtype)
return cls(fieldset=fieldset, pclass=pclass, lon=lon, lat=lat, depth=depth, time=time, repeatdt=repeatdt, lonlatdepth_dtype=lonlatdepth_dtype, **kwargs)

@classmethod
@abstractmethod
Expand Down Expand Up @@ -557,7 +557,7 @@ def execute(self, pyfunc=AdvectionRK4, pyfunc_inter=None, endtime=None, runtime=
lat=self.repeatlat, depth=self.repeatdepth,
pclass=self.repeatpclass,
lonlatdepth_dtype=self.collection.lonlatdepth_dtype,
partitions=False, pid_orig=self.repeatpid, **self.repeatkwargs)
partition_function=False, pid_orig=self.repeatpid, **self.repeatkwargs)
for p in pset_new:
p.dt = dt
self.add(pset_new)
10 changes: 6 additions & 4 deletions parcels/particleset/particlesetaos.py
@@ -242,9 +242,10 @@ def ObjectClass_sizeof_forward(self):
'lon lat depth precision should be set to either np.float32 or np.float64'

for kwvar in kwargs:
kwargs[kwvar] = convert_to_flat_array(kwargs[kwvar])
assert lon.size == kwargs[kwvar].size, (
f"{kwvar} and positions (lon, lat, depth) don't have the same lengths.")
if kwvar not in ['partition_function']:
kwargs[kwvar] = convert_to_flat_array(kwargs[kwvar])
assert lon.size == kwargs[kwvar].size, (
f"{kwvar} and positions (lon, lat, depth) don't have the same lengths.")

self.repeatdt = repeatdt.total_seconds() if isinstance(repeatdt, delta) else repeatdt
if self.repeatdt:
@@ -270,7 +271,8 @@ def ObjectClass_sizeof_forward(self):
self.repeatlat = copy(self._collection.lat)
self.repeatdepth = copy(self._collection.depth)
for kwvar in kwargs:
self.repeatkwargs[kwvar] = copy(getattr(self._collection, kwvar))
if kwvar not in ['partition_function']:
self.repeatkwargs[kwvar] = copy(getattr(self._collection, kwvar))

if self.repeatdt:
if MPI and self._collection.pu_indicators is not None:
19 changes: 10 additions & 9 deletions parcels/particleset/particlesetsoa.py
@@ -77,9 +77,8 @@ class ParticleSetSOA(BaseParticleSet):
and np.float64 if the interpolation method is 'cgrid_velocity'
pid_orig :
Optional list of (offsets for) the particle IDs
partitions :
List of cores on which to distribute the particles for MPI runs. Default: None, in which case particles
are distributed automatically on the processors
partition_function :
Function to use for partitioning particles over processors. Default is to use kMeans
periodic_domain_zonal :
Zonal domain size, used to apply zonally periodic boundaries for particle-particle
interaction. If None, no zonally periodic boundaries are applied
@@ -132,7 +131,6 @@ def ArrayClass_init(self, *args, **kwargs):
logger.warning_once("No FieldSet provided in ParticleSet generation. This breaks most Parcels functionality")
else:
self.fieldset.check_complete()
partitions = kwargs.pop('partitions', None)

lon = np.empty(shape=0) if lon is None else convert_to_flat_array(lon)
lat = np.empty(shape=0) if lat is None else convert_to_flat_array(lat)
@@ -169,9 +167,10 @@ def ArrayClass_init(self, *args, **kwargs):
'lon lat depth precision should be set to either np.float32 or np.float64'

for kwvar in kwargs:
kwargs[kwvar] = convert_to_flat_array(kwargs[kwvar])
assert lon.size == kwargs[kwvar].size, (
f"{kwvar} and positions (lon, lat, depth) don't have the same lengths.")
if kwvar not in ['partition_function']:
kwargs[kwvar] = convert_to_flat_array(kwargs[kwvar])
assert lon.size == kwargs[kwvar].size, (
f"{kwvar} and positions (lon, lat, depth) don't have the same lengths.")

self.repeatdt = repeatdt.total_seconds() if isinstance(repeatdt, delta) else repeatdt
if self.repeatdt:
Expand All @@ -181,6 +180,7 @@ def ArrayClass_init(self, *args, **kwargs):
raise 'All Particle.time should be the same when repeatdt is not None'
self.repeatpclass = pclass
self.repeatkwargs = kwargs
self.repeatkwargs.pop('partition_function', None)

ngrids = fieldset.gridset.size if fieldset is not None else 1

@@ -198,7 +198,7 @@ def ArrayClass_init(self, *args, **kwargs):
self._collection = ParticleCollectionSOA(
_pclass, lon=lon, lat=lat, depth=depth, time=time,
lonlatdepth_dtype=lonlatdepth_dtype, pid_orig=pid_orig,
partitions=partitions, ngrid=ngrids, **kwargs)
ngrid=ngrids, **kwargs)

# Initialize neighbor search data structure (used for interaction).
if interaction_distance is not None:
@@ -245,7 +245,8 @@ def ArrayClass_init(self, *args, **kwargs):
self.repeatlat = copy(self._collection.data['lat'])
self.repeatdepth = copy(self._collection.data['depth'])
for kwvar in kwargs:
self.repeatkwargs[kwvar] = copy(self._collection.data[kwvar])
if kwvar not in ['partition_function']:
self.repeatkwargs[kwvar] = copy(self._collection.data[kwvar])

if self.repeatdt:
if MPI and self._collection.pu_indicators is not None:
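Several of the hunks above (in collectionaos.py, particlesetaos.py, and particlesetsoa.py) apply the same pattern: `partition_function` arrives through `**kwargs` but is a control argument, not a per-particle variable, so it must be excluded from the per-particle length checks and from `repeatkwargs`. A minimal standalone sketch of that pattern — the function name and shapes here are illustrative, not Parcels API:

```python
import numpy as np


def split_particle_kwargs(lon, **kwargs):
    """Separate control kwargs from per-particle variable kwargs.

    'partition_function' steers MPI partitioning and has no per-particle
    values, so it is popped before the length checks run.
    """
    partition_function = kwargs.pop('partition_function', None)
    for name, values in kwargs.items():
        values = np.asarray(values).flatten()
        # Every remaining kwarg must supply one value per particle
        assert lon.size == values.size, (
            f"{name} and positions (lon, lat, depth) don't have the same lengths.")
        kwargs[name] = values
    return partition_function, kwargs


lon = np.array([0.0, 1.0, 2.0])
pf, per_particle = split_particle_kwargs(lon, age=[0, 0, 0],
                                          partition_function=len)
print(pf is len, list(per_particle))  # → True ['age']
```

Popping the key up front is what lets a callable ride through the same `**kwargs` channel as the per-particle arrays without tripping the size assertions.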