
particle load balancing in parallel runs #1007

Closed
JamiePringle opened this issue Mar 25, 2021 · 21 comments

@JamiePringle
Collaborator

OceanParcels continues to be very useful; thanks! I am now running global problems with Mercator currents tracking coastal dispersal. I run on a large computer in parallel; this data is from parcels 2.2.2. The parallelization is less efficient than it could be, because some of the parallel tasks finish much earlier than others, stalling the completion of the entire task until the last one finishes. The difference in runtime can be up to a factor of three. I think there are two reasons for this.

The first is that the number of particles in each MPI rank/process varies greatly, by a factor of 3. Here is the number of particles in each MPI rank (I record the rank as a variable for each particle):

-rank 0.0 has 76865
-rank 1.0 has 139075
-rank 2.0 has 108955
-rank 3.0 has 106110
-rank 4.0 has 116215
-rank 5.0 has 69845
-rank 6.0 has 46753
-rank 7.0 has 88195
-rank 8.0 has 57745
-rank 9.0 has 52665
-rank 10.0 has 38785
-rank 11.0 has 191560
-rank 12.0 has 98975
-rank 13.0 has 49790
-rank 14.0 has 40403
-rank 15.0 has 129885

The locations of the particles, colored by the rank of the MPI process that handles them, can be seen in the figure below.

The second is that the clustering appears (though I am not sure) to be done in lat/lon space rather than in grid space, so that, because of the curvature of the earth, the fraction of the grid handled by each rank is very different -- i.e. the fraction of the grid covered by the equatorial group is much larger than that covered by the higher-latitude groups.

I realize this domain decomposition is tricky, and involves balancing the number of particles against the I/O expense of reading in more data for widely scattered particles.

Is there a way for me to manually assign which particles go to which MPI rank to experiment to see if I can find a more efficient scheme, either generally or for my broadly distributed but geographically sparse case? A pointer to the appropriate code would be fine; I would be happier however if I could specify the decomposition without modifying the code base.

Thank you
Jamie

[Figure: rankMap -- particle locations colored by the MPI rank that handles them]

@erikvansebille
Member

Hi @JamiePringle! Thanks for raising this Issue/Question! Parallelisation is something we're still improving on, and it is indeed a bit rudimentary now. You're right that the clustering is based on lon/lat instead of grid indices, which is not ideal for curvilinear grids. That is something we can quite easily improve, although I'm not sure when it will land in a release.

In the meantime, if you want to tweak yourself, the code that does the distribution of the particles over the processors is at
https://github.com/OceanParcels/parcels/blob/master/parcels/particlesets/collectionsoa.py#L107-L125. Perhaps you can put some optimisations there for your particular usecase?
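For readers without the code to hand, a simplified sketch of what that distribution step does (not the exact library code; it assumes mpi4py and scikit-learn are available):

```python
import numpy as np
from mpi4py import MPI
from sklearn.cluster import KMeans

def assign_ranks(lon, lat):
    """Cluster particle release positions in lon/lat space, one cluster per MPI rank."""
    comm = MPI.COMM_WORLD
    nranks = comm.Get_size()
    coords = np.column_stack((lon, lat))
    if comm.Get_rank() == 0:
        # label each particle with the cluster (= MPI rank) it belongs to
        labels = KMeans(n_clusters=nranks).fit(coords).labels_
    else:
        labels = None
    # every rank needs the same assignment, so broadcast from rank 0
    return comm.bcast(labels, root=0)
```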

Cheers,
Erik

@JamiePringle
Collaborator Author

Great -- I will look at it in a few days.
Jamie

@JamiePringle
Collaborator Author

I have a working version of a decomposition that results in an equal number of particles in each area, and roughly equal lat/lon extents. It saves (in my, perhaps odd, case) about 15% in run time. But there are still large discrepancies in run time between groups of particles.

In collectionsoa.py, is there a way to know where in the horizontal grid the particles are, i.e. their location in index space? I am looking to also balance the area that the particles occupy in grid space.

thanks
Jamie

@erikvansebille
Member

Hi @JamiePringle. Yes, there is code to identify which grid cell indices particles are occupying, see https://github.com/OceanParcels/parcels/blob/master/parcels/particleset/particlesetsoa.py#L237-L265 (contributed by @angus-g).

So you could call pset.populate_indices() to set values for particle.xi and particle.yi. Is that what you were looking for?
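In case it is useful to later readers, a minimal usage sketch of that call (the exact layout of xi/yi may differ between parcels versions):

```python
import numpy as np

# Fill the per-particle grid indices via the KDtree lookup described above;
# pset is an existing ParticleSet built on a FieldSet.
pset.populate_indices()

# particle.xi / particle.yi now hold horizontal cell indices (one entry per
# grid of the fieldset), which can be used to balance particles in index space
xi = np.array([p.xi for p in pset])
yi = np.array([p.yi for p in pset])
```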

@JamiePringle
Collaborator Author

@erikvansebille Yes, that should work well. Thank you.

@JamiePringle
Collaborator Author

JamiePringle commented Apr 4, 2021

@erikvansebille Is there any reason the KDtree used in populate_indices is from pykdtree and not sklearn.neighbors? The latter supports haversine distance calculations for use on spheres. Also, sklearn is already required for the MPI case. The use of a haversine distance metric is not that important for dense rectangular grids, but is more sensible on the margin.
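For illustration, a nearest-grid-node lookup with a haversine metric could look roughly like this (this uses sklearn's BallTree rather than the call parcels makes; grid_lon/grid_lat and particle_lon/particle_lat are placeholder arrays):

```python
import numpy as np
from sklearn.neighbors import BallTree

# BallTree with metric='haversine' expects [lat, lon] pairs in radians
grid_pts = np.deg2rad(np.column_stack((grid_lat.ravel(), grid_lon.ravel())))
tree = BallTree(grid_pts, metric='haversine')

# Find the nearest grid node (great-circle distance) for each particle
query = np.deg2rad(np.column_stack((particle_lat, particle_lon)))
dist, flat_idx = tree.query(query, k=1)

# Convert the flattened index back to (yi, xi) on the 2-D grid
yi, xi = np.unravel_index(flat_idx.ravel(), grid_lat.shape)
```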

It is taking me a bit longer because ParticleCollectionSOA is called by ParticleSetSOA, so I don't have access to populate_indices() in ParticleCollectionSOA. This is forcing me to actually understand what is going on...

@erikvansebille
Member

Thanks for the feedback, @JamiePringle. I'll leave it to @angus-g to comment on why he chose pykdtree rather than sklearn for the KDtree implementation.

As for the ParticleCollectionSOA, perhaps @CKehl can help if you have any specific questions on the implementation?

@JamiePringle
Collaborator Author

@erikvansebille no worries, it was straightforward to move the required code to where I need it. I have code that can partition the particles to have either equal grid area or equal particle numbers in each partition, and I will share it when it is mature. Right now, however, I am tinkering with the best way to split things up: I have code that sometimes makes things better and sometimes makes things worse, which is not that helpful... There are three things that increase run time per rank: how clumped in space the particles in each MPI rank are, the number of particles in a rank, and the area covered by the particles in a rank. Kmeans handles the first well; I have code that optimizes the second and third individually, but each can also perform badly depending on the initial particle distribution. So I am not ready to release this to the world yet.

One complication is that the time it takes to read a chunk from netCDF is very variable, so the best partitioning of particles depends on chunk size. For my case, using compressed netCDF as input, tiles that contain land can read very quickly, since they compress well.

Jamie

@CKehl
Contributor

CKehl commented Apr 12, 2021

Well, some short answers, as I am on vacation:
a) There are already quite extensive plans for how to improve parallelisation for the general case (algorithms, tree-balance routines, theoretical CS considerations).
b) Optimizing per area requires knowing the whole space partitioning in advance -- are the fields preloaded? If so, that's quite a no-no for 'general working'. And what happens with this load balancing when there are multiple, different grids? You can only 'area-optimize' for one grid.
c) sklearn vs. pykdtree vs. SciPy cKDTree: the main reason is dependencies -- it's not advisable to make parcels depend on a behemoth such as sklearn just for constructing a kD-tree, because then we need to test all its versions each and every time we make a release -> a manpower bottleneck for a science project. So let's keep it simple with the dependencies (unless there are performance benefits)! All three implementations are roughly on par -- well, the sklearn one is even a bit slower for our case, as it optimizes the decomposition for item dimensionality (hashing) because of data-science applications, and we only have at most 4 dimensions. Long story short: since we already depend on numpy and scipy, I'd rather opt (for general parallel-decomposition algorithms) for SciPy-spatial's kD-tree.
d) We can always condition-include your fix with something like if ...: rank_decompose = True else: rank_decompose = None, as is done for MPI and for @angus-g's populate_indices code, so the function is there if you want to use it (see the sketch after this list). For a general 'fix', we need to make sure that the implementation works (i) with dynamic and static particle sets, (ii) with all kinds of fields and field- and grid setups, (iii) over long timespans (i.e. with no memory accumulation), etc. One would need parallel tests for that, and those parallel tests come first on our todo list.
e) The timing depends on the individual chunk size -- indeed true, because I/O is the main performance bottleneck (something you also don't circumvent with MPI). The 'individual chunk size' is something one cannot control -- it depends on the data. The most generally applicable optimization is to assume the whole field domain is equi-sized, because otherwise you'd first need to preload the data to optimize the distribution. Honestly: doing that usually costs more time than is gained overall from running in parallel.
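A sketch of the conditional-inclusion pattern mentioned in (d); the flag name rank_decompose is made up here, not an existing parcels option:

```python
# Enable the custom decomposition only when running under MPI with >1 rank
try:
    from mpi4py import MPI
except ImportError:
    MPI = None

if MPI is not None and MPI.COMM_WORLD.Get_size() > 1:
    rank_decompose = True   # use the custom rank decomposition
else:
    rank_decompose = None   # fall back to the default (serial) behaviour
```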

Now, that's all quite critical, but I'd be happy to see what you did and get a more detailed impression of what you implemented. Do you perhaps have a pull-request link for us?

@JamiePringle
Collaborator Author

JamiePringle commented Apr 12, 2021

Thanks! Point by point:

a) I am glad to hear this!

b) I am not sure what you mean by preloading the grids; all I am doing in area balancing is to calculate the area spanned by the set of particles assigned to each rank (either in lat/lon or in grid indices). If you wait a bit, I will show you the code I have developed, along with results for various cases.

It is important to note that ALL I am doing is changing the method by which particles are assigned to MPI ranks at the beginning of the run. I am not re-balancing them afterwards; in my runs, which are short in time but global in extent, this is not necessary.

c) I have maintained large software packages; I appreciate the need to reduce dependencies. But your code already requires sklearn when using MPI -- if you look at collectionsoa.py, you import KMeans from sklearn.cluster when you use MPI. So this would reduce the number of packages you require. You are right that the sklearn implementation with haversine is slower.

d) I appreciate this comment! I am working on one specific problem and you have to deal with the whole code. I expect, but have not tested, that (i) will not be a problem. (ii) will always be problematic, as the optimal way to break up particles depends strongly on issues like, for example, the penalties for a single process accessing multiple parts of the flow field in different locations, etc. (iii) is not a problem; the code is only run once, in MPI rank 0, when the ParticleCollectionSOA class assigns which rank is to process each particle (e.g. "self._pu_indicators = groups"); it mirrors the existing kmeans code. All tests are run with MPI and in parallel.

e) I agree. I have a fair bit of experience with this kind of issue, and in IO-bound problems, the parallelization scheme is always a function of the data layout, and you don't know that ahead of time.

Even with the current code, the optimal way to partition particles to ranks depends on how they are distributed spatially in the model domain. What is delaying me now is developing test cases to explore this. But in a week or two, I should have code to share, along with testing results.

Is a pull request the best way to do so? Or should I just upload the modified files as attachments to this issue? I make one modification to particlesetsoa.py that you might want to think about before accepting -- I modify ParticleCollectionSOA so that the fieldset is passed in when it is called from particlesetsoa.py; I am not sure that is the optimal way to pass the grid information into ParticleCollectionSOA(), nor what other implications this change might have. Otherwise all my changes are self-contained in a single location in the code.

Another question: right now I just use my code to partition the particles among the ranks in place of the existing kmeans code in "class ParticleCollectionSOA(ParticleCollection)". Is there a preferred way to make the choice of partitioning a user option? Or should I just let you deal with that, if you think my code is of more general interest?

Jamie

@CKehl
Contributor

CKehl commented Apr 12, 2021

Hi Jamie,

again, I am on vacation, so I am looking forward to your code as soon as I am back -- in 6 weeks.
You make all fair points; as far as I can tell from a superficial read, most of them require more careful reading than I am doing right now.

One comment though on (b): you're right, I forgot about that. In my mind, and in my personal derivative checkout, that dependency is already gone; but you're right that, right now, it's still in there.

Best,

Christian

@JamiePringle
Collaborator Author

@CKehl Please don't worry; I have the code working to solve my problem. By the time you return I will have posted a full write-up with the code, and you can take from it what you will.

@JamiePringle
Collaborator Author

@CKehl and @erikvansebille What is the most appropriate way to get the land mask (or equivalent)? I can't seem to find it in the field object. Thanks

@reint-fischer
Contributor

@CKehl and @erikvansebille What is the most appropriate way to get the land mask (or equivalent)? I can't seem to find it in the field object. Thanks

@JamiePringle, the land mask is not available in Parcels objects as Parcels does not interpret how the input velocity fields define 'land' and 'ocean'. The only thing Parcels does is set NaN values in the velocity field to zero.

Some model data have a mask available in the netCDF file or in an accompanying coordinates file; I am not sure about Mercator. If it is not available, you can easily create one using numpy.ma.masked_invalid(Udata).mask or numpy.ma.masked_values(Udata, 0).mask, depending on whether the land velocity is defined as NaN or 0, respectively.

If you want to access the land mask in a parcels simulation, you can create a Field object for it using Field(), Field.from_netcdf() or Field.from_xarray(), and add it to your fieldset using FieldSet.add_field().
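For example, a minimal sketch assuming NaN-masked velocities and an existing fieldset (the file and variable names here are hypothetical):

```python
import numpy as np
import xarray as xr
from parcels import Field

# Hypothetical Mercator file and variable names -- adjust to your dataset
ds = xr.open_dataset('mercator_u.nc')
Udata = ds['uo'].isel(time=0, depth=0).values

# True over land if land velocities are stored as NaN
landmask = np.ma.masked_invalid(Udata).mask.astype(np.float32)

# Wrap the mask in a parcels Field and attach it to the existing FieldSet
mask_field = Field('landmask', landmask,
                   lon=ds['longitude'].values, lat=ds['latitude'].values)
fieldset.add_field(mask_field)
```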

Hope this helps!

@JamiePringle
Collaborator Author

Thank you -- That is helpful. I am going to find the land points in this case in an ad-hoc way. If it turns out it is helpful, I will try to do it properly. Jamie

@JamiePringle
Collaborator Author

I am going to close this issue. Thank you to @CKehl and @erikvansebille for their help. I will document the results here for those who might find them useful. I have working code for everything I describe, and will be happy to share it.

Background: When using MPI to parallelize OceanParcels, the first step is to decompose the particles into groups that are run by different processes (MPI "ranks"). For optimum efficiency, each rank should take the same time to complete, since otherwise time is wasted waiting for the slowest rank to finish. The time for each rank to finish depends on, among other things, the number of particles and the horizontal area spanned by the particles. The latter influences the I/O each rank must do reading model fields into the process. The relative importance of the area vs. the number of particles depends on how the data is chunked and on the speed of the I/O. All of this was confirmed experimentally.

Short summary: The current Kmeans implementation of particle decomposition is robust and sensible. The things I tried can be a little more efficient for some initial particle distributions, but they can also be much worse for other initial conditions. For now, I would stick to Kmeans for general use.

Summary: For runs which use MPI to run particle tracking in parallel, an initial decomposition of the drifters using a Kmeans clustering algorithm is robust and does not go horribly wrong, because it does a good job of balancing both the area covered by the particles in a single rank and the number of particles in a single rank. My efforts to break up the particles so that there is an equal number of particles in each rank, or an equal area spanned by the particles in each rank, were never as robust as Kmeans. Kmeans was always the most efficient, or nearly the most efficient: it was never more than 5% slower than the best method in my tests, while the other two methods could be significantly slower depending on the initial particle distribution.

For my grid, decomposing the particles using the grid coordinates instead of the geographical coordinates rarely gained more than a few percent.

If the OceanParcels code were ever changed to allow particles to switch ranks, my runs could be 30% more efficient: particles could be redistributed among ranks when a rank finished its current allotment of particles.

@erikvansebille
Member

Thanks @JamiePringle, for this extensive summary and feedback! It is really useful to have at hand when we further develop the MPI capability in Parcels.

One thing that struck me as I was reading your summary: I can imagine that there are also many use cases where the users themselves know 'best' how to initially partition the particles. What I mean is that if users could self-assign the clusters (because they have background knowledge of the geography and flow), why would they still need Kmeans or other automatic clustering algorithms to do the clustering for them? So I can imagine that we would also provide an easy method for users to self-assign clusters.
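A sketch of what such user self-assignment could look like; the function below is purely hypothetical and is not the interface parcels later adopted:

```python
import numpy as np

def my_partition(lon, lat, nranks):
    """Hypothetical user-supplied hook: assign each particle to an MPI rank
    by longitude band, returning one rank label per particle."""
    edges = np.linspace(lon.min(), lon.max(), nranks + 1)
    return np.clip(np.searchsorted(edges, lon, side='right') - 1, 0, nranks - 1)
```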

@JamiePringle
Collaborator Author

@erikvansebille I agree -- and it seems it would not be that hard to do. Certainly there are things I have discovered that would gain me 10% for my specific use case but that would be too fragile to put into the actual code. I would appreciate having that option in the general code (though for now, I have what I have done... it would just be nice not to have to re-port it to every version...). Thanks for all your help.

@CKehl
Contributor

CKehl commented May 12, 2021

@JamiePringle - also from me, thank you for the nice summary. I still need to read this more carefully, and so that we can find this entry again in our knowledge base, I am re-opening the issue and tagging it accordingly. I hope this is in your spirit too.

@JamiePringle
Collaborator Author

JamiePringle commented May 12, 2021

@CKehl Of course, and thank you. I am always happy to show you the code I developed, if you wish.

@erikvansebille
Member

This has now been addressed in #1414, providing more flexibility for users to control the (initial) partitioning.
