Optimisations for executing on clusters using dask.distributed.
bramvds committed Apr 28, 2018
1 parent ca620bb commit ea5f732
Showing 1 changed file with 26 additions and 23 deletions.
49 changes: 26 additions & 23 deletions src/pyscenic/prune.py
@@ -165,7 +165,8 @@ def _distributed_calc(rnkdbs: Sequence[Type[RankingDatabase]], modules: Sequence
systems 'custom_multiprocessing' or 'dask_multiprocessing' can be supplied.
:param num_workers: If not using a cluster, the number of workers to use for the calculation.
None if all available CPUs need to be used.
:param module_chunksize: The size of the chunk in signatures to use when using the dask framework.
:param module_chunksize: The size of the chunk in signatures to use when using the dask framework with the
multiprocessing scheduler.
:return: A pandas dataframe or a sequence of regulons (depends on aggregate function supplied).
"""
def is_valid(client_or_address):
@@ -218,38 +219,40 @@ def load(fname):

# Create dask graph.
def create_graph(client=None):
# In a cluster the motif annotations need to be broadcast to all nodes. Otherwise
# the motif annotations need to be wrapped in a delayed() construct to avoid needless pickling and
# unpickling between processes.
delayed_or_future_annotations = client.scatter(motif_annotations, broadcast=True) if client \
else delayed(motif_annotations, pure=True)

# TODO: Also broadcast the databases. Although they are just stubs pointing to data on disk, they still
# TODO: take up substantial space (1.7Mb TBI) and therefore necessitate broadcasting. The weird thing
# TODO: here is that they should only contain name and fname! But memoization could increase their size!
#/user/leuven/304/vsc30402/data/miniconda3/envs/pyscenic/lib/python3.6/site-packages/distributed/worker.py:742: UserWarning: Large object of size 1.71 MB detected in task graph:
# (FeatherRankingDatabase(name="mm9-tss-centered-10k ... 00817ba90c387')
# Consider scattering large objects ahead of time
# with client.scatter to reduce scheduler burden and
# keep data on workers
#
# future = client.submit(func, big_data) # bad
#
# big_future = client.scatter(big_data) # good
# future = client.submit(func, big_future) # good
# % (format_bytes(len(b)), s)`
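# A rough way to verify this (illustrative sketch only; `db` stands for any RankingDatabase instance,
# and dask serializes with cloudpickle, so sizes are approximate):
#
#   import pickle
#   print(len(pickle.dumps(db)))  # ~1.7MB here, even though the object should only hold
#                                 # name and fname, hinting that memoized state is pickled too.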

# NOTE ON CHUNKING SIGNATURES:
# Chunking the gene signatures might not be necessary anymore because the overhead of the dask
# scheduler is minimal (cf. blog http://matthewrocklin.com/blog/work/2016/05/05/performant-task-scheduling).
# The original rationale behind the decision to implement this was the refuted assumption that fast-executing
# tasks would be greatly impacted by scheduler overhead. The chunking of signatures seemed to corroborate
# this assumption. However, the actual benefit came from less pickling and unpickling of the motif annotations
# dataframe, as it was not wrapped in a delayed() construct at the time.
# When using a distributed scheduler, chunking is overruled to avoid having these large chunks shipped
# to different workers across cluster nodes.
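# For illustration, the chunking idea in minimal form (a sketch, not the exact code further down;
# `score_chunk` is a hypothetical stand-in for the real transformation function):
#
#   from boltons.iterutils import chunked_iter
#   from dask import delayed
#
#   tasks = [delayed(score_chunk)(db, chunk, annotations)
#            for chunk in chunked_iter(modules, module_chunksize)]
#
# A larger module_chunksize amortises per-task scheduler overhead on the multiprocessing scheduler
# at the cost of bigger task payloads; with dask.distributed the chunk size is forced back to 1
# (see below) so that no large chunks have to be shipped across nodes.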

# NOTE ON BROADCASTING DATASET:
# There are three large pieces of data that need to be orchestrated between scheduler and workers:
# 1. In a cluster the motif annotations need to be broadcast to all nodes. Otherwise
# the motif annotations need to be wrapped in a delayed() construct to avoid needless pickling and
# unpickling between processes.
def wrap(data):
return client.scatter(data, broadcast=True) if client else delayed(data, pure=True)
delayed_or_future_annotations = wrap(motif_annotations)
# 2. The databases: these database objects are typically just proxies to the data on disk. They only have
# the name and location on shared storage as fields. For consistency reasons we do broadcast these database
# objects to the workers. If we later decide to have all information of a database loaded into memory we can
# still safely use clusters.
delayed_or_future_dbs = list(map(wrap, rnkdbs))
# 3. The gene signatures: these signatures become large when chunked, therefore chunking is overruled
# when using dask.distributed.
if client:
module_chunksize = 1

# NOTE ON RANKING DATABASES ON DISK:
# Because the front nodes of the VSC for the LCB share a file server and have a common home folder configured,
# these databases (stored on this shared drive) can be accessed from all nodes in the cluster, and all nodes
# can use the same path in the configuration file.

# NOTE ON REMOVING I/O CONTENTION:
# A potential improvement to reduce I/O contention for this shared drive (accessing the ranking
# database) would be to load the database in memory (using the available decorator) for each task.
# The penalty of loading the database in memory should be shared across multiple gene signatures so
@@ -261,7 +264,7 @@ def create_graph(client=None):
return aggregate_func(
(delayed(transform_func)
(db, gs_chunk, delayed_or_future_annotations)
for db in rnkdbs
for db in delayed_or_future_dbs
for gs_chunk in chunked_iter(modules, module_chunksize)))

# Compute dask graph ...
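# For illustration only (a sketch, not the exact code that follows): the delayed graph built by
# create_graph() can be computed either on the local multiprocessing scheduler or on a
# dask.distributed cluster, e.g. with a recent dask:
#
#   # local multiprocessing scheduler
#   result = create_graph().compute(scheduler="processes", num_workers=num_workers)
#
#   # dask.distributed cluster
#   from distributed import Client
#   client = Client("tcp://scheduler-address:8786")
#   result = client.compute(create_graph(client)).result()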
