Let me first note that this section is being written to preserve one approach to parallelizing pwmig that has so far turned out to be a bad idea.
First, let me (@pavlis) put down for the record a simplified version of the pwmig algorithm and the fundamental issues that arise in it. The translation of the pwmig algorithm from C++ can be abstracted as three nested loops defined by the following pseudocode:
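In rough schematic form (the function names below are illustrative placeholders, not the actual API):

```python
# Schematic pseudocode only - names are placeholders, not the real functions.
for event in event_list:                              # loop 1: events (< ~100)
    for pw_component in plane_wave_components(event): # loop 2: plane-wave components (~121-400)
        for seis in seismograms(pw_component):        # loop 3: pseudostation grid points (~1800-10000)
            migseis = migrate_one_seismogram(seis, ...)  # migrate one seismogram
            accumulate(migseis)                       # sum into the image volume for this component
    # stack the migrated plane-wave components into the image for this event
```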
In terms of size, the number of events will always be relatively small (<100) as long as we continue the use of source-side stacking, which at this point is the only rational choice. The number of plane waves is a modest number as well. Our test data uses 121, but there are good reasons to increase that to the order of 400. The lowest-level loop (3) is controlled by the pseudostation grid size and is orders of magnitude larger: for the test data it is around 1800, but for something like USArray in the lower 48 it is more like 5000-10000.
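For the test data that works out to roughly 121 × 1800 ≈ 220,000 passes through the innermost loop per event; a USArray-scale run with ~400 plane waves and ~10,000 pseudostations would be closer to 4,000,000 per event.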
Following the standard guidance that the innermost loop should be the focus of parallelization, we aimed to parallelize the following function, which is the serial form of step 3 and is known to work:
```python
def _migrate_component(cursor, db, parent, TPfield, VPsvm, Us3d, Vp1d, Vs1d, control):
    """
    This is an intermediate-level function used in the MsPASS version of
    pwmig.  It migrates one plane-wave component of data created by
    pwstack and returns the result as a GCLvectorfield3d object.
    The caller may either immediately stack the plane-wave components
    or save them and run a separate stacker later.  In either case the
    expectation is that this function appears as the function object in a
    map operator driven by a bag of cursors.  The cursors are assumed to
    define a correct and consistent set of data.  No checking is done
    here to verify the consistency of the inputs.

    The code first sets up the raygrid geometry based on input slowness
    data passed through the VPsvm object (here a GCLvectorfield, but originally
    this was a smaller thing read from the old pwstack-to-pwmig file structure).
    """
    zmax = control.get_double("maximum_depth")
    tmax = control.get_double("maximum_time_lag")
    dt = control.get_double("data_sample_interval")
    # Old pwmig had a relic calculation of variables ustack and deltaslow here.
    # It is replaced here with slowness vectors computed for each grid point in the
    # function called in the loop below.  Note deltaslow is invariant anyway
    # and can be extracted from the metadata of each Seismogram.
    # Also skipping the coherence grid calculation, as that was found earlier not to be
    # worth the extra compute time; it would be better done in this framework
    # as an independent processing step.
    VPVSmax = 2.0  # Intentionally large; this is only used to avoid infinite loops where it is used
    # The signature of this function has changed from old pwmig - the deprecated
    # constant u mode was dropped
    raygrid = Build_GCLraygrid(parent, VPsvm, Vs1d, zmax, VPVSmax * tmax, dt * VPVSmax)
    pwdgrid = PWMIGfielddata(raygrid)
    for doc in cursor:
        seis = db.read_data(doc, collection="wf_Seismogram")
        seis = _set_incident_slowness_metadata(seis, VPsvm)
        migseis = migrate_one_seismogram(
            seis, parent, raygrid, TPfield, Us3d, Vp1d, Vs1d, control
        )
        pwdgrid.accumulate(migseis)
    return pwdgrid
```
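For context, the docstring's intent is that this function appears as the function object in a map over a bag of cursors, one cursor per plane-wave component. A minimal sketch of that usage (the query key and variable names below are illustrative, not the actual schema):

```python
import dask.bag as daskbag

# Build one MongoDB cursor per plane-wave component; the query key is hypothetical.
# Note: this assumes the scheduler in use can handle (serialize) the cursors.
cursors = [
    db.wf_Seismogram.find({"pwmig_plane_wave_number": i})
    for i in range(number_of_plane_waves)
]
mybag = daskbag.from_sequence(cursors)
pwcomponents = mybag.map(
    lambda cur: _migrate_component(cur, db, parent, TPfield, VPsvm, Us3d, Vp1d, Vs1d, control)
)
# Each element of the result is a PWMIGfielddata for one plane-wave component.
results = pwcomponents.compute()
```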
A fundamental problem with this algorithm, I think, is that the `accumulate` method of the grid object is not "pure" by the dask definition. As a result, to get dask delayed to work at all I had to push the results of the `migrate_one_seismogram` function to a list and then run `accumulate` serially. The following is that implementation:
```python
def _migrate_component(cursor, db, parent, TPfield, VPsvm, Us3d, Vp1d, Vs1d, control):
    """
    This is an intermediate-level function used in the MsPASS version of
    pwmig.  It migrates one plane-wave component of data created by
    pwstack and returns the result as a GCLvectorfield3d object.
    The caller may either immediately stack the plane-wave components
    or save them and run a separate stacker later.  In either case the
    expectation is that this function appears as the function object in a
    map operator driven by a bag of cursors.  The cursors are assumed to
    define a correct and consistent set of data.  No checking is done
    here to verify the consistency of the inputs.

    The code first sets up the raygrid geometry based on input slowness
    data passed through the VPsvm object (here a GCLvectorfield, but originally
    this was a smaller thing read from the old pwstack-to-pwmig file structure).
    """
    zmax = control.get_double("maximum_depth")
    tmax = control.get_double("maximum_time_lag")
    dt = control.get_double("data_sample_interval")
    # Old pwmig had a relic calculation of variables ustack and deltaslow here.
    # It is replaced here with slowness vectors computed for each grid point in the
    # function called in the loop below.  Note deltaslow is invariant anyway
    # and can be extracted from the metadata of each Seismogram.
    # Also skipping the coherence grid calculation, as that was found earlier not to be
    # worth the extra compute time; it would be better done in this framework
    # as an independent processing step.
    VPVSmax = 2.0  # Intentionally large; this is only used to avoid infinite loops where it is used
    # The signature of this function has changed from old pwmig - the deprecated
    # constant u mode was dropped
    raygrid = Build_GCLraygrid(parent, VPsvm, Vs1d, zmax, VPVSmax * tmax, dt * VPVSmax)
    pwdgrid = PWMIGfielddata(raygrid)
    # pwdgrid = dask.delayed(PWMIGfielddata(raygrid))
    seislist = []
    for doc in cursor:
        seis = dask.delayed(db.read_data)(doc, collection="wf_Seismogram")
        # This function is defined above as delayed with a decorator
        seis = _set_incident_slowness_metadata(seis, VPsvm)
        migseis = dask.delayed(migrate_one_seismogram_python)(
            seis, parent, raygrid, TPfield, Us3d, Vp1d, Vs1d, control
        )
        # Earlier attempts that failed because accumulate is not a pure function:
        # pwdgrid = accumulate_python(pwdgrid, migseis)  # uses an @delayed decorator
        # pwdgrid.accumulate(migseis)
        seislist.append(migseis)
    # Compute all the delayed migrations, then accumulate serially
    seislist = dask.compute(*seislist)
    for d in seislist:
        pwdgrid.accumulate(d)
    return pwdgrid
```
The problem with that implementation is that its performance is truly awful. The serial version took about 4 hours to process the test data set. When I ran the delayed algorithm above, I let it run for about 20 hours before inserting timing code to see if it was actually working at all. From that timing I estimated the algorithm above would have taken around 40 hours to complete - a factor of 10 SLOWER than the serial version.
At the time I'm putting this down we don't yet know what is causing this horrible performance, but the likely culprit is serialization of large data objects. Three arguments passed to the `migrate_one_seismogram` function are huge: raygrid, TPfield, and Us3d. It seems likely the default scheduler setup is not keeping those objects local but is instead serializing them for every call to that function, making a copy for every task. The evidence is that I can watch memory bloat until it is exhausted once the delayed loop above is entered.
The design lesson for parallelizing this algorithm is that we can't depend on the default worker configuration; we have to control data locality. The section of the dask manual on configuring a dask cluster seems a key reference.
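For example, with an explicit dask.distributed cluster the three large read-only objects could be scattered to the workers once and passed to the tasks as futures, instead of being re-serialized for every delayed call. A sketch under those assumptions (the scheduler address and `doclist` are placeholders; this has not been tested on the pwmig data):

```python
from dask.distributed import Client, as_completed

client = Client("tcp://scheduler-address:8786")  # placeholder address

# Ship the large, read-only objects to every worker once.
raygrid_f = client.scatter(raygrid, broadcast=True)
TPfield_f = client.scatter(TPfield, broadcast=True)
Us3d_f = client.scatter(Us3d, broadcast=True)

futures = []
for doc in doclist:  # doclist = list(cursor); a materialized list of wf documents
    seis_f = client.submit(db.read_data, doc, collection="wf_Seismogram")
    seis_f = client.submit(_set_incident_slowness_metadata, seis_f, VPsvm)
    futures.append(
        client.submit(
            migrate_one_seismogram_python,
            seis_f, parent, raygrid_f, TPfield_f, Us3d_f, Vp1d, Vs1d, control,
        )
    )

# accumulate is not pure, so gather results and accumulate serially as they finish.
for fut in as_completed(futures):
    pwdgrid.accumulate(fut.result())
```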