
Commit

Merge branch 'v3.0'
erikvansebille committed Sep 2, 2023
2 parents 6316163 + cb44378 commit bed2400
Showing 9 changed files with 139 additions and 53 deletions.
1 change: 1 addition & 0 deletions docs/conf.py
Original file line number Diff line number Diff line change
@@ -75,6 +75,7 @@
# To monitor
r"http://marine.copernicus.eu/", # 2023-06-07 Site non-responsive
r"https://www\.nodc\.noaa\.gov/", # 2023-06-23 Site non-responsive
r"https://mybinder\.org/", # 2023-09-02 Site non-responsive
]

# The version info for the project you're documenting, acts as replacement for
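The entries added above are regular expressions consumed by Sphinx's linkcheck builder. A minimal standalone sketch of the resulting `linkcheck_ignore` list (URLs taken from this diff):

```python
import re

# Sketch of the linkcheck_ignore option in docs/conf.py:
# Sphinx's linkcheck builder skips any URL matching one of these regexes.
linkcheck_ignore = [
    r"http://marine.copernicus.eu/",  # 2023-06-07 Site non-responsive
    r"https://www\.nodc\.noaa\.gov/",  # 2023-06-23 Site non-responsive
    r"https://mybinder\.org/",  # 2023-09-02 Site non-responsive
]

# The new mybinder.org entry matches the site root, so linkcheck skips it:
assert any(re.match(pat, "https://mybinder.org/") for pat in linkcheck_ignore)
```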
47 changes: 46 additions & 1 deletion docs/examples/documentation_MPI.ipynb
@@ -44,6 +44,51 @@
"Note that in principle this means that all MPI processors need access to the full `FieldSet`, which can be Gigabytes in size for large global datasets. Therefore, efficient parallelisation only works if at the same time we also chunk the `FieldSet` into smaller domains.\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Optimising the partitioning of the particles with a user-defined partition_function"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Parcels uses so-called *partition functions* to assign particles to processors. By default, Parcels uses a `scikit-learn` `KMeans` clustering algorithm to partition the particles. However, you can also define your own partition function, which can be useful if you know more about the distribution of particles in your domain than the `KMeans` algorithm does.\n",
"\n",
"To define your own partition function, you need to define a function that takes an array of (lon, lat) coordinates as input and returns an array of integers defining the MPI processor that each particle should be assigned to. See the example below:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def simple_partition_function(coords, mpi_size=1):\n",
" \"\"\"A very simple partition function that assigns particles to processors\"\"\"\n",
" return np.array(\n",
" [int(i) for i in np.linspace(0, mpi_size, coords.shape[0], endpoint=False)]\n",
" )"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"To add the partition function to your script, you need to add it to the ParticleSet constructor:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"pset = ParticleSet(..., partition_function=simple_partition_function)"
]
},
{
"attachments": {},
"cell_type": "markdown",
@@ -121,7 +166,7 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"For small projects, the above instructions are sufficient. If your project is large, then it is helpful to combine the `proc*` directories into a single zarr dataset and to optimize the chunking for your analysis. What is \"large\"? If you find yourself running out of memory while doing your analyses, saving the results, or sorting the dataset, or if reading the data is taking longer than you can tolerate, your problem is \"large.\" Another rule of thumb is if the size of your output directory is 1/3 or more of the memory of your machine, your problem is large. Chunking and combining the `proc*` data in order to speed up analysis is discussed [in the documentation on runs with large output](https://docs.oceanparcels.org/en/latest/examples/documentation_LargeRunsOutput.html).\n"
"For small projects, the above instructions are sufficient. If your project is large, then it is helpful to combine the `proc*` directories into a single zarr dataset and to optimise the chunking for your analysis. What is \"large\"? If you find yourself running out of memory while doing your analyses, saving the results, or sorting the dataset, or if reading the data is taking longer than you can tolerate, your problem is \"large.\" Another rule of thumb is if the size of your output directory is 1/3 or more of the memory of your machine, your problem is large. Chunking and combining the `proc*` data in order to speed up analysis is discussed [in the documentation on runs with large output](https://docs.oceanparcels.org/en/latest/examples/documentation_LargeRunsOutput.html).\n"
]
},
{
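The notebook's `simple_partition_function` can be exercised outside an MPI run; a minimal sketch showing how it spreads 10 particles over 4 ranks in contiguous blocks:

```python
import numpy as np


def simple_partition_function(coords, mpi_size=1):
    """A very simple partition function that assigns particles to processors."""
    # linspace over [0, mpi_size) truncated to int gives contiguous rank blocks
    return np.array(
        [int(i) for i in np.linspace(0, mpi_size, coords.shape[0], endpoint=False)]
    )


coords = np.zeros((10, 2))  # 10 particles, (lon, lat) columns; values unused here
labels = simple_partition_function(coords, mpi_size=4)
print(labels.tolist())  # [0, 0, 0, 1, 1, 2, 2, 2, 3, 3]
```

Each rank receives a near-equal, contiguous slice of the particle list, regardless of where the particles sit in space.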
22 changes: 18 additions & 4 deletions docs/examples/example_stommel.py
@@ -86,8 +86,14 @@ def AgeP(particle, fieldset, time):
particle.delete()


def simple_partition_function(coords, mpi_size=1):
    """A very simple partition function that assigns particles to processors (for MPI testing purposes)"""
return np.array([int(i) for i in np.linspace(0, mpi_size, coords.shape[0], endpoint=False)])


def stommel_example(npart=1, mode='jit', verbose=False, method=AdvectionRK4, grid_type='A',
outfile="StommelParticle.zarr", repeatdt=None, maxage=None, write_fields=True, pset_mode='soa'):
outfile="StommelParticle.zarr", repeatdt=None, maxage=None, write_fields=True,
custom_partition_function=False, pset_mode='soa'):
timer.fieldset = timer.Timer('FieldSet', parent=timer.stommel)
fieldset = stommel_fieldset(grid_type=grid_type)
if write_fields:
@@ -105,8 +111,13 @@ class MyParticle(ParticleClass):
p_start = Variable('p_start', dtype=np.float32, initial=fieldset.P)
age = Variable('age', dtype=np.float32, initial=0.)

pset = pset_type[pset_mode]['pset'].from_line(fieldset, size=npart, pclass=MyParticle, repeatdt=repeatdt,
start=(10e3, 5000e3), finish=(100e3, 5000e3), time=0)
if custom_partition_function:
pset = pset_type[pset_mode]['pset'].from_line(fieldset, size=npart, pclass=MyParticle, repeatdt=repeatdt,
start=(10e3, 5000e3), finish=(100e3, 5000e3), time=0,
partition_function=simple_partition_function)
else:
pset = pset_type[pset_mode]['pset'].from_line(fieldset, size=npart, pclass=MyParticle, repeatdt=repeatdt,
start=(10e3, 5000e3), finish=(100e3, 5000e3), time=0)

if verbose:
print(f"Initial particle positions:\n{pset}")
@@ -174,12 +185,15 @@ def main(args=None):
help='max age of the particles (after which particles are deleted)')
p.add_argument('-wf', '--write_fields', default=True,
help='Write the hydrodynamic fields to NetCDF')
p.add_argument('-cpf', '--custom_partition_function', default=False,
help='Use a custom partition_function (for MPI testing purposes)')
args = p.parse_args(args)

timer.args.stop()
timer.stommel = timer.Timer('Stommel', parent=timer.root)
stommel_example(args.particles, mode=args.mode, verbose=args.verbose, method=method[args.method],
outfile=args.outfile, repeatdt=args.repeatdt, maxage=args.maxage, write_fields=args.write_fields)
outfile=args.outfile, repeatdt=args.repeatdt, maxage=args.maxage, write_fields=args.write_fields,
custom_partition_function=args.custom_partition_function)
timer.stommel.stop()
timer.root.stop()
timer.root.print_tree()
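One caveat with the new `--custom_partition_function` argument as written: with `default=False` and no `type` or `action`, argparse stores whatever string the user passes, and any non-empty string (including `"False"`) is truthy. A hedged sketch of the safer `store_true` pattern (a suggestion, not what this commit does):

```python
import argparse

p = argparse.ArgumentParser()
# action='store_true' yields a real bool: absent -> False, present -> True
p.add_argument('-cpf', '--custom_partition_function', action='store_true',
               help='Use a custom partition_function (for MPI testing purposes)')

assert p.parse_args([]).custom_partition_function is False
assert p.parse_args(['-cpf']).custom_partition_function is True

# The default=False form in the diff instead stores the *string* the user typed:
q = argparse.ArgumentParser()
q.add_argument('-cpf', '--custom_partition_function', default=False)
assert q.parse_args(['-cpf', 'False']).custom_partition_function == 'False'  # truthy!
```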
7 changes: 5 additions & 2 deletions parcels/collection/collectionaos.py
@@ -59,8 +59,9 @@ def __init__(self, pclass, lon, lat, depth, time, lonlatdepth_dtype, pid_orig, p
self._pu_indicators = convert_to_flat_array(partitions)

for kwvar in kwargs:
assert lon.size == kwargs[kwvar].size, (
f'{kwvar} and positions (lon, lat, depth) do not have the same lengths.')
if kwvar not in ['partition_function']:
assert lon.size == kwargs[kwvar].size, (
f'{kwvar} and positions (lon, lat, depth) do not have the same lengths.')

offset = np.max(pid) if (pid is not None) and len(pid) > 0 else -1
if MPI:
@@ -124,6 +125,8 @@ def __init__(self, pclass, lon, lat, depth, time, lonlatdepth_dtype, pid_orig, p
for kwvar in kwargs:
if isinstance(kwvar, Field):
continue
if kwvar in ['partition_function']:
continue
if not hasattr(self._data[i], kwvar):
raise RuntimeError(f'Particle class does not have Variable {kwvar}')
setattr(self._data[i], kwvar, kwargs[kwvar][i])
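The pattern added in this file, skipping meta-kwargs such as `partition_function` before the per-particle length checks, can be sketched in isolation (function and constant names here are illustrative, not the Parcels API):

```python
import numpy as np

META_KWARGS = {'partition_function'}  # kwargs that are not per-particle arrays


def check_particle_kwargs(lon, **kwargs):
    """Validate that every per-particle kwarg matches the number of positions."""
    for kwvar, value in kwargs.items():
        if kwvar in META_KWARGS:
            continue  # e.g. partition_function is a callable, not a per-particle array
        assert lon.size == np.asarray(value).size, (
            f"{kwvar} and positions (lon, lat, depth) do not have the same lengths.")


lon = np.zeros(3)
check_particle_kwargs(lon, age=np.zeros(3), partition_function=len)  # passes
```

Without the `continue`, the length assertion would fire on the callable and reject any ParticleSet constructed with a custom partition function.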
49 changes: 36 additions & 13 deletions parcels/collection/collectionsoa.py
@@ -12,7 +12,6 @@
)
from parcels.field import Field
from parcels.particle import JITParticle, ScipyParticle # noqa
from parcels.tools.converters import convert_to_flat_array
from parcels.tools.loggers import logger
from parcels.tools.statuscodes import NotTestedError

@@ -27,9 +26,38 @@
KMeans = None


def partitionParticlesMPI_default(coords, mpi_size=1):
"""This function takes the coordinates of the particle starting
positions and returns which MPI process will process each
particle.
Input:
coords: numpy array with rows of [lon, lat] so that
coords.shape[0] is the number of particles and coords.shape[1] is 2.
mpi_size=1: the number of MPI processes.
Output:
mpiProcs: an integer array with values from 0 to mpi_size-1
specifying which MPI job will run which particles. len(mpiProcs)
must equal coords.shape[0]
"""
if KMeans:
kmeans = KMeans(n_clusters=mpi_size, random_state=0).fit(coords)
mpiProcs = kmeans.labels_
else: # assigning random labels if no KMeans (see https://github.com/OceanParcels/parcels/issues/1261)
logger.warning_once('sklearn needs to be available if MPI is installed. '
'See https://docs.oceanparcels.org/en/latest/installation.html#installation-for-developers for more information')
mpiProcs = np.random.randint(0, mpi_size, size=coords.shape[0])

return mpiProcs


class ParticleCollectionSOA(ParticleCollection):

def __init__(self, pclass, lon, lat, depth, time, lonlatdepth_dtype, pid_orig, partitions=None, ngrid=1, **kwargs):
def __init__(self, pclass, lon, lat, depth, time, lonlatdepth_dtype, pid_orig, ngrid=1, **kwargs):
"""
Parameters
----------
@@ -51,9 +79,10 @@ def __init__(self, pclass, lon, lat, depth, time, lonlatdepth_dtype, pid_orig, p
assert lon.size == time.size, (
'time and positions (lon, lat, depth) do not have the same lengths.')

# If partitions is false, the partitions are already initialised
if partitions is not None and partitions is not False:
self._pu_indicators = convert_to_flat_array(partitions)
# If a partitioning function for MPI runs has been passed into the
# particle creation with the "partition_function" kwarg, retrieve it here.
# If it has not, assign the default function, partitionParticlesMPI_default()
partition_function = kwargs.pop('partition_function', partitionParticlesMPI_default)

for kwvar in kwargs:
assert lon.size == kwargs[kwvar].size, (
@@ -69,17 +98,11 @@ def __init__(self, pclass, lon, lat, depth, time, lonlatdepth_dtype, pid_orig, p
raise RuntimeError('Cannot initialise with fewer particles than MPI processors')

if mpi_size > 1:
if partitions is not False:
if partition_function is not False:
if (self._pu_indicators is None) or (len(self._pu_indicators) != len(lon)):
if mpi_rank == 0:
coords = np.vstack((lon, lat)).transpose()
if KMeans:
kmeans = KMeans(n_clusters=mpi_size, random_state=0).fit(coords)
self._pu_indicators = kmeans.labels_
else: # assigning random labels if no KMeans (see https://github.com/OceanParcels/parcels/issues/1261)
logger.warning_once('sklearn needs to be available if MPI is installed. '
'See https://docs.oceanparcels.org/en/latest/installation.html#installation-for-developers for more information')
self._pu_indicators = np.random.randint(0, mpi_size, size=len(lon))
self._pu_indicators = partition_function(coords, mpi_size=mpi_size)
else:
self._pu_indicators = None
self._pu_indicators = mpi_comm.bcast(self._pu_indicators, root=0)
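The refactor above extracts the default partitioning into `partitionParticlesMPI_default`. A self-contained sketch of that behaviour, with scikit-learn treated as optional and the random fallback spelled with `np.random.randint` (the function name below is a stand-in, not the Parcels import path):

```python
import numpy as np

try:
    from sklearn.cluster import KMeans
except ImportError:
    KMeans = None


def partition_particles_default(coords, mpi_size=1):
    """Return, per particle, the MPI rank (0..mpi_size-1) that should own it."""
    if KMeans is not None:
        # cluster start positions so spatially close particles share a rank
        return KMeans(n_clusters=mpi_size, random_state=0).fit(coords).labels_
    # fallback without scikit-learn: random assignment
    return np.random.randint(0, mpi_size, size=coords.shape[0])


coords = np.column_stack([np.linspace(0., 10., 8), np.zeros(8)])  # rows of [lon, lat]
ranks = partition_particles_default(coords, mpi_size=2)
```

Because the user-supplied `partition_function` kwarg now replaces this callable wholesale, any function with the same `(coords, mpi_size)` signature slots in.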
6 changes: 3 additions & 3 deletions parcels/particleset/baseparticleset.py
@@ -141,7 +141,7 @@ def from_list(cls, fieldset, pclass, lon, lat, depth=None, time=None, repeatdt=N
return cls(fieldset=fieldset, pclass=pclass, lon=lon, lat=lat, depth=depth, time=time, repeatdt=repeatdt, lonlatdepth_dtype=lonlatdepth_dtype, **kwargs)

@classmethod
def from_line(cls, fieldset, pclass, start, finish, size, depth=None, time=None, repeatdt=None, lonlatdepth_dtype=None):
def from_line(cls, fieldset, pclass, start, finish, size, depth=None, time=None, repeatdt=None, lonlatdepth_dtype=None, **kwargs):
"""Create a particleset in the shape of a line (according to a cartesian grid).
Initialise the ParticleSet from start/finish coordinates with equidistant spacing
@@ -175,7 +175,7 @@ def from_line(cls, fieldset, pclass, start, finish, size, depth=None, time=None,
lat = np.linspace(start[1], finish[1], size)
if type(depth) in [int, float]:
depth = [depth] * size
return cls(fieldset=fieldset, pclass=pclass, lon=lon, lat=lat, depth=depth, time=time, repeatdt=repeatdt, lonlatdepth_dtype=lonlatdepth_dtype)
return cls(fieldset=fieldset, pclass=pclass, lon=lon, lat=lat, depth=depth, time=time, repeatdt=repeatdt, lonlatdepth_dtype=lonlatdepth_dtype, **kwargs)

@classmethod
@abstractmethod
@@ -557,7 +557,7 @@ def execute(self, pyfunc=AdvectionRK4, pyfunc_inter=None, endtime=None, runtime=
lat=self.repeatlat, depth=self.repeatdepth,
pclass=self.repeatpclass,
lonlatdepth_dtype=self.collection.lonlatdepth_dtype,
partitions=False, pid_orig=self.repeatpid, **self.repeatkwargs)
partition_function=False, pid_orig=self.repeatpid, **self.repeatkwargs)
for p in pset_new:
p.dt = dt
self.add(pset_new)
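The `from_line` change simply forwards `**kwargs` to the main constructor, which is how `partition_function=...` reaches the collection. The pattern in isolation (the class name here is illustrative, not the Parcels API):

```python
import numpy as np


class ParticleSetSketch:
    def __init__(self, lon, lat, **kwargs):
        self.lon, self.lat = lon, lat
        # constructor-level kwargs such as partition_function are consumed here
        self.partition_function = kwargs.pop('partition_function', None)

    @classmethod
    def from_line(cls, start, finish, size, **kwargs):
        # forwarding **kwargs is all that is needed for
        # from_line(..., partition_function=f) to reach __init__
        lon = np.linspace(start[0], finish[0], size)
        lat = np.linspace(start[1], finish[1], size)
        return cls(lon, lat, **kwargs)


pset = ParticleSetSketch.from_line((10e3, 5000e3), (100e3, 5000e3), 5,
                                   partition_function=len)
```

Before this commit, `from_line` dropped unknown keyword arguments on the floor, so a partition function could only be supplied through the base constructor.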
10 changes: 6 additions & 4 deletions parcels/particleset/particlesetaos.py
@@ -242,9 +242,10 @@ def ObjectClass_sizeof_forward(self):
'lon lat depth precision should be set to either np.float32 or np.float64'

for kwvar in kwargs:
kwargs[kwvar] = convert_to_flat_array(kwargs[kwvar])
assert lon.size == kwargs[kwvar].size, (
f"{kwvar} and positions (lon, lat, depth) don't have the same lengths.")
if kwvar not in ['partition_function']:
kwargs[kwvar] = convert_to_flat_array(kwargs[kwvar])
assert lon.size == kwargs[kwvar].size, (
f"{kwvar} and positions (lon, lat, depth) don't have the same lengths.")

self.repeatdt = repeatdt.total_seconds() if isinstance(repeatdt, delta) else repeatdt
if self.repeatdt:
@@ -270,7 +271,8 @@ def ObjectClass_sizeof_forward(self):
self.repeatlat = copy(self._collection.lat)
self.repeatdepth = copy(self._collection.depth)
for kwvar in kwargs:
self.repeatkwargs[kwvar] = copy(getattr(self._collection, kwvar))
if kwvar not in ['partition_function']:
self.repeatkwargs[kwvar] = copy(getattr(self._collection, kwvar))

if self.repeatdt:
if MPI and self._collection.pu_indicators is not None:
19 changes: 10 additions & 9 deletions parcels/particleset/particlesetsoa.py
@@ -77,9 +77,8 @@ class ParticleSetSOA(BaseParticleSet):
and np.float64 if the interpolation method is 'cgrid_velocity'
pid_orig :
Optional list of (offsets for) the particle IDs
partitions :
List of cores on which to distribute the particles for MPI runs. Default: None, in which case particles
are distributed automatically on the processors
partition_function :
Function to use for partitioning particles over processors. Default is to use KMeans clustering
periodic_domain_zonal :
Zonal domain size, used to apply zonally periodic boundaries for particle-particle
interaction. If None, no zonally periodic boundaries are applied
@@ -132,7 +131,6 @@ def ArrayClass_init(self, *args, **kwargs):
logger.warning_once("No FieldSet provided in ParticleSet generation. This breaks most Parcels functionality")
else:
self.fieldset.check_complete()
partitions = kwargs.pop('partitions', None)

lon = np.empty(shape=0) if lon is None else convert_to_flat_array(lon)
lat = np.empty(shape=0) if lat is None else convert_to_flat_array(lat)
@@ -169,9 +167,10 @@ def ArrayClass_init(self, *args, **kwargs):
'lon lat depth precision should be set to either np.float32 or np.float64'

for kwvar in kwargs:
kwargs[kwvar] = convert_to_flat_array(kwargs[kwvar])
assert lon.size == kwargs[kwvar].size, (
f"{kwvar} and positions (lon, lat, depth) don't have the same lengths.")
if kwvar not in ['partition_function']:
kwargs[kwvar] = convert_to_flat_array(kwargs[kwvar])
assert lon.size == kwargs[kwvar].size, (
f"{kwvar} and positions (lon, lat, depth) don't have the same lengths.")

self.repeatdt = repeatdt.total_seconds() if isinstance(repeatdt, delta) else repeatdt
if self.repeatdt:
@@ -181,6 +180,7 @@ def ArrayClass_init(self, *args, **kwargs):
raise ValueError('All Particle.time should be the same when repeatdt is not None')
self.repeatpclass = pclass
self.repeatkwargs = kwargs
self.repeatkwargs.pop('partition_function', None)

ngrids = fieldset.gridset.size if fieldset is not None else 1

@@ -198,7 +198,7 @@ def ArrayClass_init(self, *args, **kwargs):
self._collection = ParticleCollectionSOA(
_pclass, lon=lon, lat=lat, depth=depth, time=time,
lonlatdepth_dtype=lonlatdepth_dtype, pid_orig=pid_orig,
partitions=partitions, ngrid=ngrids, **kwargs)
ngrid=ngrids, **kwargs)

# Initialize neighbor search data structure (used for interaction).
if interaction_distance is not None:
@@ -245,7 +245,8 @@ def ArrayClass_init(self, *args, **kwargs):
self.repeatlat = copy(self._collection.data['lat'])
self.repeatdepth = copy(self._collection.data['depth'])
for kwvar in kwargs:
self.repeatkwargs[kwvar] = copy(self._collection.data[kwvar])
if kwvar not in ['partition_function']:
self.repeatkwargs[kwvar] = copy(self._collection.data[kwvar])

if self.repeatdt:
if MPI and self._collection.pu_indicators is not None:
