diff --git a/src/pyscenic/prune.py b/src/pyscenic/prune.py
index 5eb7bdf..d8439d8 100644
--- a/src/pyscenic/prune.py
+++ b/src/pyscenic/prune.py
@@ -165,7 +165,8 @@ def _distributed_calc(rnkdbs: Sequence[Type[RankingDatabase]], modules: Sequence
         systems 'custom_multiprocessing' or 'dask_multiprocessing' can be supplied.
     :param num_workers: If not using a cluster, the number of workers to use for the calculation. None if all
         available CPUs need to be used.
-    :param module_chunksize: The size of the chunk in signatures to use when using the dask framework.
+    :param module_chunksize: The size of the chunk in signatures to use when using the dask framework with the
+        multiprocessing scheduler.
     :return: A pandas dataframe or a sequence of regulons (depends on aggregate function supplied).
     """
     def is_valid(client_or_address):
@@ -218,38 +219,40 @@ def load(fname):
     # Create dask graph.
     def create_graph(client=None):
-        # In a cluster the motif annotations need to be broadcasted to all nodes. Otherwise
-        # the motif annotations need to wrapped in a delayed() construct to avoid needless pickling and
-        # unpicking between processes.
-        delayed_or_future_annotations = client.scatter(motif_annotations, broadcast=True) if client \
-                                            else delayed(motif_annotations, pure=True)
-
-        # TODO: Also broadcast the databases. Although they are just a stub to the disk they still take up
-        # TODO: substantial information (1.7Mb TBI) and therefore necessitate broadcasting. The weird thing
-        # TODO: here is that it should only contain name and fname! But memoization could increase it!
-        #/user/leuven/304/vsc30402/data/miniconda3/envs/pyscenic/lib/python3.6/site-packages/distributed/worker.py:742: UserWarning: Large object of size 1.71 MB detected in task graph:
-        #   (FeatherRankingDatabase(name="mm9-tss-centered-10k ... 00817ba90c387')
-        #   Consider scattering large objects ahead of time
-        #   with client.scatter to reduce scheduler burden and
-        #   keep data on workers
-        #
-        #       future = client.submit(func, big_data)     # bad
-        #
-        #       big_future = client.scatter(big_data)      # good
-        #       future = client.submit(func, big_future)   # good
-        #   % (format_bytes(len(b)), s)`
-
+        # NOTE ON CHUNKING SIGNATURES:
         # Chunking the gene signatures might not be necessary anymore because the overhead of the dask
         # scheduler is minimal (cf. blog http://matthewrocklin.com/blog/work/2016/05/05/performant-task-scheduling).
         # The original reasoning behind the decision to implement this was the refuted assumption that fast executing tasks
         # would greatly be impacted by scheduler overhead. The chunking of signatures seemed to corroborate
         # this assumption. However, the benefit came from less pickling and unpickling of the motif annotations
         # dataframe as this was not wrapped in a delayed() construct.
+        # When using a distributed scheduler, chunking is overruled to avoid having these large chunks shipped
+        # to different workers across cluster nodes.
+        # NOTE ON BROADCASTING DATA:
+        # There are three large pieces of data that need to be orchestrated between scheduler and workers:
+        # 1. The motif annotations: in a cluster these need to be broadcast to all nodes. Otherwise they need
+        #    to be wrapped in a delayed() construct to avoid needless pickling and unpickling between processes.
+        def wrap(data):
+            return client.scatter(data, broadcast=True) if client else delayed(data, pure=True)
+        delayed_or_future_annotations = wrap(motif_annotations)
+        # 2. The databases: these database objects are typically just proxies to the data on disk.
+        #    They only have the name and location on shared storage as fields. For consistency reasons we do
+        #    broadcast these database objects to the workers. If we decide to have all information of a database
+        #    loaded into memory we can still safely use clusters.
+        delayed_or_future_dbs = list(map(wrap, rnkdbs))
+        # 3. The gene signatures: these signatures become large when chunked, therefore chunking is overruled
+        #    when using dask.distributed.
+        if client:
+            module_chunksize = 1
+
+        # NOTE ON RANKING DATABASES ON DISK:
         # Remark on sharing ranking databases across a cluster. Because the frontnodes of the VSC for the LCB share
         # a file server and have a common home folder configured, these databases (stored on this shared drive)
         # can be accessed from all nodes in the cluster and can all use the same path in the configuration file.
+        # NOTE ON REMOVING I/O CONTENTION:
         # A potential improvement to reduce I/O contention for this shared drive (accessing the ranking
         # database) would be to load the database in memory (using the available decorator) for each task.
         # The penalty of loading the database in memory should be shared across multiple gene signatures so
@@ -261,7 +264,7 @@ def create_graph(client=None):
         return aggregate_func(
             (delayed(transform_func)
                 (db, gs_chunk, delayed_or_future_annotations)
-                    for db in rnkdbs
+                    for db in delayed_or_future_dbs
                     for gs_chunk in chunked_iter(modules, module_chunksize)))

     # Compute dask graph ...
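
Illustrative sketch (not part of the patch): the scatter-versus-delayed idiom that wrap() applies in create_graph() above, reduced to a self-contained script with synthetic data. All names below (score, run, the toy annotations/dbs/modules) are stand-ins rather than pySCENIC APIs; the real calls being demonstrated are dask.delayed, Client.scatter(..., broadcast=True) and dask.compute.

# Synthetic demo of the broadcast/delayed pattern from create_graph().
import pandas as pd
from dask import compute, delayed
from distributed import Client, LocalCluster


def score(db, gs_chunk, annotations):
    # Stand-in for the per-(database, signature chunk) pruning task; by the time this runs
    # on a worker, `annotations` has been resolved to the actual dataframe.
    return [(db, gs, len(annotations)) for gs in gs_chunk]


def run(client=None, module_chunksize=2):
    annotations = pd.DataFrame({"motif": ["m1", "m2", "m3"], "tf": ["T1", "T2", "T3"]})
    dbs = ["db_a", "db_b"]                        # proxies for RankingDatabase objects
    modules = [f"regulon_{i}" for i in range(6)]

    def wrap(data):
        # On a cluster: ship the object to every worker once. Locally: defer it so it becomes
        # a single shared node in the task graph instead of being pickled into every task.
        return client.scatter(data, broadcast=True) if client else delayed(data, pure=True)

    shared_annotations = wrap(annotations)
    shared_dbs = [wrap(db) for db in dbs]
    if client:
        module_chunksize = 1                      # large chunks should not travel between nodes

    chunks = [modules[i:i + module_chunksize] for i in range(0, len(modules), module_chunksize)]
    tasks = [delayed(score)(db, chunk, shared_annotations)
             for db in shared_dbs
             for chunk in chunks]
    return compute(*tasks)


if __name__ == "__main__":
    print(run())                                  # no client: delayed() wrapping, local scheduler
    with LocalCluster(n_workers=2) as cluster, Client(cluster) as client:
        print(run(client))                        # client: scatter() broadcasting, distributed scheduler

The point of wrap() is that the large motif-annotations dataframe (and, after this patch, the database stubs) appears only once: as a broadcast future on a cluster, or as a single delayed() graph node locally, instead of being re-serialised into every (database, chunk) task.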
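
A second, smaller illustration (assumed setup, not pySCENIC code) of what module_chunksize controls: in the generator expression of the last hunk, every (database, signature chunk) pair becomes one delayed task, so the task count is len(rnkdbs) * ceil(len(modules) / module_chunksize). chunked_iter is assumed here to be boltons.iterutils.chunked_iter, matching the call in that hunk; the toy lists are made up.

from boltons.iterutils import chunked_iter

modules = [f"signature_{i}" for i in range(100)]   # toy stand-in for gene signatures
rnkdbs = ["db_a", "db_b"]                          # toy stand-in for ranking databases

for module_chunksize in (1, 10, 50):
    chunks = list(chunked_iter(modules, module_chunksize))
    print(f"module_chunksize={module_chunksize:>2}: "
          f"{len(chunks):>3} chunks -> {len(rnkdbs) * len(chunks):>3} tasks")

Larger chunks mean fewer tasks whose arguments have to be pickled by the multiprocessing scheduler, which is why the parameter exists; with dask.distributed the patch pins module_chunksize to 1 so that no single task argument grows large enough to be expensive to ship between nodes.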