
Update collectionsoa.py to allow user defined MPI partitioning #1414

Merged · erikvansebille merged 25 commits into v3.0 from userPartitionMPI on Sep 2, 2023

Conversation

@JamiePringle (Collaborator) commented on Aug 21, 2023:

This code allows users to change the function that determines which particles are run on which MPI jobs, via the new function setPartitionFunction(). Attached to this pull request is a modified version of example_stommel.py which uses this functionality, and run_example_stommel.py which shows how to implement different partitioning schemes. Using this partitioning scheme on my global runs saves me about 20% of run time, which for my 10-day runs is worth noticing.

To use this feature, a new partitioning function must be created with two arguments, (coords, mpi_size=1). The arguments and output are as follows (a minimal illustrative sketch follows the specification below):

    Input:

    coords: numpy array with rows of [lon, lat] so that
    coords.shape[0] is the number of particles and coords.shape[1] is 2.

    mpi_size=1: the number of MPI processes.

    Output:

    mpiProcs: an integer array with values from 0 to mpi_size-1
    specifying which MPI job will run which particles. len(mpiProcs)
    must equal coords.shape[0]
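As a minimal sketch of that signature, here is a purely illustrative round-robin scheme (not part of this pull request; it ignores particle positions entirely and only shows the expected inputs and output):

import numpy as np

def partitionParticles4MPI_roundrobin(coords, mpi_size=1):
    # Illustrative only: assign particle i to MPI process i % mpi_size,
    # returning one integer label in [0, mpi_size) per row of coords.
    mpiProcs = np.arange(coords.shape[0]) % mpi_size
    return mpiProcs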

The existing default partitioning function, rewritten in this format, is now:

def partitionParticles4MPI_default(coords, mpi_size=1):
    '''Default partitioning: cluster particles over MPI processes with KMeans.'''

    # KMeans is sklearn.cluster.KMeans if sklearn is installed, otherwise None (imported at module level)
    if KMeans:
        kmeans = KMeans(n_clusters=mpi_size, random_state=0).fit(coords)
        mpiProcs = kmeans.labels_
    else:  # assign random labels if KMeans is unavailable (see https://github.com/OceanParcels/parcels/issues/1261)
        logger.warning_once('sklearn needs to be available if MPI is installed. '
                            'See http://oceanparcels.org/#parallel_install for more information')
        mpiProcs = np.random.randint(0, mpi_size, size=coords.shape[0])

    return mpiProcs

One example I have found useful is a function that requires the number of particles in each MPI job to be roughly equal. This prevents the default KMeans algorithm from making small clusters around, for example, the Hawaiian islands. Unequal MPI job sizes lead to unequal allocation of compute resources and to long runs, as some MPI processes take much longer to finish. To make the allocation of particles equal, I use a constrained KMeans algorithm. This can be very slow, so I include an option (nCull) to do the initial clustering on a subset of the particles. It is important to note that this new partitioning function does NOT need to be included in the parcels distribution -- it is entirely created by the user of parcels.

import time
from k_means_constrained import KMeansConstrained  # https://joshlk.github.io/k-means-constrained/

def partitionParticles4MPI_KMeans_constrained(coords, mpi_size=1):
    '''Constrained k-means partitioning: each cluster gets the same number of
    particles to within a multiplicative factor "slop". Because the constrained
    k-means can be slow, the clustering is done on a data set reduced by a factor
    of nCull; for large runs, increase nCull so this runs in a reasonable time.'''
    nCull = 2
    tic = time.time()
    print('Starting constrained k-means for', coords.shape[0], 'particles and nCull', nCull, flush=True)
    coordsTrain = coords[::nCull, :]
    slop = 0.1
    maxSizeCluster = int((1.0 + slop) * (coordsTrain.shape[0] / mpi_size))
    minSizeCluster = int((1.0 - slop) * (coordsTrain.shape[0] / mpi_size))
    kmeans = KMeansConstrained(n_clusters=mpi_size, size_min=minSizeCluster, size_max=maxSizeCluster,
                               random_state=0, n_jobs=-4).fit(coordsTrain)

    # now predict where all particles go from the k-means calculated on the partial data set
    mpiProcs = kmeans.predict(coords, size_min=minSizeCluster * nCull, size_max=maxSizeCluster * nCull)
    print('   done with constrained k-means in', time.time() - tic, 'seconds', flush=True)
    return mpiProcs

This code, and the following example of its use, come from the attached example_stommel.py. To use this function, we must import setPartitionFunction() with "from parcels.collection.collectionsoa import setPartitionFunction", and BEFORE making the particle set we must select the new function with setPartitionFunction(partitionParticles4MPI_KMeans_constrained).
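For reference, a minimal sketch of that setup, assuming partitionParticles4MPI_KMeans_constrained is defined as above; the fieldset and release positions below are simple placeholders standing in for the ones built in example_stommel.py:

import numpy as np
from parcels import FieldSet, ParticleSet, JITParticle
from parcels.collection.collectionsoa import setPartitionFunction

# register the custom partitioning function BEFORE the particle set is created
setPartitionFunction(partitionParticles4MPI_KMeans_constrained)

# placeholder fieldset and release positions; example_stommel.py builds its own
fieldset = FieldSet.from_data({'U': np.zeros((2, 2)), 'V': np.zeros((2, 2))},
                              {'lon': [0., 1.], 'lat': [0., 1.]})
pset = ParticleSet(fieldset=fieldset, pclass=JITParticle,
                   lon=np.random.uniform(0., 1., 1000),
                   lat=np.random.uniform(0., 1., 1000))

When the script is launched under MPI with more than one process, the particles are then distributed over the processes by the registered function.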

I have attached figures for an example in which the initial particle positions are 4 clumps of particles with greatly different numbers of particles. The default KMeans code works correctly, in the sense that it successfully identifies the spatially separate clumps, and so creates MPI jobs with very different numbers of particles.
[Figure: Partition_by_default]
The constrained KMeans breaks the particles into less compact but more equally sized groups.
[Figure: Partition_by_KMeans_constrained]
Now, there are clear trade-offs between equal-size MPI jobs and locality of particles, but in my case I have found the equal split to be a big win.

If y'all like where this is going, I can write up some documentation for it.

codeForStommelExample.zip

JamiePringle and others added 3 commits August 21, 2023 11:06
This code allows users to change the function which determines which particles are run on which MPI jobs using the function setPartitionFunction().
@JamiePringle (Collaborator, Author) commented:

Oops. I created this branch only a few days ago, but apparently many changes have been made to collectionsoa.py in that time. The diff shows many changes in the code, but only two blocks are really mine -- in the new code, lines 29-83 and 131-137. I am afraid that my unfamiliarity with GitHub is showing...

[Review threads on parcels/collection/collectionsoa.py — resolved]
@JamiePringle (Collaborator, Author) commented:

@erikvansebille I was trying to do this with as little change to the code as possible, since I had not originally thought of putting it into the master branch. What you say makes sense, but I need a little time to dive into the code to make sure I understand how particle set creation integrates with functions like .from_line() and .from_list().

I will try to add a unit test, if it seems straightforward.

I will be able to get back to this once I get some reviews off my desk...

@erikvansebille erikvansebille changed the base branch from master to v3.0 August 28, 2023 06:53
@erikvansebille erikvansebille merged commit cb44378 into v3.0 Sep 2, 2023
10 checks passed
@erikvansebille erikvansebille deleted the userPartitionMPI branch September 2, 2023 14:13
@erikvansebille erikvansebille restored the userPartitionMPI branch September 2, 2023 14:25
@erikvansebille erikvansebille deleted the userPartitionMPI branch September 2, 2023 14:28
erikvansebille added a commit that referenced this pull request Sep 4, 2023