From 1831f4ece3f71512d627047263d9e469af76afa1 Mon Sep 17 00:00:00 2001 From: JamiePringle Date: Mon, 21 Aug 2023 11:06:16 -0400 Subject: [PATCH 01/22] Update collectionsoa.py to allow user defined MPI partitioning This code allows users to change the function which determines which particles are run on which MPI jobs using the function setPartitionFunction(). --- parcels/collection/collectionsoa.py | 206 ++++++++++++++++++++-------- 1 file changed, 148 insertions(+), 58 deletions(-) diff --git a/parcels/collection/collectionsoa.py b/parcels/collection/collectionsoa.py index b3eff7705..202a800c4 100644 --- a/parcels/collection/collectionsoa.py +++ b/parcels/collection/collectionsoa.py @@ -26,15 +26,72 @@ except: KMeans = None - +#============================================================================= +# The function partitionParticles4MPI_default() is the default scheme +# to partition the different particles into different MPI +# processes. The function setPartitionFunction() will, if called +# before the particle set is created, allow the user to specify their +# own scheme for partitioning the particles to different MPI jobs by +# passing a new partitioning function to setPartitionFunction(). The +# arguements of this new function must match those of +# partitionParticles4MPI_default(), as described in the comments to +# that function. + +def partitionParticles4MPI_default(coords,mpi_size=1): + '''This function takes the coordinates of the particle starting + positions and returns which MPI process will process each + particle. + + Input: + + coords: numpy array with rows of [lon, lat] so that + coords.shape[0] is the number of particles and coords.shape[1] is 2. + + mpi_size=1: the number of MPI processes. + + Output: + + mpiProcs: an integer array with values from 0 to mpi_size-1 + specifying which MPI job will run which particles. 
len(mpiProcs) + must equal coords.shape[0] + + ''' + + if KMeans: + kmeans = KMeans(n_clusters=mpi_size, random_state=0).fit(coords) + mpiProcs = kmeans.labels_ + else: # assigning random labels if no KMeans (see https://github.com/OceanParcels/parcels/issues/1261) + logger.warning_once('sklearn needs to be available if MPI is installed. ' + 'See http://oceanparcels.org/#parallel_install for more information') + mpiProcs = np.randint(0, mpi_size, size=len(lon)) + + #print('Using default KMeans partitioning of particles to MPI processes',flush=True) + + return mpiProcs + +#by default, the partition is done by the functiond defined above +partitionParticles4MPI=partitionParticles4MPI_default + +#This function, if called before the particle set is created, will alter how the +#particle set is partitioned between MPI jobs. +def setPartitionFunction(partitionFunction): + global partitionParticles4MPI + partitionParticles4MPI=partitionFunction + return None +#============================================================================= + + + class ParticleCollectionSOA(ParticleCollection): def __init__(self, pclass, lon, lat, depth, time, lonlatdepth_dtype, pid_orig, partitions=None, ngrid=1, **kwargs): """ - :param ngrid: number of grids in the fieldset of the overarching ParticleSet - required for initialising the - field references of the ctypes-link of particles that are allocated + Parameters + ---------- + ngrid : + number of grids in the fieldset of the overarching ParticleSet - required for initialising the + field references of the ctypes-link of particles that are allocated """ - super(ParticleCollection, self).__init__() assert pid_orig is not None, "particle IDs are None - incompatible with the collection. Invalid state." @@ -55,7 +112,7 @@ def __init__(self, pclass, lon, lat, depth, time, lonlatdepth_dtype, pid_orig, p for kwvar in kwargs: assert lon.size == kwargs[kwvar].size, ( - '%s and positions (lon, lat, depth) don''t have the same lengths.' 
% kwvar) + f"{kwvar} and positions (lon, lat, depth) don't have the same lengths.") offset = np.max(pid) if (pid is not None) and len(pid) > 0 else -1 if MPI: @@ -71,13 +128,13 @@ def __init__(self, pclass, lon, lat, depth, time, lonlatdepth_dtype, pid_orig, p if (self._pu_indicators is None) or (len(self._pu_indicators) != len(lon)): if mpi_rank == 0: coords = np.vstack((lon, lat)).transpose() - if KMeans: - kmeans = KMeans(n_clusters=mpi_size, random_state=0).fit(coords) - self._pu_indicators = kmeans.labels_ - else: # assigning random labels if no KMeans (see https://github.com/OceanParcels/parcels/issues/1261) - logger.warning_once('sklearn needs to be available if MPI is installed. ' - 'See http://oceanparcels.org/#parallel_install for more information') - self._pu_indicators = np.randint(0, mpi_size, size=len(lon)) + # partitionParticles is a function which + # decides which MPI jobs will process + # which particles. It can be specified in + # the call to setPartiionParticles4MPI() + # before the particle set is created. 
+ self._pu_indicators = partitionParticles4MPI(coords,mpi_size=mpi_size) + #print('In rank 0, _pu_indicators is',self._pu_indicators,flush=True) else: self._pu_indicators = None self._pu_indicators = mpi_comm.bcast(self._pu_indicators, root=0) @@ -136,7 +193,7 @@ def __init__(self, pclass, lon, lat, depth, time, lonlatdepth_dtype, pid_orig, p # any fields that were provided on the command line for kwvar, kwval in kwargs.items(): if not hasattr(pclass, kwvar): - raise RuntimeError('Particle class does not have Variable %s' % kwvar) + raise RuntimeError(f'Particle class does not have Variable {kwvar}') self._data[kwvar][:] = kwval initialised.add(kwvar) @@ -148,7 +205,7 @@ def __init__(self, pclass, lon, lat, depth, time, lonlatdepth_dtype, pid_orig, p if isinstance(v.initial, Field): for i in range(self.ncount): if (time[i] is None) or (np.isnan(time[i])): - raise RuntimeError('Cannot initialise a Variable with a Field if no time provided (time-type: {} values: {}). Add a "time=" to ParticleSet construction'.format(type(time), time)) + raise RuntimeError(f'Cannot initialise a Variable with a Field if no time provided (time-type: {type(time)} values: {time}). Add a "time=" to ParticleSet construction') v.initial.fieldset.computeTimeChunk(time[i], 0) self._data[v.name][i] = v.initial[ time[i], depth[i], lat[i], lon[i] @@ -166,9 +223,7 @@ def __init__(self, pclass, lon, lat, depth, time, lonlatdepth_dtype, pid_orig, p self._riterator = None def __del__(self): - """ - Collection - Destructor - """ + """Collection - Destructor""" super().__del__() def iterator(self): @@ -197,7 +252,10 @@ def __getitem__(self, index): Access a particle in this collection using the fastest access method for this collection - by its index. 
- :param index: int or np.int32 index of a particle in this collection + Parameters + ---------- + index : int + Index of the particle to access """ return self.get_single_by_index(index) @@ -205,7 +263,10 @@ def __getattr__(self, name): """ Access a single property of all particles. - :param name: name of the property + Parameters + ---------- + name : str + Name of the property to access """ for v in self.ptype.variables: if v.name == name and name in self._data: @@ -261,7 +322,7 @@ def get_single_by_ID(self, id): if self._sorted: index = bisect_left(self._data['id'], id) if index == len(self._data['id']) or self._data['id'][index] != id: - raise ValueError("Trying to access a particle with a non-existing ID: %s." % id) + raise ValueError(f"Trying to access a particle with a non-existing ID: {id}.") else: index = np.where(self._data['id'] == id)[0][0] @@ -344,9 +405,14 @@ def _recursive_ID_lookup(self, low, high, sublist): """Identify the middle element of the sublist and perform binary search on it. - :param low: Lowerbound on the indices to search for IDs. - :param high: Upperbound on the indices to search for IDs. - :param sublist: (Sub)list of IDs to look for. + Parameters + ---------- + low : + Lowerbound on the indices to search for IDs. + high : + Upperbound on the indices to search for IDs. + sublist : list + Sublist of IDs to look for. """ raise NotTestedError # median = floor(len(sublist) / 2) @@ -781,9 +847,7 @@ def clear(self): raise NotImplementedError def cstruct(self): - """ - 'cstruct' returns the ctypes mapping of the particle data. This depends on the specific structure in question. - """ + """Returns the ctypes mapping of the particle data. 
This depends on the specific structure in question.""" class CParticles(Structure): _fields_ = [(v.name, POINTER(np.ctypeslib.as_ctypes_type(v.dtype))) for v in self._ptype.variables] @@ -837,10 +901,15 @@ def toArray(self): raise NotImplementedError def set_variable_write_status(self, var, write_status): - """ - Method to set the write status of a Variable - :param var: Name of the variable (string) - :param status: Write status of the variable (True, False or 'once') + """Method to set the write status of a Variable + + Parameters + ---------- + var : + Name of the variable (string) + status : + Write status of the variable (True, False or 'once') + write_status : """ var_changed = False for v in self._ptype.variables: @@ -848,19 +917,24 @@ def set_variable_write_status(self, var, write_status): v.to_write = write_status var_changed = True if not var_changed: - raise SyntaxError('Could not change the write status of %s, because it is not a Variable name' % var) + raise SyntaxError(f'Could not change the write status of {var}, because it is not a Variable name') class ParticleAccessorSOA(BaseParticleAccessor): """Wrapper that provides access to particle data in the collection, as if interacting with the particle itself. - :param pcoll: ParticleCollection that the represented particle - belongs to. - :param index: The index at which the data for the represented - particle is stored in the corresponding data arrays - of the ParticleCollecion. + Parameters + ---------- + pcoll : + ParticleCollection that the represented particle + belongs to. + index : + The index at which the data for the represented + particle is stored in the corresponding data arrays + of the ParticleCollecion. """ + _index = 0 _next_dt = None @@ -868,19 +942,26 @@ def __init__(self, pcoll, index): """Initializes the ParticleAccessor to provide access to one specific particle. 
""" - super(ParticleAccessorSOA, self).__init__(pcoll) + super().__init__(pcoll) self._index = index self._next_dt = None def __getattr__(self, name): - """Get the value of an attribute of the particle. + """ + Get the value of an attribute of the particle. + + Parameters + ---------- + name : str + Name of the requested particle attribute. - :param name: Name of the requested particle attribute. - :return: The value of the particle attribute in the underlying - collection data array. + Returns + ------- + any + The value of the particle attribute in the underlying collection data array. """ if name in BaseParticleAccessor.__dict__.keys(): - result = super(ParticleAccessorSOA, self).__getattr__(name) + result = super().__getattr__(name) elif name in type(self).__dict__.keys(): result = object.__getattribute__(self, name) else: @@ -888,14 +969,18 @@ def __getattr__(self, name): return result def __setattr__(self, name, value): - """Set the value of an attribute of the particle. + """ + Set the value of an attribute of the particle. - :param name: Name of the particle attribute. - :param value: Value that will be assigned to the particle - attribute in the underlying collection data array. + Parameters + ---------- + name : str + Name of the particle attribute. + value : any + Value that will be assigned to the particle attribute in the underlying collection data array. 
""" if name in BaseParticleAccessor.__dict__.keys(): - super(ParticleAccessorSOA, self).__setattr__(name, value) + super().__setattr__(name, value) elif name in type(self).__dict__.keys(): object.__setattr__(self, name, value) else: @@ -913,18 +998,18 @@ def update_next_dt(self, next_dt=None): self._next_dt = next_dt def __repr__(self): - time_string = 'not_yet_set' if self.time is None or np.isnan(self.time) else "{:f}".format(self.time) + time_string = 'not_yet_set' if self.time is None or np.isnan(self.time) else f"{self.time:f}" str = "P[%d](lon=%f, lat=%f, depth=%f, " % (self.id, self.lon, self.lat, self.depth) for var in self._pcoll.ptype.variables: if var.to_write is not False and var.name not in ['id', 'lon', 'lat', 'depth', 'time']: - str += "%s=%f, " % (var.name, getattr(self, var.name)) - return str + "time=%s)" % time_string + str += f"{var.name}={getattr(self, var.name):f}, " + return str + f"time={time_string})" class ParticleCollectionIterableSOA(BaseParticleCollectionIterable): def __init__(self, pcoll, reverse=False, subset=None): - super(ParticleCollectionIterableSOA, self).__init__(pcoll, reverse, subset) + super().__init__(pcoll, reverse, subset) def __iter__(self): return ParticleCollectionIteratorSOA(pcoll=self._pcoll_immutable, reverse=self._reverse, subset=self._subset) @@ -941,12 +1026,17 @@ def __getitem__(self, items): class ParticleCollectionIteratorSOA(BaseParticleCollectionIterator): """Iterator for looping over the particles in the ParticleCollection. - :param pcoll: ParticleCollection that stores the particles. - :param reverse: Flag to indicate reverse iteration (i.e. starting at - the largest index, instead of the smallest). - :param subset: Subset of indices to iterate over, this allows the - creation of an iterator that represents part of the - collection. + Parameters + ---------- + pcoll : + ParticleCollection that stores the particles. + reverse : + Flag to indicate reverse iteration (i.e. 
starting at + the largest index, instead of the smallest). + subset : + Subset of indices to iterate over, this allows the + creation of an iterator that represents part of the + collection. """ def __init__(self, pcoll, reverse=False, subset=None): @@ -997,4 +1087,4 @@ def current(self): def __repr__(self): dir_str = 'Backward' if self._reverse else 'Forward' - return "%s iteration at index %s of %s." % (dir_str, self._index, self.max_len) + return f"{dir_str} iteration at index {self._index} of {self.max_len}." From 3d0077a6e77968b390f32ad319bf6786d348df14 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 21 Aug 2023 15:34:18 +0000 Subject: [PATCH 02/22] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- parcels/collection/collectionsoa.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/parcels/collection/collectionsoa.py b/parcels/collection/collectionsoa.py index 202a800c4..153b19eee 100644 --- a/parcels/collection/collectionsoa.py +++ b/parcels/collection/collectionsoa.py @@ -59,12 +59,12 @@ def partitionParticles4MPI_default(coords,mpi_size=1): if KMeans: kmeans = KMeans(n_clusters=mpi_size, random_state=0).fit(coords) - mpiProcs = kmeans.labels_ + mpiProcs = kmeans.labels_ else: # assigning random labels if no KMeans (see https://github.com/OceanParcels/parcels/issues/1261) logger.warning_once('sklearn needs to be available if MPI is installed. 
' 'See http://oceanparcels.org/#parallel_install for more information') mpiProcs = np.randint(0, mpi_size, size=len(lon)) - + #print('Using default KMeans partitioning of particles to MPI processes',flush=True) return mpiProcs @@ -73,7 +73,7 @@ def partitionParticles4MPI_default(coords,mpi_size=1): partitionParticles4MPI=partitionParticles4MPI_default #This function, if called before the particle set is created, will alter how the -#particle set is partitioned between MPI jobs. +#particle set is partitioned between MPI jobs. def setPartitionFunction(partitionFunction): global partitionParticles4MPI partitionParticles4MPI=partitionFunction @@ -81,7 +81,7 @@ def setPartitionFunction(partitionFunction): #============================================================================= - + class ParticleCollectionSOA(ParticleCollection): def __init__(self, pclass, lon, lat, depth, time, lonlatdepth_dtype, pid_orig, partitions=None, ngrid=1, **kwargs): From 68262feba6bd7a6d1926ea406a1474e309205836 Mon Sep 17 00:00:00 2001 From: JamiePringle Date: Mon, 21 Aug 2023 14:18:58 -0400 Subject: [PATCH 03/22] Update parcels/collection/collectionsoa.py Co-authored-by: Erik van Sebille --- parcels/collection/collectionsoa.py | 1 - 1 file changed, 1 deletion(-) diff --git a/parcels/collection/collectionsoa.py b/parcels/collection/collectionsoa.py index 153b19eee..ecd5935b0 100644 --- a/parcels/collection/collectionsoa.py +++ b/parcels/collection/collectionsoa.py @@ -134,7 +134,6 @@ def __init__(self, pclass, lon, lat, depth, time, lonlatdepth_dtype, pid_orig, p # the call to setPartiionParticles4MPI() # before the particle set is created. 
self._pu_indicators = partitionParticles4MPI(coords,mpi_size=mpi_size) - #print('In rank 0, _pu_indicators is',self._pu_indicators,flush=True) else: self._pu_indicators = None self._pu_indicators = mpi_comm.bcast(self._pu_indicators, root=0) From 2f368f5f6803998202e4a937f0280e5a15b8ba3a Mon Sep 17 00:00:00 2001 From: JamiePringle Date: Mon, 21 Aug 2023 14:19:30 -0400 Subject: [PATCH 04/22] Update parcels/collection/collectionsoa.py Co-authored-by: Erik van Sebille --- parcels/collection/collectionsoa.py | 1 - 1 file changed, 1 deletion(-) diff --git a/parcels/collection/collectionsoa.py b/parcels/collection/collectionsoa.py index ecd5935b0..c3cc411b0 100644 --- a/parcels/collection/collectionsoa.py +++ b/parcels/collection/collectionsoa.py @@ -65,7 +65,6 @@ def partitionParticles4MPI_default(coords,mpi_size=1): 'See http://oceanparcels.org/#parallel_install for more information') mpiProcs = np.randint(0, mpi_size, size=len(lon)) - #print('Using default KMeans partitioning of particles to MPI processes',flush=True) return mpiProcs From 71bfbd712bd1967bee04de0042a5f9cc85c328ab Mon Sep 17 00:00:00 2001 From: JamiePringle Date: Wed, 23 Aug 2023 15:58:56 -0400 Subject: [PATCH 05/22] Update parcels/collection/collectionsoa.py Co-authored-by: Erik van Sebille --- parcels/collection/collectionsoa.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parcels/collection/collectionsoa.py b/parcels/collection/collectionsoa.py index c3cc411b0..b67413cbc 100644 --- a/parcels/collection/collectionsoa.py +++ b/parcels/collection/collectionsoa.py @@ -132,7 +132,7 @@ def __init__(self, pclass, lon, lat, depth, time, lonlatdepth_dtype, pid_orig, p # which particles. It can be specified in # the call to setPartiionParticles4MPI() # before the particle set is created. 
- self._pu_indicators = partitionParticles4MPI(coords,mpi_size=mpi_size) + self._pu_indicators = partitionfunction(coords,mpi_size=mpi_size) else: self._pu_indicators = None self._pu_indicators = mpi_comm.bcast(self._pu_indicators, root=0) From fbb2e973e067e81c96b2f969cc75b8766c25456f Mon Sep 17 00:00:00 2001 From: JamiePringle Date: Wed, 23 Aug 2023 15:59:20 -0400 Subject: [PATCH 06/22] Update parcels/collection/collectionsoa.py Co-authored-by: Erik van Sebille --- parcels/collection/collectionsoa.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parcels/collection/collectionsoa.py b/parcels/collection/collectionsoa.py index b67413cbc..51c5c615c 100644 --- a/parcels/collection/collectionsoa.py +++ b/parcels/collection/collectionsoa.py @@ -27,7 +27,7 @@ KMeans = None #============================================================================= -# The function partitionParticles4MPI_default() is the default scheme +# The function partitionParticlesMPI_default() is the default scheme # to partition the different particles into different MPI # processes. 
The function setPartitionFunction() will, if called # before the particle set is created, allow the user to specify their From 17469b3bb6f8cc4feb412fbbdca68755835a7ba8 Mon Sep 17 00:00:00 2001 From: JamiePringle Date: Wed, 23 Aug 2023 17:16:19 -0400 Subject: [PATCH 07/22] define partitionFunction at particleSet creation With these changes, any particleSet creation function should take a kwarg of partitionFunction which defines the partitioning of particles to different MPI jobs/ranks --- parcels/collection/collectionsoa.py | 19 ++++++++++++------- parcels/particleset/particlesetsoa.py | 10 ++++++++++ 2 files changed, 22 insertions(+), 7 deletions(-) diff --git a/parcels/collection/collectionsoa.py b/parcels/collection/collectionsoa.py index 51c5c615c..06995dcd5 100644 --- a/parcels/collection/collectionsoa.py +++ b/parcels/collection/collectionsoa.py @@ -34,10 +34,10 @@ # own scheme for partitioning the particles to different MPI jobs by # passing a new partitioning function to setPartitionFunction(). The # arguements of this new function must match those of -# partitionParticles4MPI_default(), as described in the comments to +# partitionParticlesMPI_default(), as described in the comments to # that function. -def partitionParticles4MPI_default(coords,mpi_size=1): +def partitionParticlesMPI_default(coords,mpi_size=1): '''This function takes the coordinates of the particle starting positions and returns which MPI process will process each particle. @@ -69,13 +69,18 @@ def partitionParticles4MPI_default(coords,mpi_size=1): return mpiProcs #by default, the partition is done by the functiond defined above -partitionParticles4MPI=partitionParticles4MPI_default +#the line beflow does not actually do anything except initialize the +#global to this module variable partitionParticlesMPI. The function +#setPartitionFunction() should be called as part of the particleset +#creation in particlesetsoa.py. 
However, the value assigned below +#will work if we somehow skip that step. +partitionParticlesMPI=partitionParticlesMPI_default #This function, if called before the particle set is created, will alter how the #particle set is partitioned between MPI jobs. def setPartitionFunction(partitionFunction): - global partitionParticles4MPI - partitionParticles4MPI=partitionFunction + global partitionParticlesMPI + partitionParticlesMPI=partitionFunction return None #============================================================================= @@ -130,9 +135,9 @@ def __init__(self, pclass, lon, lat, depth, time, lonlatdepth_dtype, pid_orig, p # partitionParticles is a function which # decides which MPI jobs will process # which particles. It can be specified in - # the call to setPartiionParticles4MPI() + # the call to setPartiionParticlesMPI() # before the particle set is created. - self._pu_indicators = partitionfunction(coords,mpi_size=mpi_size) + self._pu_indicators = partitionParticlesMPI(coords,mpi_size=mpi_size) else: self._pu_indicators = None self._pu_indicators = mpi_comm.bcast(self._pu_indicators, root=0) diff --git a/parcels/particleset/particlesetsoa.py b/parcels/particleset/particlesetsoa.py index aca1e6434..9e9bc6953 100644 --- a/parcels/particleset/particlesetsoa.py +++ b/parcels/particleset/particlesetsoa.py @@ -8,6 +8,7 @@ from parcels.collection.collectionsoa import ParticleCollectionIterableSOA # noqa from parcels.collection.collectionsoa import ParticleCollectionIteratorSOA # noqa +from parcels.collection.collectionsoa import partitionParticlesMPI_default,setPartitionFunction from parcels.collection.collectionsoa import ParticleCollectionSOA from parcels.grid import CurvilinearGrid, GridCode from parcels.interaction.interactionkernelsoa import InteractionKernelSOA @@ -134,6 +135,15 @@ def ArrayClass_init(self, *args, **kwargs): self.fieldset.check_complete() partitions = kwargs.pop('partitions', None) + #cjmp + #if a partioning function for MPI runs has 
been passed into the + #particle creation with the "partitionFunction" kwarg, retrieve it here. + #if it has not, assign the default function, partitionParticlesMPI_defualt(), + #which is defined in collectionsoa.py + partitionFunction=kwargs.pop('partitionFunction',partitionParticlesMPI_default) + setPartitionFunction(partitionFunction) + #cjmp end + lon = np.empty(shape=0) if lon is None else convert_to_flat_array(lon) lat = np.empty(shape=0) if lat is None else convert_to_flat_array(lat) From 554713cb739e1906c603d339fe8c754734443afb Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 23 Aug 2023 21:18:40 +0000 Subject: [PATCH 08/22] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- parcels/collection/collectionsoa.py | 2 +- parcels/particleset/particlesetsoa.py | 7 +++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/parcels/collection/collectionsoa.py b/parcels/collection/collectionsoa.py index 06995dcd5..80a1538b1 100644 --- a/parcels/collection/collectionsoa.py +++ b/parcels/collection/collectionsoa.py @@ -73,7 +73,7 @@ def partitionParticlesMPI_default(coords,mpi_size=1): #global to this module variable partitionParticlesMPI. The function #setPartitionFunction() should be called as part of the particleset #creation in particlesetsoa.py. However, the value assigned below -#will work if we somehow skip that step. +#will work if we somehow skip that step. 
partitionParticlesMPI=partitionParticlesMPI_default #This function, if called before the particle set is created, will alter how the diff --git a/parcels/particleset/particlesetsoa.py b/parcels/particleset/particlesetsoa.py index 9e9bc6953..9a2f90a09 100644 --- a/parcels/particleset/particlesetsoa.py +++ b/parcels/particleset/particlesetsoa.py @@ -8,8 +8,11 @@ from parcels.collection.collectionsoa import ParticleCollectionIterableSOA # noqa from parcels.collection.collectionsoa import ParticleCollectionIteratorSOA # noqa -from parcels.collection.collectionsoa import partitionParticlesMPI_default,setPartitionFunction -from parcels.collection.collectionsoa import ParticleCollectionSOA +from parcels.collection.collectionsoa import ( + ParticleCollectionSOA, + partitionParticlesMPI_default, + setPartitionFunction, +) from parcels.grid import CurvilinearGrid, GridCode from parcels.interaction.interactionkernelsoa import InteractionKernelSOA from parcels.interaction.neighborsearch import ( From 2ca72ea49fe85e24ae8ec21dc000188c4f6b826f Mon Sep 17 00:00:00 2001 From: JamiePringle Date: Wed, 23 Aug 2023 18:36:48 -0400 Subject: [PATCH 09/22] Update particle set to specify partitionFunction Update particle set to specify partitionFunction --- parcels/particleset/particlesetsoa.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/parcels/particleset/particlesetsoa.py b/parcels/particleset/particlesetsoa.py index 9a2f90a09..9ac7f35da 100644 --- a/parcels/particleset/particlesetsoa.py +++ b/parcels/particleset/particlesetsoa.py @@ -138,14 +138,12 @@ def ArrayClass_init(self, *args, **kwargs): self.fieldset.check_complete() partitions = kwargs.pop('partitions', None) - #cjmp #if a partioning function for MPI runs has been passed into the #particle creation with the "partitionFunction" kwarg, retrieve it here. 
#if it has not, assign the default function, partitionParticlesMPI_defualt(), #which is defined in collectionsoa.py partitionFunction=kwargs.pop('partitionFunction',partitionParticlesMPI_default) setPartitionFunction(partitionFunction) - #cjmp end lon = np.empty(shape=0) if lon is None else convert_to_flat_array(lon) lat = np.empty(shape=0) if lat is None else convert_to_flat_array(lat) From e610cd74e7bcf61c2870ebb35517862385f1ac75 Mon Sep 17 00:00:00 2001 From: Erik van Sebille Date: Thu, 24 Aug 2023 11:04:50 +0200 Subject: [PATCH 10/22] Cleaning up partition_function code --- parcels/collection/collectionsoa.py | 46 ++++++--------------------- parcels/particleset/particlesetsoa.py | 15 ++------- 2 files changed, 12 insertions(+), 49 deletions(-) diff --git a/parcels/collection/collectionsoa.py b/parcels/collection/collectionsoa.py index 80a1538b1..80bafcfb3 100644 --- a/parcels/collection/collectionsoa.py +++ b/parcels/collection/collectionsoa.py @@ -26,18 +26,8 @@ except: KMeans = None -#============================================================================= -# The function partitionParticlesMPI_default() is the default scheme -# to partition the different particles into different MPI -# processes. The function setPartitionFunction() will, if called -# before the particle set is created, allow the user to specify their -# own scheme for partitioning the particles to different MPI jobs by -# passing a new partitioning function to setPartitionFunction(). The -# arguements of this new function must match those of -# partitionParticlesMPI_default(), as described in the comments to -# that function. - -def partitionParticlesMPI_default(coords,mpi_size=1): + +def partitionParticlesMPI_default(coords, mpi_size=1): '''This function takes the coordinates of the particle starting positions and returns which MPI process will process each particle. 
@@ -63,28 +53,10 @@ def partitionParticlesMPI_default(coords,mpi_size=1): else: # assigning random labels if no KMeans (see https://github.com/OceanParcels/parcels/issues/1261) logger.warning_once('sklearn needs to be available if MPI is installed. ' 'See http://oceanparcels.org/#parallel_install for more information') - mpiProcs = np.randint(0, mpi_size, size=len(lon)) - + mpiProcs = np.randint(0, mpi_size, size=coords.shape[0]) return mpiProcs -#by default, the partition is done by the functiond defined above -#the line beflow does not actually do anything except initialize the -#global to this module variable partitionParticlesMPI. The function -#setPartitionFunction() should be called as part of the particleset -#creation in particlesetsoa.py. However, the value assigned below -#will work if we somehow skip that step. -partitionParticlesMPI=partitionParticlesMPI_default - -#This function, if called before the particle set is created, will alter how the -#particle set is partitioned between MPI jobs. -def setPartitionFunction(partitionFunction): - global partitionParticlesMPI - partitionParticlesMPI=partitionFunction - return None -#============================================================================= - - class ParticleCollectionSOA(ParticleCollection): @@ -124,6 +96,11 @@ def __init__(self, pclass, lon, lat, depth, time, lonlatdepth_dtype, pid_orig, p mpi_rank = mpi_comm.Get_rank() mpi_size = mpi_comm.Get_size() + # If a partitioning function for MPI runs has been passed into the + # particle creation with the "partition_function" kwarg, retrieve it here. 
+ # If it has not, assign the default function, partitionParticlesMPI_defualt() + partition_function = kwargs.pop('partition_function', partitionParticlesMPI_default) + if lon.size < mpi_size and mpi_size > 1: raise RuntimeError('Cannot initialise with fewer particles than MPI processors') @@ -132,12 +109,7 @@ def __init__(self, pclass, lon, lat, depth, time, lonlatdepth_dtype, pid_orig, p if (self._pu_indicators is None) or (len(self._pu_indicators) != len(lon)): if mpi_rank == 0: coords = np.vstack((lon, lat)).transpose() - # partitionParticles is a function which - # decides which MPI jobs will process - # which particles. It can be specified in - # the call to setPartiionParticlesMPI() - # before the particle set is created. - self._pu_indicators = partitionParticlesMPI(coords,mpi_size=mpi_size) + self._pu_indicators = partition_function(coords, mpi_size=mpi_size) else: self._pu_indicators = None self._pu_indicators = mpi_comm.bcast(self._pu_indicators, root=0) diff --git a/parcels/particleset/particlesetsoa.py b/parcels/particleset/particlesetsoa.py index 9ac7f35da..5c03bfc02 100644 --- a/parcels/particleset/particlesetsoa.py +++ b/parcels/particleset/particlesetsoa.py @@ -8,11 +8,7 @@ from parcels.collection.collectionsoa import ParticleCollectionIterableSOA # noqa from parcels.collection.collectionsoa import ParticleCollectionIteratorSOA # noqa -from parcels.collection.collectionsoa import ( - ParticleCollectionSOA, - partitionParticlesMPI_default, - setPartitionFunction, -) +from parcels.collection.collectionsoa import ParticleCollectionSOA from parcels.grid import CurvilinearGrid, GridCode from parcels.interaction.interactionkernelsoa import InteractionKernelSOA from parcels.interaction.neighborsearch import ( @@ -87,6 +83,8 @@ class ParticleSetSOA(BaseParticleSet): periodic_domain_zonal : Zonal domain size, used to apply zonally periodic boundaries for particle-particle interaction. 
If None, no zonally periodic boundaries are applied + partition_function : + Function to use for partitioning particles over processors. Default is to use kMeans Other Variables can be initialised using further arguments (e.g. v=... for a Variable named 'v') """ @@ -138,13 +136,6 @@ def ArrayClass_init(self, *args, **kwargs): self.fieldset.check_complete() partitions = kwargs.pop('partitions', None) - #if a partioning function for MPI runs has been passed into the - #particle creation with the "partitionFunction" kwarg, retrieve it here. - #if it has not, assign the default function, partitionParticlesMPI_defualt(), - #which is defined in collectionsoa.py - partitionFunction=kwargs.pop('partitionFunction',partitionParticlesMPI_default) - setPartitionFunction(partitionFunction) - lon = np.empty(shape=0) if lon is None else convert_to_flat_array(lon) lat = np.empty(shape=0) if lat is None else convert_to_flat_array(lat) From 6fc55547c0795400def3a8f3ab866df9b0c1587d Mon Sep 17 00:00:00 2001 From: Erik van Sebille Date: Thu, 24 Aug 2023 11:07:48 +0200 Subject: [PATCH 11/22] Fixing pre-commit/pydocstyle issues --- parcels/collection/collectionsoa.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/parcels/collection/collectionsoa.py b/parcels/collection/collectionsoa.py index 80bafcfb3..141eabf8a 100644 --- a/parcels/collection/collectionsoa.py +++ b/parcels/collection/collectionsoa.py @@ -28,7 +28,7 @@ def partitionParticlesMPI_default(coords, mpi_size=1): - '''This function takes the coordinates of the particle starting + """This function takes the coordinates of the particle starting positions and returns which MPI process will process each particle. @@ -44,8 +44,7 @@ def partitionParticlesMPI_default(coords, mpi_size=1): mpiProcs: an integer array with values from 0 to mpi_size-1 specifying which MPI job will run which particles. 
len(mpiProcs) must equal coords.shape[0] - - ''' + """ if KMeans: kmeans = KMeans(n_clusters=mpi_size, random_state=0).fit(coords) From 4926595396052788873e8c0c54d67447cc5959ac Mon Sep 17 00:00:00 2001 From: Erik van Sebille Date: Thu, 24 Aug 2023 11:13:56 +0200 Subject: [PATCH 12/22] Fixing another pydocstyle issue... --- parcels/collection/collectionsoa.py | 1 - 1 file changed, 1 deletion(-) diff --git a/parcels/collection/collectionsoa.py b/parcels/collection/collectionsoa.py index 141eabf8a..75bccbdcb 100644 --- a/parcels/collection/collectionsoa.py +++ b/parcels/collection/collectionsoa.py @@ -45,7 +45,6 @@ def partitionParticlesMPI_default(coords, mpi_size=1): specifying which MPI job will run which particles. len(mpiProcs) must equal coords.shape[0] """ - if KMeans: kmeans = KMeans(n_clusters=mpi_size, random_state=0).fit(coords) mpiProcs = kmeans.labels_ From 4269dbd34558dd6b50aa930c3bb6c71e2cd07b81 Mon Sep 17 00:00:00 2001 From: Erik van Sebille Date: Thu, 24 Aug 2023 18:03:19 +0200 Subject: [PATCH 13/22] Cleaning up partition_function for MPI --- parcels/collection/collectionsoa.py | 17 ++++++----------- parcels/particleset/baseparticleset.py | 6 +++--- parcels/particleset/particlesetsoa.py | 21 ++++++++++----------- 3 files changed, 19 insertions(+), 25 deletions(-) diff --git a/parcels/collection/collectionsoa.py b/parcels/collection/collectionsoa.py index 75bccbdcb..27fb4ca54 100644 --- a/parcels/collection/collectionsoa.py +++ b/parcels/collection/collectionsoa.py @@ -12,7 +12,6 @@ ) from parcels.field import Field from parcels.particle import JITParticle, ScipyParticle # noqa -from parcels.tools.converters import convert_to_flat_array from parcels.tools.loggers import logger from parcels.tools.statuscodes import NotTestedError @@ -58,7 +57,7 @@ def partitionParticlesMPI_default(coords, mpi_size=1): class ParticleCollectionSOA(ParticleCollection): - def __init__(self, pclass, lon, lat, depth, time, lonlatdepth_dtype, pid_orig, partitions=None, 
ngrid=1, **kwargs): + def __init__(self, pclass, lon, lat, depth, time, lonlatdepth_dtype, pid_orig, ngrid=1, **kwargs): """ Parameters ---------- @@ -80,9 +79,10 @@ def __init__(self, pclass, lon, lat, depth, time, lonlatdepth_dtype, pid_orig, p assert lon.size == time.size, ( 'time and positions (lon, lat, depth) don''t have the same lengths.') - # If partitions is false, the partitions are already initialised - if partitions is not None and partitions is not False: - self._pu_indicators = convert_to_flat_array(partitions) + # If a partitioning function for MPI runs has been passed into the + # particle creation with the "partition_function" kwarg, retrieve it here. + # If it has not, assign the default function, partitionParticlesMPI_defualt() + partition_function = kwargs.pop('partition_function', partitionParticlesMPI_default) for kwvar in kwargs: assert lon.size == kwargs[kwvar].size, ( @@ -94,16 +94,11 @@ def __init__(self, pclass, lon, lat, depth, time, lonlatdepth_dtype, pid_orig, p mpi_rank = mpi_comm.Get_rank() mpi_size = mpi_comm.Get_size() - # If a partitioning function for MPI runs has been passed into the - # particle creation with the "partition_function" kwarg, retrieve it here. 
- # If it has not, assign the default function, partitionParticlesMPI_defualt() - partition_function = kwargs.pop('partition_function', partitionParticlesMPI_default) - if lon.size < mpi_size and mpi_size > 1: raise RuntimeError('Cannot initialise with fewer particles than MPI processors') if mpi_size > 1: - if partitions is not False: + if partition_function is not False: if (self._pu_indicators is None) or (len(self._pu_indicators) != len(lon)): if mpi_rank == 0: coords = np.vstack((lon, lat)).transpose() diff --git a/parcels/particleset/baseparticleset.py b/parcels/particleset/baseparticleset.py index cc9def08a..6ac3e9610 100644 --- a/parcels/particleset/baseparticleset.py +++ b/parcels/particleset/baseparticleset.py @@ -141,7 +141,7 @@ def from_list(cls, fieldset, pclass, lon, lat, depth=None, time=None, repeatdt=N return cls(fieldset=fieldset, pclass=pclass, lon=lon, lat=lat, depth=depth, time=time, repeatdt=repeatdt, lonlatdepth_dtype=lonlatdepth_dtype, **kwargs) @classmethod - def from_line(cls, fieldset, pclass, start, finish, size, depth=None, time=None, repeatdt=None, lonlatdepth_dtype=None): + def from_line(cls, fieldset, pclass, start, finish, size, depth=None, time=None, repeatdt=None, lonlatdepth_dtype=None, **kwargs): """Create a particleset in the shape of a line (according to a cartesian grid). 
Initialise the ParticleSet from start/finish coordinates with equidistant spacing @@ -175,7 +175,7 @@ def from_line(cls, fieldset, pclass, start, finish, size, depth=None, time=None, lat = np.linspace(start[1], finish[1], size) if type(depth) in [int, float]: depth = [depth] * size - return cls(fieldset=fieldset, pclass=pclass, lon=lon, lat=lat, depth=depth, time=time, repeatdt=repeatdt, lonlatdepth_dtype=lonlatdepth_dtype) + return cls(fieldset=fieldset, pclass=pclass, lon=lon, lat=lat, depth=depth, time=time, repeatdt=repeatdt, lonlatdepth_dtype=lonlatdepth_dtype, **kwargs) @classmethod @abstractmethod @@ -557,7 +557,7 @@ def execute(self, pyfunc=AdvectionRK4, pyfunc_inter=None, endtime=None, runtime= lat=self.repeatlat, depth=self.repeatdepth, pclass=self.repeatpclass, lonlatdepth_dtype=self.collection.lonlatdepth_dtype, - partitions=False, pid_orig=self.repeatpid, **self.repeatkwargs) + partition_function=False, pid_orig=self.repeatpid, **self.repeatkwargs) for p in pset_new: p.dt = dt self.add(pset_new) diff --git a/parcels/particleset/particlesetsoa.py b/parcels/particleset/particlesetsoa.py index 5c03bfc02..50642a5ba 100644 --- a/parcels/particleset/particlesetsoa.py +++ b/parcels/particleset/particlesetsoa.py @@ -77,14 +77,11 @@ class ParticleSetSOA(BaseParticleSet): and np.float64 if the interpolation method is 'cgrid_velocity' pid_orig : Optional list of (offsets for) the particle IDs - partitions : - List of cores on which to distribute the particles for MPI runs. Default: None, in which case particles - are distributed automatically on the processors + partition_function : + Function to use for partitioning particles over processors. Default is to use kMeans periodic_domain_zonal : Zonal domain size, used to apply zonally periodic boundaries for particle-particle interaction. If None, no zonally periodic boundaries are applied - partition_function : - Function to use for partitioning particles over processors. 
Default is to use kMeans Other Variables can be initialised using further arguments (e.g. v=... for a Variable named 'v') """ @@ -134,7 +131,6 @@ def ArrayClass_init(self, *args, **kwargs): logger.warning_once("No FieldSet provided in ParticleSet generation. This breaks most Parcels functionality") else: self.fieldset.check_complete() - partitions = kwargs.pop('partitions', None) lon = np.empty(shape=0) if lon is None else convert_to_flat_array(lon) lat = np.empty(shape=0) if lat is None else convert_to_flat_array(lat) @@ -171,9 +167,10 @@ def ArrayClass_init(self, *args, **kwargs): 'lon lat depth precision should be set to either np.float32 or np.float64' for kwvar in kwargs: - kwargs[kwvar] = convert_to_flat_array(kwargs[kwvar]) - assert lon.size == kwargs[kwvar].size, ( - f"{kwvar} and positions (lon, lat, depth) don't have the same lengths.") + if kwvar not in ['partition_function']: + kwargs[kwvar] = convert_to_flat_array(kwargs[kwvar]) + assert lon.size == kwargs[kwvar].size, ( + f"{kwvar} and positions (lon, lat, depth) don't have the same lengths.") self.repeatdt = repeatdt.total_seconds() if isinstance(repeatdt, delta) else repeatdt if self.repeatdt: @@ -183,6 +180,7 @@ def ArrayClass_init(self, *args, **kwargs): raise 'All Particle.time should be the same when repeatdt is not None' self.repeatpclass = pclass self.repeatkwargs = kwargs + self.repeatkwargs.pop('partition_function', None) ngrids = fieldset.gridset.size if fieldset is not None else 1 @@ -200,7 +198,7 @@ def ArrayClass_init(self, *args, **kwargs): self._collection = ParticleCollectionSOA( _pclass, lon=lon, lat=lat, depth=depth, time=time, lonlatdepth_dtype=lonlatdepth_dtype, pid_orig=pid_orig, - partitions=partitions, ngrid=ngrids, **kwargs) + ngrid=ngrids, **kwargs) # Initialize neighbor search data structure (used for interaction). 
if interaction_distance is not None: @@ -247,7 +245,8 @@ def ArrayClass_init(self, *args, **kwargs): self.repeatlat = copy(self._collection.data['lat']) self.repeatdepth = copy(self._collection.data['depth']) for kwvar in kwargs: - self.repeatkwargs[kwvar] = copy(self._collection.data[kwvar]) + if kwvar not in ['partition_function']: + self.repeatkwargs[kwvar] = copy(self._collection.data[kwvar]) if self.repeatdt: if MPI and self._collection.pu_indicators is not None: From fa94df694619d06e0ff62e2e4e8114948fdb4ff8 Mon Sep 17 00:00:00 2001 From: Erik van Sebille Date: Thu, 24 Aug 2023 18:03:32 +0200 Subject: [PATCH 14/22] Adding unit test for partition_function --- docs/examples/example_stommel.py | 22 ++++++++++++++++++---- tests/test_mpirun.py | 29 ++++++++++++++++------------- 2 files changed, 34 insertions(+), 17 deletions(-) diff --git a/docs/examples/example_stommel.py b/docs/examples/example_stommel.py index 7e95c5525..04b6b6fac 100644 --- a/docs/examples/example_stommel.py +++ b/docs/examples/example_stommel.py @@ -86,8 +86,14 @@ def AgeP(particle, fieldset, time): particle.delete() +def simple_partition_function(coords, mpi_size=1): + """A very simple partition function that assigns particles to processors (for MPI testing purposes))""" + return np.array([int(i) for i in np.linspace(0, mpi_size, coords.shape[0], endpoint=False)]) + + def stommel_example(npart=1, mode='jit', verbose=False, method=AdvectionRK4, grid_type='A', - outfile="StommelParticle.zarr", repeatdt=None, maxage=None, write_fields=True, pset_mode='soa'): + outfile="StommelParticle.zarr", repeatdt=None, maxage=None, write_fields=True, + custom_partition_function=False, pset_mode='soa'): timer.fieldset = timer.Timer('FieldSet', parent=timer.stommel) fieldset = stommel_fieldset(grid_type=grid_type) if write_fields: @@ -105,8 +111,13 @@ class MyParticle(ParticleClass): p_start = Variable('p_start', dtype=np.float32, initial=fieldset.P) age = Variable('age', dtype=np.float32, initial=0.) 
- pset = pset_type[pset_mode]['pset'].from_line(fieldset, size=npart, pclass=MyParticle, repeatdt=repeatdt, - start=(10e3, 5000e3), finish=(100e3, 5000e3), time=0) + if custom_partition_function: + pset = pset_type[pset_mode]['pset'].from_line(fieldset, size=npart, pclass=MyParticle, repeatdt=repeatdt, + start=(10e3, 5000e3), finish=(100e3, 5000e3), time=0, + partition_function=simple_partition_function) + else: + pset = pset_type[pset_mode]['pset'].from_line(fieldset, size=npart, pclass=MyParticle, repeatdt=repeatdt, + start=(10e3, 5000e3), finish=(100e3, 5000e3), time=0) if verbose: print(f"Initial particle positions:\n{pset}") @@ -174,12 +185,15 @@ def main(args=None): help='max age of the particles (after which particles are deleted)') p.add_argument('-wf', '--write_fields', default=True, help='Write the hydrodynamic fields to NetCDF') + p.add_argument('-cpf', '--custom_partition_function', default=False, + help='Use a custom partition_function (for MPI testing purposes)') args = p.parse_args(args) timer.args.stop() timer.stommel = timer.Timer('Stommel', parent=timer.root) stommel_example(args.particles, mode=args.mode, verbose=args.verbose, method=method[args.method], - outfile=args.outfile, repeatdt=args.repeatdt, maxage=args.maxage, write_fields=args.write_fields) + outfile=args.outfile, repeatdt=args.repeatdt, maxage=args.maxage, write_fields=args.write_fields, + custom_partition_function=args.custom_partition_function) timer.stommel.stop() timer.root.stop() timer.root.print_tree() diff --git a/tests/test_mpirun.py b/tests/test_mpirun.py index 4483dfdd1..1ed330935 100644 --- a/tests/test_mpirun.py +++ b/tests/test_mpirun.py @@ -13,7 +13,7 @@ @pytest.mark.skipif(sys.platform.startswith("darwin"), reason="skipping macOS test as problem with file in pytest") -@pytest.mark.parametrize('pset_mode', ['soa', 'aos']) +@pytest.mark.parametrize('pset_mode', ['soa']) @pytest.mark.parametrize('repeatdt, maxage', [(20*86400, 600*86400), (10*86400, 10*86400)]) 
@pytest.mark.parametrize('nump', [4, 8]) def test_mpi_run(pset_mode, tmpdir, repeatdt, maxage, nump): @@ -21,25 +21,28 @@ def test_mpi_run(pset_mode, tmpdir, repeatdt, maxage, nump): stommel_file = path.join(path.dirname(__file__), '..', 'docs', 'examples', 'example_stommel.py') outputMPI = tmpdir.join('StommelMPI') + outputMPI_partition_function = tmpdir.join('StommelMPI_partition_function') outputNoMPI = tmpdir.join('StommelNoMPI.zarr') + system('mpirun -np 2 python %s -p %d -o %s -r %d -a %d -psm %s -wf False -cpf True' % (stommel_file, nump, outputMPI_partition_function, repeatdt, maxage, pset_mode)) system('mpirun -np 2 python %s -p %d -o %s -r %d -a %d -psm %s -wf False' % (stommel_file, nump, outputMPI, repeatdt, maxage, pset_mode)) system('python %s -p %d -o %s -r %d -a %d -psm %s -wf False' % (stommel_file, nump, outputNoMPI, repeatdt, maxage, pset_mode)) - files = glob(path.join(outputMPI, "proc*")) - ds1 = xr.concat([xr.open_zarr(f) for f in files], dim='trajectory', - compat='no_conflicts', coords='minimal').sortby(['trajectory']) - ds2 = xr.open_zarr(outputNoMPI) - for v in ds2.variables.keys(): - if v == 'time': - continue # skip because np.allclose does not work well on np.datetime64 - assert np.allclose(ds1.variables[v][:], ds2.variables[v][:], equal_nan=True) + for mpi_run in [outputMPI, outputMPI_partition_function]: + files = glob(path.join(mpi_run, "proc*")) + ds1 = xr.concat([xr.open_zarr(f) for f in files], dim='trajectory', + compat='no_conflicts', coords='minimal').sortby(['trajectory']) + + for v in ds2.variables.keys(): + if v == 'time': + continue # skip because np.allclose does not work well on np.datetime64 + assert np.allclose(ds1.variables[v][:], ds2.variables[v][:], equal_nan=True) - for a in ds2.attrs: - if a != 'parcels_version': - assert ds1.attrs[a] == ds2.attrs[a] + for a in ds2.attrs: + if a != 'parcels_version': + assert ds1.attrs[a] == ds2.attrs[a] - ds1.close() + ds1.close() ds2.close() From 
a21c9b6d915e8633b54ef5a47b76418569212013 Mon Sep 17 00:00:00 2001 From: Erik van Sebille Date: Thu, 24 Aug 2023 18:16:18 +0200 Subject: [PATCH 15/22] Fixing flake8 errors --- docs/examples/example_stommel.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/examples/example_stommel.py b/docs/examples/example_stommel.py index 04b6b6fac..2707aca69 100644 --- a/docs/examples/example_stommel.py +++ b/docs/examples/example_stommel.py @@ -113,11 +113,11 @@ class MyParticle(ParticleClass): if custom_partition_function: pset = pset_type[pset_mode]['pset'].from_line(fieldset, size=npart, pclass=MyParticle, repeatdt=repeatdt, - start=(10e3, 5000e3), finish=(100e3, 5000e3), time=0, - partition_function=simple_partition_function) + start=(10e3, 5000e3), finish=(100e3, 5000e3), time=0, + partition_function=simple_partition_function) else: pset = pset_type[pset_mode]['pset'].from_line(fieldset, size=npart, pclass=MyParticle, repeatdt=repeatdt, - start=(10e3, 5000e3), finish=(100e3, 5000e3), time=0) + start=(10e3, 5000e3), finish=(100e3, 5000e3), time=0) if verbose: print(f"Initial particle positions:\n{pset}") From e148126c039dcb2fa6f2bb2f0dc59cc2c74ad94e Mon Sep 17 00:00:00 2001 From: Erik van Sebille Date: Thu, 24 Aug 2023 19:50:48 +0200 Subject: [PATCH 16/22] Temporary fix for AoS --- parcels/collection/collectionaos.py | 7 +++++-- parcels/particleset/particlesetaos.py | 10 ++++++---- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/parcels/collection/collectionaos.py b/parcels/collection/collectionaos.py index e33dfd2cd..553b62868 100644 --- a/parcels/collection/collectionaos.py +++ b/parcels/collection/collectionaos.py @@ -59,8 +59,9 @@ def __init__(self, pclass, lon, lat, depth, time, lonlatdepth_dtype, pid_orig, p self._pu_indicators = convert_to_flat_array(partitions) for kwvar in kwargs: - assert lon.size == kwargs[kwvar].size, ( - f'{kwvar} and positions (lon, lat, depth) do nott have the same lengths.') + if kwvar not in 
['partition_function']: + assert lon.size == kwargs[kwvar].size, ( + f'{kwvar} and positions (lon, lat, depth) do nott have the same lengths.') offset = np.max(pid) if (pid is not None) and len(pid) > 0 else -1 if MPI: @@ -124,6 +125,8 @@ def __init__(self, pclass, lon, lat, depth, time, lonlatdepth_dtype, pid_orig, p for kwvar in kwargs: if isinstance(kwvar, Field): continue + if kwvar in ['partition_function']: + continue if not hasattr(self._data[i], kwvar): raise RuntimeError(f'Particle class does not have Variable {kwvar}') setattr(self._data[i], kwvar, kwargs[kwvar][i]) diff --git a/parcels/particleset/particlesetaos.py b/parcels/particleset/particlesetaos.py index d983fcf25..e7e673c17 100644 --- a/parcels/particleset/particlesetaos.py +++ b/parcels/particleset/particlesetaos.py @@ -242,9 +242,10 @@ def ObjectClass_sizeof_forward(self): 'lon lat depth precision should be set to either np.float32 or np.float64' for kwvar in kwargs: - kwargs[kwvar] = convert_to_flat_array(kwargs[kwvar]) - assert lon.size == kwargs[kwvar].size, ( - f"{kwvar} and positions (lon, lat, depth) don't have the same lengths.") + if kwvar not in ['partition_function']: + kwargs[kwvar] = convert_to_flat_array(kwargs[kwvar]) + assert lon.size == kwargs[kwvar].size, ( + f"{kwvar} and positions (lon, lat, depth) don't have the same lengths.") self.repeatdt = repeatdt.total_seconds() if isinstance(repeatdt, delta) else repeatdt if self.repeatdt: @@ -270,7 +271,8 @@ def ObjectClass_sizeof_forward(self): self.repeatlat = copy(self._collection.lat) self.repeatdepth = copy(self._collection.depth) for kwvar in kwargs: - self.repeatkwargs[kwvar] = copy(getattr(self._collection, kwvar)) + if kwvar not in ['partition_function']: + self.repeatkwargs[kwvar] = copy(getattr(self._collection, kwvar)) if self.repeatdt: if MPI and self._collection.pu_indicators is not None: From ec042939077a342f1cf9b97f181ca13580e5f159 Mon Sep 17 00:00:00 2001 From: Erik van Sebille Date: Mon, 28 Aug 2023 08:29:13 +0200 
Subject: [PATCH 17/22] Updating MPI test to also run on linux if mpi4py does not exist --- tests/test_mpirun.py | 64 ++++++++++++++++++++------------------------ 1 file changed, 29 insertions(+), 35 deletions(-) diff --git a/tests/test_mpirun.py b/tests/test_mpirun.py index 1ed330935..060e3d1af 100644 --- a/tests/test_mpirun.py +++ b/tests/test_mpirun.py @@ -6,43 +6,37 @@ import pytest import xarray as xr -try: - from mpi4py import MPI -except: - MPI = None - - @pytest.mark.skipif(sys.platform.startswith("darwin"), reason="skipping macOS test as problem with file in pytest") +@pytest.mark.skipif(sys.platform.startswith("win"), reason="skipping windows as mpi4py not available for windows") @pytest.mark.parametrize('pset_mode', ['soa']) @pytest.mark.parametrize('repeatdt, maxage', [(20*86400, 600*86400), (10*86400, 10*86400)]) @pytest.mark.parametrize('nump', [4, 8]) def test_mpi_run(pset_mode, tmpdir, repeatdt, maxage, nump): - if MPI: - stommel_file = path.join(path.dirname(__file__), '..', 'docs', - 'examples', 'example_stommel.py') - outputMPI = tmpdir.join('StommelMPI') - outputMPI_partition_function = tmpdir.join('StommelMPI_partition_function') - outputNoMPI = tmpdir.join('StommelNoMPI.zarr') - - system('mpirun -np 2 python %s -p %d -o %s -r %d -a %d -psm %s -wf False -cpf True' % (stommel_file, nump, outputMPI_partition_function, repeatdt, maxage, pset_mode)) - system('mpirun -np 2 python %s -p %d -o %s -r %d -a %d -psm %s -wf False' % (stommel_file, nump, outputMPI, repeatdt, maxage, pset_mode)) - system('python %s -p %d -o %s -r %d -a %d -psm %s -wf False' % (stommel_file, nump, outputNoMPI, repeatdt, maxage, pset_mode)) - - ds2 = xr.open_zarr(outputNoMPI) - - for mpi_run in [outputMPI, outputMPI_partition_function]: - files = glob(path.join(mpi_run, "proc*")) - ds1 = xr.concat([xr.open_zarr(f) for f in files], dim='trajectory', - compat='no_conflicts', coords='minimal').sortby(['trajectory']) - - for v in ds2.variables.keys(): - if v == 'time': - continue 
# skip because np.allclose does not work well on np.datetime64 - assert np.allclose(ds1.variables[v][:], ds2.variables[v][:], equal_nan=True) - - for a in ds2.attrs: - if a != 'parcels_version': - assert ds1.attrs[a] == ds2.attrs[a] - - ds1.close() - ds2.close() + stommel_file = path.join(path.dirname(__file__), '..', 'docs', + 'examples', 'example_stommel.py') + outputMPI = tmpdir.join('StommelMPI') + outputMPI_partition_function = tmpdir.join('StommelMPI_partition_function') + outputNoMPI = tmpdir.join('StommelNoMPI.zarr') + + system('mpirun -np 2 python %s -p %d -o %s -r %d -a %d -psm %s -wf False -cpf True' % (stommel_file, nump, outputMPI_partition_function, repeatdt, maxage, pset_mode)) + system('mpirun -np 2 python %s -p %d -o %s -r %d -a %d -psm %s -wf False' % (stommel_file, nump, outputMPI, repeatdt, maxage, pset_mode)) + system('python %s -p %d -o %s -r %d -a %d -psm %s -wf False' % (stommel_file, nump, outputNoMPI, repeatdt, maxage, pset_mode)) + + ds2 = xr.open_zarr(outputNoMPI) + + for mpi_run in [outputMPI, outputMPI_partition_function]: + files = glob(path.join(mpi_run, "proc*")) + ds1 = xr.concat([xr.open_zarr(f) for f in files], dim='trajectory', + compat='no_conflicts', coords='minimal').sortby(['trajectory']) + + for v in ds2.variables.keys(): + if v == 'time': + continue # skip because np.allclose does not work well on np.datetime64 + assert np.allclose(ds1.variables[v][:], ds2.variables[v][:], equal_nan=True) + + for a in ds2.attrs: + if a != 'parcels_version': + assert ds1.attrs[a] == ds2.attrs[a] + + ds1.close() + ds2.close() From cb91a2e5094038b2c8c186dd2d1769ca751e208c Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 28 Aug 2023 06:29:41 +0000 Subject: [PATCH 18/22] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- tests/test_mpirun.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_mpirun.py 
b/tests/test_mpirun.py index 060e3d1af..c41070a2d 100644 --- a/tests/test_mpirun.py +++ b/tests/test_mpirun.py @@ -6,6 +6,7 @@ import pytest import xarray as xr + @pytest.mark.skipif(sys.platform.startswith("darwin"), reason="skipping macOS test as problem with file in pytest") @pytest.mark.skipif(sys.platform.startswith("win"), reason="skipping windows as mpi4py not available for windows") @pytest.mark.parametrize('pset_mode', ['soa']) From 356f1f7cb6b1d1f5b641f82f31834f1e0feda300 Mon Sep 17 00:00:00 2001 From: Erik van Sebille Date: Mon, 28 Aug 2023 08:34:19 +0200 Subject: [PATCH 19/22] Fixing flake8 error --- tests/test_mpirun.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/test_mpirun.py b/tests/test_mpirun.py index c41070a2d..6bb7c5d76 100644 --- a/tests/test_mpirun.py +++ b/tests/test_mpirun.py @@ -13,8 +13,7 @@ @pytest.mark.parametrize('repeatdt, maxage', [(20*86400, 600*86400), (10*86400, 10*86400)]) @pytest.mark.parametrize('nump', [4, 8]) def test_mpi_run(pset_mode, tmpdir, repeatdt, maxage, nump): - stommel_file = path.join(path.dirname(__file__), '..', 'docs', - 'examples', 'example_stommel.py') + stommel_file = path.join(path.dirname(__file__), '..', 'docs', 'examples', 'example_stommel.py') outputMPI = tmpdir.join('StommelMPI') outputMPI_partition_function = tmpdir.join('StommelMPI_partition_function') outputNoMPI = tmpdir.join('StommelNoMPI.zarr') From 165c0b9cf79c705a965d361985439cbf2b795922 Mon Sep 17 00:00:00 2001 From: Erik van Sebille Date: Sat, 2 Sep 2023 14:33:57 +0200 Subject: [PATCH 20/22] Added short tutorial section on partition functions to tutorial_MPI --- docs/examples/documentation_MPI.ipynb | 45 ++++++++++++++++++++++++++- 1 file changed, 44 insertions(+), 1 deletion(-) diff --git a/docs/examples/documentation_MPI.ipynb b/docs/examples/documentation_MPI.ipynb index 0756f7b70..7c4727ba5 100644 --- a/docs/examples/documentation_MPI.ipynb +++ b/docs/examples/documentation_MPI.ipynb @@ -44,6 +44,49 @@ "Note 
that in principle this means that all MPI processors need access to the full `FieldSet`, which can be Gigabytes in size for large global datasets. Therefore, efficient parallelisation only works if at the same time we also chunk the `FieldSet` into smaller domains.\n" ] }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Optimising the partitioning of the particles with a user-defined partition_function" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Parcels uses so-called *partition functions* to assign particles to processors. By default, Parcels uses a `scikit-learn` `KMeans` clustering algorithm to partition the particles. However, you can also define your own partition function, which can be useful if you know more about the distribution of particles in your domain than the `KMeans` algorithm does.\n", + "\n", + "To define your own partition function, you need to define a function that takes a list of coordinates (lat, lon) as input, and returns a list of integers, defining the MPI processor that each particle should be assigned to. 
See the example below:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def simple_partition_function(coords, mpi_size=1):\n", + " \"\"\"A very simple partition function that assigns particles to processors\"\"\"\n", + " return np.array([int(i) for i in np.linspace(0, mpi_size, coords.shape[0], endpoint=False)])" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "To add the partition function to your script, you need to add it to the ParticleSet constructor:" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "pset = ParticleSet(..., partition_function=simple_partition_function)" + ] + }, { "attachments": {}, "cell_type": "markdown", @@ -121,7 +164,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "For small projects, the above instructions are sufficient. If your project is large, then it is helpful to combine the `proc*` directories into a single zarr dataset and to optimize the chunking for your analysis. What is \"large\"? If you find yourself running out of memory while doing your analyses, saving the results, or sorting the dataset, or if reading the data is taking longer than you can tolerate, your problem is \"large.\" Another rule of thumb is if the size of your output directory is 1/3 or more of the memory of your machine, your problem is large. Chunking and combining the `proc*` data in order to speed up analysis is discussed [in the documentation on runs with large output](https://docs.oceanparcels.org/en/latest/examples/documentation_LargeRunsOutput.html).\n" + "For small projects, the above instructions are sufficient. If your project is large, then it is helpful to combine the `proc*` directories into a single zarr dataset and to optimise the chunking for your analysis. What is \"large\"? 
If you find yourself running out of memory while doing your analyses, saving the results, or sorting the dataset, or if reading the data is taking longer than you can tolerate, your problem is \"large.\" Another rule of thumb is if the size of your output directory is 1/3 or more of the memory of your machine, your problem is large. Chunking and combining the `proc*` data in order to speed up analysis is discussed [in the documentation on runs with large output](https://docs.oceanparcels.org/en/latest/examples/documentation_LargeRunsOutput.html).\n" ] }, { From 78948a47aef5ed6140969edb3912cd18fdcb1eb4 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sat, 2 Sep 2023 12:34:31 +0000 Subject: [PATCH 21/22] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- docs/examples/documentation_MPI.ipynb | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/docs/examples/documentation_MPI.ipynb b/docs/examples/documentation_MPI.ipynb index 7c4727ba5..eb962693d 100644 --- a/docs/examples/documentation_MPI.ipynb +++ b/docs/examples/documentation_MPI.ipynb @@ -67,8 +67,10 @@ "outputs": [], "source": [ "def simple_partition_function(coords, mpi_size=1):\n", - " \"\"\"A very simple partition function that assigns particles to processors\"\"\"\n", - " return np.array([int(i) for i in np.linspace(0, mpi_size, coords.shape[0], endpoint=False)])" + " \"\"\"A very simple partition function that assigns particles to processors\"\"\"\n", + " return np.array(\n", + " [int(i) for i in np.linspace(0, mpi_size, coords.shape[0], endpoint=False)]\n", + " )" ] }, { From 22b10ae3164cd556c82194331de7b28accf6eaaf Mon Sep 17 00:00:00 2001 From: Erik van Sebille Date: Sat, 2 Sep 2023 15:18:44 +0200 Subject: [PATCH 22/22] Update conf.py to ignore binder.org for linkchecking As it is unresponsive --- docs/conf.py | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/conf.py
b/docs/conf.py index 61e0d3975..5dc91b83c 100755 --- a/docs/conf.py +++ b/docs/conf.py @@ -75,6 +75,7 @@ # To monitor r"http://marine.copernicus.eu/", # 2023-06-07 Site non-responsive r"https://www\.nodc\.noaa\.gov/", # 2023-06-23 Site non-responsive + r"https://mybinder\.org/", # 2023-09-02 Site non-responsive ] # The version info for the project you're documenting, acts as replacement for