Scaling Considerations #117
ljstrnadiii asked this question in Q&A
Jotting down some ideas for scaling out with approaches other than Beam/xbeam.
Goal
Leverage Xarray and its various backends to write the dataset out (most often to zarr for us). This is the main theme here and is the objective of https://github.com/google/Xee/blob/main/examples/ee_to_zarr.py. We write the data out to get the EE ingest out of the way, making it easy to take advantage of the Xarray ecosystem.
Why
Often we build very large datasets in the time dimension, the spatial dimensions, or both. For example, we may want high temporal frequency datasets for time series work, or large spatial extents for jurisdiction-, state-, country- or even global-scale analysis. I really like the idea of using this package to export either scene-level data or composites. The former gives us complete control but can have a massive impact on size. The latter lets us use EE for monthly/quarterly/annual/etc. composites, which greatly reduces computation and data storage by taking advantage of the "engine" part of EE. This raises the concern about caching [1].
How
The most common approach to scaling Xarray datasets is Dask (for us, anyway). In the best case, we can connect to a dask cluster and simply call `.to_zarr()`. Two immediate considerations surface with this package: exporting scene-level data will create many nan-chunks [2], and `.to_zarr()` on a cluster requires that our dataset is pickleable [3].
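For reference, the out-of-the-box path would look roughly like the sketch below; the cluster address, collection id, and store path are placeholders, and per [3] the final write currently fails on a distributed cluster.

```python
# Sketch of the out-of-the-box path; addresses, ids, and paths are placeholders.
import xarray as xr
from dask.distributed import Client

client = Client("tcp://scheduler-address:8786")  # hypothetical scheduler

ds = xr.open_dataset(
    "ee://ECMWF/ERA5_LAND/HOURLY",  # illustrative ImageCollection id
    engine="ee",
)
ds = ds.chunk({"time": 48})  # dask-backed so the writes are distributed
ds.to_zarr("gs://my-bucket/era5.zarr", mode="w")  # [3]: fails today, dataset not pickleable
```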
Alternative Approaches
In theory, we simply need to build a "target" dataset with `.to_zarr(..., compute=False)`, and then we can manually write the underlying chunks with something like `.to_zarr(..., region="auto")` or `.to_zarr(..., region={'x': slice(0, 1024), 'y': slice(0, 1024)})`. This allows us to build the dataset in parallel over these regions. I am experimenting with Flyte to write out each region in parallel with `map_task` described here, but this could be accomplished in many other ways, e.g. Dask, Beam, etc. This brings up two more considerations: the implications of `io_chunks` and the use of the thread pool [4], and how that might fare with dask chunks [5]. There is also a query-per-second quota on EE, which is another consideration [6]. Predicting the number of requests for any ImageCollection seems challenging, and we will need to limit the number of concurrent writes somehow. The last consideration is that within each of these processes that writes out a region, we need to open the dataset again [7]. How can we efficiently re-open a sub-region of a dataset assuming we cannot pickle the thing? I currently take the hit, open the dataset in each task/pod/process, and then use `.isel()` to subset the region that corresponds to the task and write it to zarr.
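A minimal sketch of that pattern follows; the collection id, store path, region size, and the lowercase 'x'/'y' dimension names are illustrative assumptions (use whatever dims the opened dataset actually exposes), and any orchestrator (Flyte `map_task`, Dask, Beam, a plain process pool) could call `write_region`.

```python
# Sketch of the target-then-regions pattern; ids, paths, sizes, and dim names
# ('x'/'y') are illustrative assumptions.
import xarray as xr

TARGET = "gs://my-bucket/dataset.zarr"  # hypothetical zarr store


def open_source() -> xr.Dataset:
    # Re-opened inside every task because the dataset cannot be pickled [3, 7].
    import ee
    ee.Initialize()
    return xr.open_dataset("ee://SOME/IMAGE_COLLECTION", engine="ee")


# 1. Write the "target": metadata and coordinates only, no pixel data.
template = open_source().chunk({"time": 48, "x": 1024, "y": 1024})
template.to_zarr(TARGET, compute=False, mode="w")


# 2. Each task re-opens the source and writes one spatial region of the store.
def write_region(x0: int, x1: int, y0: int, y1: int) -> None:
    sub = open_source().isel(x=slice(x0, x1), y=slice(y0, y1))
    # Variables with no dimension inside the region (e.g. the 1-D `time`
    # coordinate) must be dropped before a region write.
    sub = sub.drop_vars(
        [v for v in sub.variables if not ({"x", "y"} & set(sub[v].dims))]
    )
    sub.to_zarr(TARGET, region={"x": slice(x0, x1), "y": slice(y0, y1)})
```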
Open Questions
Running list of considerations:

- To use `.to_zarr()` out of the box, we would need to pickle the dataset opened on the client side and send it to the workers. I believe that is not possible simply due to the `ee.Initialize()` constraint. Maybe we can simply `ee.Initialize()` in each dask worker (see the sketch after this list)? There is likely still a pickle issue for the dataset.
- `io_chunks` seems to be what determines how many tasks are submitted to the ThreadPoolExecutor, which currently has no limit set on the number of threads but will [default to the number of processors on the current machine](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ProcessPoolExecutor). This surfaces as warnings like `Connection pool is full, discarding connection: oauth2.googleapis.com. Connection pool size: 10`.
- `open_dataset(..., engine='ee')` seems to take quite some time.
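On the `ee.Initialize()` question above, one option to try is initializing Earth Engine on every dask worker up front; whether that alone makes the opened dataset shippable is still an open question.

```python
# Sketch: initialize Earth Engine on every currently-connected dask worker.
# The scheduler address is a placeholder; credentials (e.g. a service account)
# must already be available in the worker environment.
from dask.distributed import Client


def init_ee() -> None:
    import ee  # imported on the worker, not the client
    ee.Initialize()


client = Client("tcp://scheduler-address:8786")
client.run(init_ee)  # runs the function once on each worker
```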