diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml
index 52915813..ca3100b4 100644
--- a/.github/workflows/python-package.yml
+++ b/.github/workflows/python-package.yml
@@ -26,6 +26,7 @@ jobs:
         python -m pip install -r requirements.txt
         python -m pip install -r requirements-test.txt
         python -m pip install -r requirements-multiprocessing.txt
+        python -m pip install numba
     - name: Test with pytest & coverage
      run: |
        python -m coverage run -m pytest
diff --git a/LICENSE b/LICENSE
index f6a18dac..68a3a06c 100644
--- a/LICENSE
+++ b/LICENSE
@@ -1,4 +1,5 @@
 Copyright (c) 2016-2021 The Simons Foundation, Inc.
+Copyright (c) 2021 Broad Institute of MIT and Harvard
 All rights reserved.
 
 Redistribution and use in source and binary forms, with or without
diff --git a/docs/changelog.rst b/docs/changelog.rst
index d28baca8..37f6fddf 100644
--- a/docs/changelog.rst
+++ b/docs/changelog.rst
@@ -1,14 +1,31 @@
 Change Log
 ==========
 
+Inferelator v0.5.7 `September 29, 2021`
+---------------------------------------
+
+New Functionality:
+
+- Added support for numba acceleration of AMuSR with ``.set_run_parameters(use_numba=True)`` (PR #46)
+
+Code Refactoring:
+
+- Updated example scripts
+- Removed deprecated KVS multiprocessing and associated code
+
+Bug Fixes:
+
+- Gene labels are included as the first column of the produced confidences TSV file by default
+- Matplotlib backend selection checks for non-interactive mode
+
 Inferelator v0.5.6 `August 16, 2021`
-------------------------------------
+------------------------------------
 
 New Functionality:
 
 - Added code to randomly generate noise in prior with ``.set_shuffle_parameters(add_prior_noise=None)``
 - Added in-workflow benchmarks for CellOracle and pySCENIC
--
 
 Code Refactoring:
 
@@ -16,7 +33,7 @@ Code Refactoring:
 - Improved testing for multitask workflows
 - Improved error messaging around prior and gold standard
 - Switch from Travis.ci to GitHub Actions for continuous integration
--
 
 Inferelator v0.5.5 `April 29, 2021`
 -----------------------------------
diff --git a/docs/conf.py b/docs/conf.py
index f92e3a16..7428a064 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -23,7 +23,7 @@
 author = 'Chris Jackson'
 
 # The full version, including alpha/beta/rc tags
-release = 'v0.5.6'
+release = 'v0.5.7'
 
 
 # -- General configuration ---------------------------------------------------
diff --git a/examples/Bsubtilis_Network_Inference.ipynb b/examples/Bsubtilis_Network_Inference.ipynb
index 0a57f7e4..5a322546 100644
--- a/examples/Bsubtilis_Network_Inference.ipynb
+++ b/examples/Bsubtilis_Network_Inference.ipynb
@@ -144,7 +144,8 @@
     "worker.append_to_path('output_dir', 'final')\n",
     "worker.set_crossvalidation_parameters(split_gold_standard_for_crossvalidation=False, cv_split_ratio=None)\n",
     "worker.set_run_parameters(num_bootstraps=50, random_seed=100)\n",
-    "final_network = worker.run()"
+    "\n",
+    "final_network_results = worker.run()"
    ]
   },
   {
@@ -154,8 +155,35 @@
    "outputs": [],
    "source": [
     "# Visualize network results\n",
+    "# The workflow returns an InferelatorResults object\n",
+    "\n",
+    "# There is a dataframe with an edge table for the final network\n",
+    "final_network_results.network.head()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# There is a list of dataframes with model coefficients\n",
+    "# Each list element is a dataframe with the results from one bootstrap\n",
+    "# The dataframes are genes x TFs\n",
+    "\n",
+    "final_network_results.betas[0].iloc[0:5, 0:5]"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# The confidence scores for each network edge are also accessible\n",
+    "# This dataframe is genes x TFs\n",
     "\n",
-    "final_network.head()"
+    "final_network_results.combined_confidences.iloc[0:5, 0:5]"
    ]
   }
  ],
@@ -175,7 +203,7 @@
    "name": "python",
    "nbconvert_exporter": "python",
    "pygments_lexer": "ipython3",
-   "version": "3.7.6"
+   "version": "3.7.10"
   }
  },
 "nbformat": 4,
diff --git a/examples/Yeast_Network_Inference.ipynb b/examples/Yeast_Network_Inference.ipynb
index 26d81edf..37f06c29 100644
--- a/examples/Yeast_Network_Inference.ipynb
+++ b/examples/Yeast_Network_Inference.ipynb
@@ -209,7 +209,8 @@
     "worker.append_to_path('output_dir', 'final')\n",
     "worker.set_crossvalidation_parameters(split_gold_standard_for_crossvalidation=False, cv_split_ratio=None)\n",
     "worker.set_run_parameters(num_bootstraps=50, random_seed=100)\n",
-    "final_network = worker.run()"
+    "\n",
+    "final_network_results = worker.run()"
    ]
   },
   {
@@ -219,8 +220,35 @@
    "outputs": [],
    "source": [
     "# Visualize network results\n",
+    "# The workflow returns an InferelatorResults object\n",
+    "\n",
+    "# There is a dataframe with an edge table for the final network\n",
+    "final_network_results.network.head()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# There is a list of dataframes with model coefficients\n",
+    "# Each list element is a dataframe with the results from one bootstrap\n",
+    "# The dataframes are genes x TFs\n",
+    "\n",
+    "final_network_results.betas[0].iloc[0:5, 0:5]"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# The confidence scores for each network edge are also accessible\n",
+    "# This dataframe is genes x TFs\n",
     "\n",
-    "final_network.head()"
+    "final_network_results.combined_confidences.iloc[0:5, 0:5]"
    ]
   }
  ],
@@ -240,7 +268,7 @@
    "name": "python",
    "nbconvert_exporter": "python",
    "pygments_lexer": "ipython3",
-   "version": "3.7.6"
+   "version": "3.7.10"
   }
  },
 "nbformat": 4,
""" - if self.is_master(): - self.create_output_dir() - rp = self._result_processor_driver(betas, rescaled_betas, filter_method=self.gold_standard_filter_method, - metric=self.metric) - rp.tasks_names = self._task_names - self.results = rp.summarize_network(self.output_dir, gold_standard, self._task_priors) - self.task_results = rp.tasks_networks - return self.results - else: - return None + + self.create_output_dir() + rp = self._result_processor_driver(betas, rescaled_betas, filter_method=self.gold_standard_filter_method, + metric=self.metric) + rp.tasks_names = self._task_names + self.results = rp.summarize_network(self.output_dir, gold_standard, self._task_priors) + self.task_results = rp.tasks_networks + return self.results def create_task_data_object(workflow_class="single-cell"): diff --git a/inferelator/benchmarking/scenic.py b/inferelator/benchmarking/scenic.py index 6a73a5d6..338cf0f4 100644 --- a/inferelator/benchmarking/scenic.py +++ b/inferelator/benchmarking/scenic.py @@ -187,10 +187,12 @@ def reprocess_scenic_output_to_inferelator_results(scenic_df, prior_data): scenic_df.columns = scenic_df.columns.droplevel(0) mat = [pd.DataFrame(data).set_index(0).rename({1: tf}, axis=1) - for tf, data in scenic_df['TargetGenes'].iteritems()] - - mat = pd.concat(mat, axis=0).reindex(prior_data.columns, axis=1).fillna(0) + for tf, data in scenic_df['TargetGenes'].iteritems()] + mat = pd.concat(mat, axis=0).fillna(0) + mat = mat.groupby(mat.index).agg('max') + mat = mat.reindex(prior_data.columns, axis=1).reindex(prior_data.index, axis=0).fillna(0) + return [mat], [mat.copy()] @staticmethod diff --git a/inferelator/crossvalidation_workflow.py b/inferelator/crossvalidation_workflow.py index 38877529..e4a311b9 100644 --- a/inferelator/crossvalidation_workflow.py +++ b/inferelator/crossvalidation_workflow.py @@ -266,33 +266,32 @@ def _create_writer(self): Create a CSVWriter and stash it in self.writer """ - if MPControl.is_master: - # Create a CSV header from grid search param names - self._csv_header = copy.copy(self.grid_params) if self.grid_params is not None else [] + # Create a CSV header from grid search param names + self._csv_header = copy.copy(self.grid_params) if self.grid_params is not None else [] - # Add Test & Value columns for dropouts/etc - self._csv_header.extend(["Test", "Value", "Num_Obs"]) + # Add Test & Value columns for dropouts/etc + self._csv_header.extend(["Test", "Value", "Num_Obs"]) - # Also add the metric name - self._csv_header.extend(MetricHandler.get_metric(self.workflow.metric).all_names()) + # Also add the metric name + self._csv_header.extend(MetricHandler.get_metric(self.workflow.metric).all_names()) - # Create a CSV writer - self._create_output_path() - self._open_csv_handle() + # Create a CSV writer + self._create_output_path() + self._open_csv_handle() - self._csv_writer = self._csv_writer_object(self._csv_file_handle, - delimiter="\t", lineterminator="\n", quoting=csv.QUOTE_NONE) + self._csv_writer = self._csv_writer_object(self._csv_file_handle, + delimiter="\t", lineterminator="\n", quoting=csv.QUOTE_NONE) - # Write the header line - self._csv_writer.writerow(self._csv_header) + # Write the header line + self._csv_writer.writerow(self._csv_header) def _destroy_writer(self): """ Delete the CSVWriter and close the file handle """ - if MPControl.is_master: - self._csv_file_handle.close() - self._csv_writer = None + + self._csv_file_handle.close() + self._csv_writer = None def _harmonize_paths(self): """ @@ -417,8 +416,7 @@ def _grid_search(self, test=None, 
value=None, mask_function=None): results.append(((test, value), result)) - if MPControl.is_master: - self._csv_writer.writerow(csv_line) + self._csv_writer.writerow(csv_line) del cv_workflow diff --git a/inferelator/default.py b/inferelator/default.py index 376397ef..f37c71b1 100644 --- a/inferelator/default.py +++ b/inferelator/default.py @@ -9,14 +9,8 @@ # This is a dict, keyed by the class setattr variable name, of tuples (env name, coercion function, default value) SBATCH_VARS = dict(output_dir=('RUNDIR', str, None), - input_dir=('DATADIR', str, None), - rank=('SLURM_PROCID', int, 0), - cores=('SLURM_NTASKS_PER_NODE', int, 1), - tasks=('SLURM_NTASKS', int, 1), - node=('SLURM_NODEID', int, 0), - num_nodes=('SLURM_JOB_NUM_NODES', int, 1)) - -SBATCH_VARS_FOR_KVS = ["rank", "cores", "tasks", "node", "num_nodes"] + input_dir=('DATADIR', str, None)) + SBATCH_VARS_FOR_WORKFLOW = ["output_dir", "input_dir"] """Default Data File Settings""" diff --git a/inferelator/distributed/__init__.py b/inferelator/distributed/__init__.py index f5b9a070..9a4ea7a2 100644 --- a/inferelator/distributed/__init__.py +++ b/inferelator/distributed/__init__.py @@ -7,9 +7,6 @@ class AbstractController: # The object which handles the multiprocessing client = None - # Boolean to identify master processes where needed - is_master = False - # The chunk sizes for calls to map chunk = 25 @@ -56,15 +53,6 @@ def set_processes(cls, process_count): """ raise NotImplementedError - @classmethod - @abstractmethod - def sync_processes(cls, *args, **kwargs): - """ - This synchronizes multiple processes. Multiprocessing methods which have a defined hierarchy and no risk of - race conditions may simply return True - """ - raise NotImplementedError - @classmethod @abstractmethod def shutdown(cls): diff --git a/inferelator/distributed/dask_cluster_controller.py b/inferelator/distributed/dask_cluster_controller.py index 80a55312..1fb6fb74 100644 --- a/inferelator/distributed/dask_cluster_controller.py +++ b/inferelator/distributed/dask_cluster_controller.py @@ -112,7 +112,6 @@ class DaskHPCClusterController(AbstractController): _controller_name = "dask-cluster" _controller_dask = True - is_master = True client = None # Cluster Controller @@ -278,14 +277,7 @@ def set_processes(cls, process_count): utils.Debug.vprint("Using `set_job_size_params` is highly preferred", level=0) utils.Debug.vprint("Configured {n} jobs with {w} workers per job".format(n=cls._job_n, w=cls._job_n_workers), level=0) - - @classmethod - def sync_processes(cls, *args, **kwargs): - """ - This is a thing for KVS. Just return True. 
- """ - return True - + @classmethod def add_worker_env_line(cls, line): """ diff --git a/inferelator/distributed/dask_functions.py b/inferelator/distributed/dask_functions.py index 8e719781..5c19c101 100644 --- a/inferelator/distributed/dask_functions.py +++ b/inferelator/distributed/dask_functions.py @@ -17,7 +17,7 @@ def amusr_regress_dask(X, Y, priors, prior_weight, n_tasks, genes, tfs, G, remove_autoregulation=True, lambda_Bs=None, lambda_Ss=None, Cs=None, Ss=None, regression_function=None, - tol=None, rel_tol=None): + tol=None, rel_tol=None, use_numba=False): """ Execute multitask (AMUSR) @@ -55,7 +55,8 @@ def regression_maker(j, x_df, y_list, prior, tf): prior = format_prior(prior, gene, tasks, prior_weight, tfs=tf) return j, regression_function(x, y, tf, tasks, gene, prior, - lambda_Bs=lambda_Bs, lambda_Ss=lambda_Ss, Cs=Cs, Ss=Ss, tol=tol, rel_tol=rel_tol) + lambda_Bs=lambda_Bs, lambda_Ss=lambda_Ss, Cs=Cs, Ss=Ss, + tol=tol, rel_tol=rel_tol, use_numba=use_numba) def response_maker(y_df, i): y = [] diff --git a/inferelator/distributed/dask_k8_controller.py b/inferelator/distributed/dask_k8_controller.py index ef22b896..ddc18846 100644 --- a/inferelator/distributed/dask_k8_controller.py +++ b/inferelator/distributed/dask_k8_controller.py @@ -27,7 +27,6 @@ class DaskK8Controller(AbstractController): _controller_name = "dask-k8" _controller_dask = True - is_master = True client = None ## Dask controller variables ## @@ -76,13 +75,6 @@ def set_processes(cls, process_count): """ cls.processes = process_count - @classmethod - def sync_processes(self, *args, **kwargs): - """ - This is a thing for KVS. Just return True. - """ - return True - @classmethod def check_cluster_state(cls, *args, **kwargs): """ diff --git a/inferelator/distributed/dask_local_controller.py b/inferelator/distributed/dask_local_controller.py index 24461d58..451209cf 100644 --- a/inferelator/distributed/dask_local_controller.py +++ b/inferelator/distributed/dask_local_controller.py @@ -27,7 +27,6 @@ class DaskController(AbstractController): _controller_name = "dask-local" _controller_dask = True - is_master = True client = None ## Dask controller variables ## @@ -76,13 +75,6 @@ def set_processes(cls, process_count): """ cls.processes = process_count - @classmethod - def sync_processes(self, *args, **kwargs): - """ - This is a thing for KVS. Just return True. - """ - return True - @classmethod def check_cluster_state(cls): """ diff --git a/inferelator/distributed/inferelator_mp.py b/inferelator/distributed/inferelator_mp.py index 1123b0c2..28f73351 100644 --- a/inferelator/distributed/inferelator_mp.py +++ b/inferelator/distributed/inferelator_mp.py @@ -15,7 +15,6 @@ class MPControl(AbstractController): client = None # Relevant external state booleans - is_master = False is_initialized = False @classmethod @@ -46,7 +45,6 @@ def set_multiprocess_engine(cls, engine, processes=None): dask-cluster dask-k8 dask-local - kvs multiprocessing local @@ -69,10 +67,7 @@ def set_multiprocess_engine(cls, engine, processes=None): from inferelator.distributed.dask_k8_controller import DaskK8Controller cls.client = DaskK8Controller elif engine == "kvs": - warnings.warn("The KVS engine is deprecated. It has been replaced by Dask-based multiprocessing", - DeprecationWarning) - from inferelator.distributed.kvs_controller import KVSController - cls.client = KVSController + raise DeprecationWarning("The KVS engine is deprecated. 
Use Dask-based multiprocessing") elif engine == "multiprocessing": from inferelator.distributed.multiprocessing_controller import MultiprocessingController cls.client = MultiprocessingController @@ -107,12 +102,8 @@ def connect(cls, *args, **kwargs): connect_return = cls.client.connect(*args, **kwargs) # Set the process state - cls.is_master = cls.client.is_master cls.is_initialized = True - # Also tell Debug if we're the master process - utils.Debug.is_master = cls.is_master - return connect_return @classmethod @@ -133,18 +124,6 @@ def set_processes(cls, process_count): raise RuntimeError("Cannot set processes after the engine has started") return cls.client.set_processes(process_count) - @classmethod - def sync_processes(cls, *args, **kwargs): - """ - Make sure processes are all at the same point by calling the `.sync_processes()` implementation in the - multiprocessing engine - - This is necessary for KVS; other engines will just return True - """ - if not cls.is_initialized: - raise RuntimeError("Connect before calling sync_processes()") - return cls.client.sync_processes(*args, **kwargs) - @classmethod def shutdown(cls): """ diff --git a/inferelator/distributed/kvs_controller.py b/inferelator/distributed/kvs_controller.py deleted file mode 100644 index 42e9e959..00000000 --- a/inferelator/distributed/kvs_controller.py +++ /dev/null @@ -1,311 +0,0 @@ -""" -KVSController is a wrapper for KVSClient that adds some useful functionality related to interprocess -communication. -It also keeps track of a bunch of SLURM related stuff that was previously workflow's problem. -""" - -from kvsstcp import KVSClient - -from inferelator.distributed import AbstractController -from inferelator.utils import Validator as check -from inferelator import utils -from inferelator import default - -import os -import warnings -import collections -import tempfile - -# Use cPickle in python 2 -try: - import cPickle as pickle -except ImportError: - import pickle - -# Shadow built-in zip with itertools.izip if this is python2 (This puts out a memory dumpster fire) -try: - from itertools import izip as zip -except ImportError: - pass - -# KVS environment variables -SBATCH_VARS_FOR_KVS = ["SLURM_PROCID", "SLURM_NTASKS_PER_NODE", "SLURM_NTASKS", "SLURM_NODEID", "SLURM_JOB_NUM_NODES"] - -# Which process is the boss -DEFAULT_MASTER = 0 - -# KVS Keys to use -COUNT = "kvs_count" -TMP_FILE_SYNC = "tmp_file_read" -POST_SYNC = "post_sync" -PILEUP_DATA = "data_pileup" -FINAL_DATA = "final_data" - - -class KVSController(AbstractController): - - # An active KVSClient object - client = None - chunk = 25 - _controller_name = "kvs" - - # Set from SLURM environment variables - rank = None # int - tasks = None # int - node = None # int - cores = None # int - num_nodes = None # int - is_master = False # bool - - @classmethod - def connect(cls, *args, **kwargs): - """ - Create a new KVS object with some object variables set to reflect the slurm environment - """ - - # Get local environment variables - cls._get_env(master_rank=kwargs.pop("master_rank", DEFAULT_MASTER)) - - # Connect to the host server by calling to KVSClient.__init__ - cls.client = KVSClient(*args, **kwargs) - - @classmethod - def shutdown(cls): - return cls.client.close() - - @classmethod - def _get_env(cls, master_rank=DEFAULT_MASTER): - """ - Get the SLURM environment variables that are set by sbatch at runtime. 
- """ - for k, v in utils.slurm_envs(default.SBATCH_VARS_FOR_KVS).items(): - setattr(cls, k, v) - if cls.rank == master_rank: - cls.is_master = True - else: - cls.is_master = False - - @classmethod - def own_check(cls, chunk=1, kvs_key='count'): - if cls.is_master: - return ownCheck(cls.client, 0, chunk=chunk, kvs_key=kvs_key) - else: - return ownCheck(cls.client, 1, chunk=chunk, kvs_key=kvs_key) - - @classmethod - def master_remove_key(cls, kvs_key='count'): - if cls.is_master: - cls.client.get(kvs_key) - - @classmethod - def set_processes(cls, process_count): - """ - Set the number of dask workers to use - :param process_count: int - :return: - """ - utils.Debug.vprint("The KVS engine does not support changing process numbers at runtime", level=0) - - @classmethod - def sync_processes(cls, pref="", value=True): - """ - Block all processes until they reach this point, then release them - It may be wise to use unique prefixes if this is gonna get called rapidly so there's no collision - Or not. I'm a comment, not a cop. - :param pref: str - Prefix attached to the KVS keys - :param value: Anything you can pickle - A value that will be checked for consistency between processes (if you set a different value in a - process, a warning will be issued. This is mostly to check state if needed - :return None: - """ - - wkey = pref + '_wait' - ckey = pref + '_continue' - - # Every process puts a wait key up when it gets here - cls.put_key(wkey, value) - - # The master pulls down the wait keys until it has all of them - # Then it puts up a go key for each process - - if cls.is_master: - for _ in range(cls.tasks): - c_value = cls.get_key(wkey) - if c_value != value: - warnings.warn("Sync warning: master {val_m} is not equal to client {val_c}".format(val_m=value, - val_c=c_value)) - for _ in range(cls.tasks): - cls.put_key(ckey, True) - - # Every process waits here until go keys are available - cls.get_key(ckey) - - @classmethod - def get_key(cls, key): - """ - Wrapper for KVSClient get - """ - return cls.client.get(key) - - @classmethod - def put_key(cls, key, value): - """ - Wrapper for KVSClient put - """ - return cls.client.put(key, value) - - @classmethod - def view_key(cls, key): - """ - Wrapper for KVSClient view - """ - return cls.client.view(key) - - @classmethod - def map(cls, func, *args, **kwargs): - """ - Map a function across iterable(s) and return a list of results - - :param func: function - Mappable function - :param args: iterable - Iterator(s) - :param tmp_file_path: path - If this is not None, instead of putting data onto the KVS, data will be pickled to temp files and the - path to the temp file will be put onto the KVS - :param tell_children: bool - If this is True, all processes will end up with the final data after assembly. 
If false, only the master - will have the final data; others will return None - :return results: list - """ - - tmp_file_path = kwargs.pop("tmp_file_path", None) - tell_children = kwargs.pop("tell_children", True) - - assert check.argument_callable(func) - assert check.argument_list_type(args, collections.Iterable) - - # Set up the multiprocessing - owncheck = cls.own_check(chunk=cls.chunk, kvs_key=COUNT) - results = dict() - for pos, arg in enumerate(zip(*args)): - if next(owncheck): - results[pos] = func(*arg) - - # Process results and synchronize exit from the get call - results = cls.process_results(results, tmp_file_path=tmp_file_path, tell_children=tell_children) - cls.sync_processes(pref=POST_SYNC) - cls.master_remove_key(kvs_key=COUNT) - cls.master_remove_key(kvs_key=FINAL_DATA) - return results - - @classmethod - def process_results(cls, results, tmp_file_path=None, tell_children=True): - """ - Pile up results from a get call - :param results: dict - A dict of results (keyed by position in a final list) - :param tmp_file_path: path - If this is not None, instead of putting data onto the KVS, data will be pickled to temp files and the - path to the temp file will be put onto the KVS - :param tell_children: bool - If this is True, all processes will end up with the final data after assembly. If false, only the master - will have the final data; others will return None - :return: list - A list of function results - """ - - if tmp_file_path is None: - # Put the data in KVS - cls.put_key(PILEUP_DATA, results) - else: - # Put the data in a pickled file - temp_fd, temp_name = tempfile.mkstemp(prefix="kvs", dir=tmp_file_path) - with os.fdopen(temp_fd, "wb") as temp: - pickle.dump(results, temp, -1) - cls.put_key(PILEUP_DATA, temp_name) - - if cls.is_master: - # If this is the master thread, get all the data and pile it up - pileup_results = dict() - for _ in range(cls.tasks): - if tmp_file_path is None: - pileup_results.update(cls.get_key(PILEUP_DATA)) - else: - temp_name = cls.get_key(PILEUP_DATA) - with open(temp_name, mode="rb") as temp: - pileup_results.update(pickle.load(temp)) - os.remove(temp_name) - - # Put everything into a list based on the dict key - pileup_list = [None] * len(pileup_results) - for idx, val in pileup_results.items(): - pileup_list[idx] = val - - if tell_children and tmp_file_path is None: - # Put the piled-up data into KVS - cls.put_key(FINAL_DATA, pileup_list) - elif tell_children: - # Pute the piled-up data into a pickled file - temp_fd, temp_name = tempfile.mkstemp(prefix="kvs", dir=tmp_file_path) - with os.fdopen(temp_fd, "wb") as temp: - pickle.dump(pileup_list, temp, -1) - cls.put_key(FINAL_DATA, temp_name) - else: - # Put a None onto KVS - only the master needs this data - cls.put_key(FINAL_DATA, None) - - else: - # If this is not the master thread, get the finalized data when the master is finished - if tell_children and tmp_file_path is None: - pileup_list = cls.view_key(FINAL_DATA) - elif tell_children: - with open(cls.view_key(FINAL_DATA), "rb") as temp: - pileup_list = pickle.load(temp) - else: - pileup_list = cls.view_key(FINAL_DATA) - - # Return the piled up data or wait until everyone is done so that the temp file can be deleted - if tmp_file_path is None: - return pileup_list - else: - cls.sync_processes(pref=TMP_FILE_SYNC) - if cls.is_master: - os.remove(cls.view_key(FINAL_DATA)) - return pileup_list - - -def ownCheck(kvs, rank, chunk=1, kvs_key='count'): - """ - Generator - :param kvs: KVSClient - KVS object for server access - :param chunk: 
int - The size of the chunk given to each subprocess - :param kvs_key: str - The KVS key to increment (default is 'count') - :yield: bool - True if this process has dibs on whatever. False if some other process has claimed it first. - """ - if rank == 0: - kvs.put(kvs_key, 0) - - # Start at the baseline - checks, lower, upper = 0, -1, -1 - - while True: - - # Checks increments every loop - # If it's greater than the upper bound, get a new lower bound from the KVS count - # Set the new upper bound by adding chunk to lower - # And then put the new upper bound back into KVS key - - if checks >= upper: - lower = kvs.get(kvs_key) - upper = lower + chunk - kvs.put(kvs_key, upper) - - # Yield TRUE if this row belongs to this process and FALSE if it doesn't - yield lower <= checks < upper - checks += 1 diff --git a/inferelator/distributed/local_controller.py b/inferelator/distributed/local_controller.py index 321c6689..2032a555 100644 --- a/inferelator/distributed/local_controller.py +++ b/inferelator/distributed/local_controller.py @@ -2,7 +2,7 @@ LocalController just runs everything in a single process """ -import collections +import collections.abc from inferelator.distributed import AbstractController from inferelator import utils @@ -14,17 +14,12 @@ class LocalController(AbstractController): _controller_name = "local" client = None - is_master = True chunk = None @classmethod def connect(cls, *args, **kwargs): return True - @classmethod - def sync_processes(cls, *args, **kwargs): - return True - @classmethod def map(cls, func, *arg, **kwargs): """ @@ -36,7 +31,7 @@ def map(cls, func, *arg, **kwargs): Iterator(s) """ assert check.argument_callable(func) - assert check.argument_list_type(arg, collections.Iterable) + assert check.argument_list_type(arg, collections.abc.Iterable) return list(map(func, *arg)) @classmethod diff --git a/inferelator/distributed/multiprocessing_controller.py b/inferelator/distributed/multiprocessing_controller.py index 21bef0c6..9302a822 100644 --- a/inferelator/distributed/multiprocessing_controller.py +++ b/inferelator/distributed/multiprocessing_controller.py @@ -4,7 +4,7 @@ """ import pathos -import collections +import collections.abc from inferelator.distributed import AbstractController from inferelator.utils import Validator as check @@ -13,7 +13,6 @@ class MultiprocessingController(AbstractController): _controller_name = "multiprocessing" client = None - is_master = True # Control variables chunk = 25 @@ -26,10 +25,6 @@ def connect(cls, *args, **kwargs): cls.client = pathos.multiprocessing.ProcessPool(nodes=cls.processes, **kwargs) return True - @classmethod - def sync_processes(cls, *args, **kwargs): - return True - @classmethod def set_processes(cls, process_count): """ @@ -52,7 +47,7 @@ def map(cls, func, *args, **kwargs): Iterator(s) """ assert check.argument_callable(func) - assert check.argument_list_type(args, collections.Iterable) + assert check.argument_list_type(args, collections.abc.Iterable) return cls.client.map(func, *args, chunksize=cls.chunk) @classmethod diff --git a/inferelator/postprocessing/f1_score.py b/inferelator/postprocessing/f1_score.py index 930cc0f2..3a2bb883 100644 --- a/inferelator/postprocessing/f1_score.py +++ b/inferelator/postprocessing/f1_score.py @@ -4,6 +4,12 @@ from inferelator.postprocessing import (TARGET_COLUMN, REGULATOR_COLUMN, CONFIDENCE_COLUMN, F1_COLUMN, PRECISION_COLUMN, RECALL_COLUMN) +import matplotlib + +# If matplotlib is being an idiot and trying to set a tkinter backend, switch to agg +if 
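With is_master and sync_processes() gone, every surviving engine exposes the same minimal contract: set_multiprocess_engine(), connect(), map(), and shutdown(). A minimal sketch of driving that surface, following the calls exercised in inferelator/tests/test_mpcontrol.py (the engine name and process count here are illustrative, not prescribed):

    from inferelator.distributed.inferelator_mp import MPControl

    def square(x):
        return x * x

    # Choose an engine before connecting; "multiprocessing" uses a pathos pool
    MPControl.set_multiprocess_engine("multiprocessing", processes=4)
    MPControl.connect()

    # map() is now the entire coordination API -- no master checks, no barriers
    print(MPControl.map(square, range(10)))

    MPControl.shutdown()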
diff --git a/inferelator/postprocessing/f1_score.py b/inferelator/postprocessing/f1_score.py
index 930cc0f2..3a2bb883 100644
--- a/inferelator/postprocessing/f1_score.py
+++ b/inferelator/postprocessing/f1_score.py
@@ -4,6 +4,12 @@
 from inferelator.postprocessing import (TARGET_COLUMN, REGULATOR_COLUMN, CONFIDENCE_COLUMN, F1_COLUMN,
                                         PRECISION_COLUMN, RECALL_COLUMN)
 
+import matplotlib
+
+# If matplotlib is trying to set an interactive (tkinter) backend, switch to agg
+if matplotlib.get_backend() in (i for i in matplotlib.rcsetup.interactive_bk):
+    matplotlib.use('agg')
+
 import matplotlib.pyplot as plt
 
diff --git a/inferelator/postprocessing/inferelator_results.py b/inferelator/postprocessing/inferelator_results.py
index 0382a182..584f5972 100644
--- a/inferelator/postprocessing/inferelator_results.py
+++ b/inferelator/postprocessing/inferelator_results.py
@@ -1,5 +1,6 @@
 import pandas as pd
 import os
+import matplotlib.pyplot as plt
 
 from inferelator.utils import Validator as check
 
 _SERIALIZE_ATTRS = ["network", "betas", "betas_stack", "betas_sign", "combined_confidences", "tasks"]
@@ -114,12 +115,14 @@ def write_result_files(self, output_dir):
 
         # Write TSV files
         self.write_to_tsv(self.network, output_dir, self.network_file_name, index=False)
-        self.write_to_tsv(self.combined_confidences, output_dir, self.confidence_file_name)
-        self.write_to_tsv(self.betas_stack, output_dir, self.threshold_file_name, index=False)
+        self.write_to_tsv(self.combined_confidences, output_dir, self.confidence_file_name, index=True)
+        self.write_to_tsv(self.betas_stack, output_dir, self.threshold_file_name, index=True)
         self.write_to_tsv(self.curve, output_dir, self.curve_data_file_name, index=False)
 
         # Write Metric Curve PDF
-        self.metric.output_curve_pdf(output_dir, self.curve_file_name) if self.curve_file_name is not None else None
+        if self.curve_file_name is not None:
+            fig, ax = self.metric.output_curve_pdf(output_dir, self.curve_file_name)
+            plt.close(fig)
 
     def clear_output_file_names(self):
         """
diff --git a/inferelator/postprocessing/matthews_correlation.py b/inferelator/postprocessing/matthews_correlation.py
index 1dfd9772..296e5d5c 100644
--- a/inferelator/postprocessing/matthews_correlation.py
+++ b/inferelator/postprocessing/matthews_correlation.py
@@ -1,3 +1,4 @@
+from math import isfinite
 import numpy as np
 import warnings
 
@@ -5,6 +6,12 @@
 from inferelator.postprocessing import (TARGET_COLUMN, REGULATOR_COLUMN, CONFIDENCE_COLUMN, GOLD_STANDARD_COLUMN,
                                         MCC_COLUMN, TP, FP, TN, FN)
 
+import matplotlib
+
+# If matplotlib is trying to set an interactive (tkinter) backend, switch to agg
+if matplotlib.get_backend() in (i for i in matplotlib.rcsetup.interactive_bk):
+    matplotlib.use('agg')
+
 import matplotlib.pyplot as plt
 
 
@@ -72,11 +79,14 @@ def plot_mcc_conf(mcc, conf, optmcc, optconf, ax, num_edges=None):
 
         num_edges = np.sum(conf >= optconf) if num_edges is None else num_edges
 
+        y_min = np.nanmin(mcc)
+        y_min = 0 if not (0 > y_min) else y_min
+
         # Generate a plot
         ax.plot(conf, mcc)
         ax.set_xlabel('Confidence')
         ax.set_xlim(1, 0)
-        ax.set_ylim(0, 1)
+        ax.set_ylim(y_min, 1)
         ax.set_ylabel('MCC')
 
         ax.vlines(float(optconf), 0, 1, transform=ax.get_xaxis_transform(), colors='r', linestyles='dashed')
@@ -90,9 +100,7 @@ def plot_mcc_conf(mcc, conf, optmcc, optconf, ax, num_edges=None):
 
     @staticmethod
     def calculate_opt_mcc(data):
-        with warnings.catch_warnings():
-            warnings.simplefilter("ignore", category=RuntimeWarning)
-            return np.nanmax(data[MCC_COLUMN])
+        return data[MCC_COLUMN].iloc[np.argmax(np.abs(data[MCC_COLUMN]))]
 
     @staticmethod
     def calculate_opt_conf_mcc(data):
@@ -113,4 +121,9 @@ def calculate_mcc(data):
 
     @staticmethod
     def confusion_to_mcc(tp, tn, fp, fn):
-        return (tp * tn - fp * fn) / (np.sqrt(tp + fp) * np.sqrt(tp + fn) * np.sqrt(tn + fp) * np.sqrt(tn + fn))
+        denominator = np.sqrt(tp + fp) * np.sqrt(tp + fn) * np.sqrt(tn + fp) * np.sqrt(tn + fn)
+
+        # If any denominator value is 0, MCC is 0/0 and by convention will be set to 0.0
+        denominator[denominator == 0] = 1.0
+
+        return (tp * tn - fp * fn) / denominator
diff --git a/inferelator/postprocessing/model_metrics.py b/inferelator/postprocessing/model_metrics.py
index ec49f50e..0d5ffb56 100644
--- a/inferelator/postprocessing/model_metrics.py
+++ b/inferelator/postprocessing/model_metrics.py
@@ -8,6 +8,12 @@
 from inferelator.utils import is_string
 
 import os
+import matplotlib
+
+# If matplotlib is trying to set an interactive (tkinter) backend, switch to agg
+if matplotlib.get_backend() in (i for i in matplotlib.rcsetup.interactive_bk):
+    matplotlib.use('agg')
+
 import matplotlib.pyplot as plt
 
diff --git a/inferelator/postprocessing/precision_recall.py b/inferelator/postprocessing/precision_recall.py
index b6c3c506..67c4609d 100644
--- a/inferelator/postprocessing/precision_recall.py
+++ b/inferelator/postprocessing/precision_recall.py
@@ -8,6 +8,12 @@
 from inferelator.postprocessing import (TARGET_COLUMN, REGULATOR_COLUMN, PRECISION_COLUMN, RECALL_COLUMN,
                                         CONFIDENCE_COLUMN, GOLD_STANDARD_COLUMN)
 
+import matplotlib
+
+# If matplotlib is trying to set an interactive (tkinter) backend, switch to agg
+if matplotlib.get_backend() in (i for i in matplotlib.rcsetup.interactive_bk):
+    matplotlib.use('agg')
+
 import matplotlib.pyplot as plt
 
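The new confusion_to_mcc makes the 0/0 convention explicit: whenever a marginal of the confusion matrix is empty, the denominator is replaced with 1 so the score comes out 0.0 instead of NaN. A standalone re-implementation for illustration (not an import from the package), checked against two boundary cases:

    import numpy as np

    def confusion_to_mcc(tp, tn, fp, fn):
        denominator = np.sqrt(tp + fp) * np.sqrt(tp + fn) * np.sqrt(tn + fp) * np.sqrt(tn + fn)
        # 0/0 convention: an empty confusion-matrix marginal yields MCC = 0.0
        denominator[denominator == 0] = 1.0
        return (tp * tn - fp * fn) / denominator

    tp, tn, fp, fn = np.array([2.]), np.array([2.]), np.array([0.]), np.array([0.])
    print(confusion_to_mcc(tp, tn, fp, fn))  # [1.] -- perfect prediction

    tp, tn, fp, fn = np.array([0.]), np.array([2.]), np.array([0.]), np.array([2.])
    print(confusion_to_mcc(tp, tn, fp, fn))  # [0.] -- no positive calls, 0/0 -> 0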
diff --git a/inferelator/regression/amusr_regression.py b/inferelator/regression/amusr_regression.py
index 0ce64d0f..6104204d 100644
--- a/inferelator/regression/amusr_regression.py
+++ b/inferelator/regression/amusr_regression.py
@@ -1,6 +1,7 @@
 import numpy as np
 import pandas as pd
-from copy import deepcopy
+import os
+
 from scipy.special import comb
 from sklearn.preprocessing import StandardScaler
 from sklearn.linear_model import LinearRegression
@@ -11,12 +12,6 @@
 from inferelator import default
 from inferelator.regression import base_regression
 
-# Shadow built-in zip with itertools.izip if this is python2 (This puts out a memory dumpster fire)
-try:
-    from itertools import izip as zip
-except ImportError:
-    pass
-
 DEFAULT_prior_weight = 1.0
 DEFAULT_Cs = np.logspace(np.log10(0.01), np.log10(10), 20)[::-1]
 
@@ -28,7 +23,8 @@
 
 def run_regression_EBIC(X, Y, TFs, tasks, gene, prior, Cs=None, Ss=None, lambda_Bs=None,
-                        lambda_Ss=None, scale_data=False, return_lambdas=False, tol=TOL, rel_tol=REL_TOL):
+                        lambda_Ss=None, scale_data=False, return_lambdas=False, tol=TOL, rel_tol=REL_TOL,
+                        use_numba=False):
     """
     Run multitask regression. Search the regularization coefficient space and select the model with the lowest eBIC.
@@ -45,9 +41,17 @@
         The gene being modeled
     :param prior: np.ndarray [K x T]
         The priors for this gene in a TF x Task array
+    :param scale_data: bool, standardize the X and Y data if True
+    :param return_lambdas: bool, also return the selected lambda penalty values if True
+    :param tol: float, absolute convergence tolerance
+    :param rel_tol: float, relative convergence tolerance
+    :param use_numba: bool, use the numba-accelerated update functions if True
 
     :return: dict
     """
 
+    if use_numba:
+        AMuSR_math.set_numba()
+
     assert len(X) == len(Y)
     assert len(X) == len(tasks)
     assert prior.ndim == 2 if prior is not None else True
@@ -142,11 +146,13 @@ class AMuSR_regression(base_regression.BaseRegression):
 
     tol = None
     rel_tol = None
+
+    use_numba = False
 
     regression_function = staticmethod(run_regression_EBIC)
 
     def __init__(self, X, Y, tfs=None, genes=None, priors=None, prior_weight=1, remove_autoregulation=True,
-                 lambda_Bs=None, lambda_Ss=None, Cs=None, Ss=None, tol=TOL, rel_tol=REL_TOL):
+                 lambda_Bs=None, lambda_Ss=None, Cs=None, Ss=None, tol=TOL, rel_tol=REL_TOL, use_numba=False):
         """
         Set up a regression object for multitask regression
         :param X: list(InferelatorData)
@@ -158,6 +164,7 @@ def __init__(self, X, Y, tfs=None, genes=None, priors=None, prior_weight=1, remove_autoregulation=True,
         :param lambda_Ss: list(float) [None]
         :param Cs: list(float) [None]
         :param Ss: list(float) [None]
+        :param use_numba: bool [False]
         """
 
         # Check input types are correct
@@ -207,6 +214,8 @@ def __init__(self, X, Y, tfs=None, genes=None, priors=None, prior_weight=1, remove_autoregulation=True,
         self.tol = tol
         self.rel_tol = rel_tol
 
+        self.use_numba = use_numba
+
     def regress(self, regression_function=None):
         """
         Execute multitask (AMUSR)
@@ -221,7 +230,7 @@ def regress(self, regression_function=None):
             return amusr_regress_dask(self.X, self.Y, self.priors, self.prior_weight, self.n_tasks, self.genes,
                                       self.tfs, self.G, remove_autoregulation=self.remove_autoregulation,
                                       regression_function=regression_function,
-                                      tol=self.tol, rel_tol=self.rel_tol)
+                                      tol=self.tol, rel_tol=self.rel_tol, use_numba=self.use_numba)
 
         def regression_maker(j):
             level = 0 if j % 100 == 0 else 2
@@ -245,7 +254,7 @@ def regression_maker(j):
             prior = format_prior(self.priors, gene, tasks, self.prior_weight, tfs=tfs)
             return regression_function(x, y, tfs, tasks, gene, prior, Cs=self.Cs, Ss=self.Ss,
                                        lambda_Bs=self.lambda_Bs, lambda_Ss=self.lambda_Ss,
-                                       tol=self.tol, rel_tol=self.rel_tol)
+                                       tol=self.tol, rel_tol=self.rel_tol, use_numba=self.use_numba)
 
         return MPControl.map(regression_maker, range(self.G))
 
@@ -336,6 +345,7 @@ def amusr_fit(cov_C, cov_D, lambda_B=0., lambda_S=0., sparse_matrix=None, block_matrix=None, prior=None,
 
     # Initialize weights
     combined_weights = sparse_matrix + block_matrix
+
     iter_tols = np.zeros(max_iter)
 
     for i in range(max_iter):
@@ -343,8 +353,11 @@ def amusr_fit(cov_C, cov_D, lambda_B=0., lambda_S=0., sparse_matrix=None, block_matrix=None, prior=None,
         _combined_weights_old = combined_weights
 
         # Update sparse and block-sparse coefficients
-        sparse_matrix = updateS(cov_C, cov_D, block_matrix, sparse_matrix, lambda_S, prior, n_tasks, n_features)
-        block_matrix = updateB(cov_C, cov_D, block_matrix, sparse_matrix, lambda_B, prior, n_tasks, n_features)
+        sparse_matrix = AMuSR_math.updateS(cov_C, cov_D, block_matrix, sparse_matrix,
+                                           lambda_S, prior, n_tasks, n_features)
+
+        block_matrix = AMuSR_math.updateB(cov_C, cov_D, block_matrix, sparse_matrix,
+                                          lambda_B, prior, n_tasks, n_features)
 
         # Weights matrix (W) is the sum of a sparse (S) and a block-sparse (B) matrix
         combined_weights = sparse_matrix + block_matrix
@@ -368,111 +381,6 @@ def amusr_fit(cov_C, cov_D, lambda_B=0., lambda_S=0., sparse_matrix=None, block_matrix=None, prior=None,
 
     return combined_weights, sparse_matrix, block_matrix
 
 
-def updateS(C, D, B, S, lamS, prior, n_tasks, n_features):
-    """
-    returns updated coefficients for S (predictors x tasks)
-    lasso regularized -- using cyclical coordinate descent and
-    soft-thresholding
-    """
-    # update each task independently (shared penalty only)
-    for k in range(n_tasks):
-
-        c = C[k]
-        d = D[k]
-
-        b = B[:, k]
-        s = S[:, k]
-        p = prior[:, k] * lamS
-
-        # cycle through predictors
-
-        for j in range(n_features):
-            # set sparse coefficient for predictor j to zero
-            s[j] = 0.
-
-            # calculate next coefficient based on fit only
-            if d[j,j] == 0:
-                alpha = 0.
-            else:
-                alpha = (c[j] - np.sum((b + s) * d[j])) / d[j,j]
-
-            # lasso regularization
-            if abs(alpha) <= p[j]:
-                s[j] = 0.
-            else:
-                s[j] = alpha - (np.sign(alpha) * p[j])
-
-        # update current task
-        S[:, k] = s
-
-    return S
-
-
-def updateB(C, D, B, S, lamB, prior, n_tasks, n_features):
-    """
-    returns updated coefficients for B (predictors x tasks)
-    block regularized (l_1/l_inf) -- using cyclical coordinate descent and
-    soft-thresholding on the l_1 norm across tasks
-    reference: Liu et al, ICML 2009. Blockwise coordinate descent procedures
-    for the multi-task lasso, with applications to neural semantic basis discovery.
-    """
-
-    S = np.asarray(S, order="F")
-
-    # cycles through predictors
-    for j in range(n_features):
-
-        # initialize next coefficients
-        alphas = np.zeros(n_tasks)
-
-        # update tasks for each predictor together
-        d = D[:, :, j]
-
-        for k in range(n_tasks):
-
-            d_kjj = d[k, j]
-
-            if d_kjj == 0:
-
-                alphas[k] = 0
-
-            else:
-
-                # get previous block-sparse
-                # copies because B is C-ordered
-                b = B[:, k]
-
-                # set block-sparse coefficient for feature j to zero
-                b[j] = 0.
-
-                # calculate next coefficient based on fit only
-                alphas[k] = (C[k, j] - np.sum((b + S[:, k]) * d[k, :])) / d_kjj
-
-        # set all tasks to zero if l1-norm less than lamB
-        if np.linalg.norm(alphas, 1) <= lamB:
-            B[j,:] = np.zeros(n_tasks)
-
-        # regularized update for predictors with larger l1-norm
-        else:
-            # find number of coefficients that would make l1-norm greater than penalty
-            indices = np.abs(alphas).argsort()[::-1]
-            sorted_alphas = alphas[indices]
-            m_star = np.argmax((np.abs(sorted_alphas).cumsum() - lamB) / (np.arange(n_tasks) + 1))
-            # initialize new weights
-            new_weights = np.zeros(n_tasks)
-            # keep small coefficients and regularize large ones (in above group)
-            for k in range(n_tasks):
-                idx = indices[k]
-                if k > m_star:
-                    new_weights[idx] = sorted_alphas[k]
-                else:
-                    sign = np.sign(sorted_alphas[k])
-                    update_term = np.sum(np.abs(sorted_alphas)[:m_star + 1]) - lamB
-                    new_weights[idx] = (sign / (m_star + 1)) * update_term
-            # update current predictor
-            B[j,:] = new_weights
-
-    return B
 
 
 def _covariance_by_task(X, Y):
     """
@@ -773,9 +681,9 @@ class AMUSRRegressionWorkflowMixin(base_regression._MultitaskRegressionWorkflowMixin):
 
     tol = TOL
     relative_tol = REL_TOL
-
-    def set_regression_parameters(self, prior_weight=None, lambda_Bs=None, lambda_Ss=None, heuristic_Cs=None,
-                                  tol=None, relative_tol=None):
+
+    def set_regression_parameters(self, prior_weight=None, lambda_Bs=None, lambda_Ss=None, heuristic_Cs=None,
+                                  tol=None, relative_tol=None, use_numba=None):
         """
         Set regression parameters for AmUSR.
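Both the deleted module-level updateS and its AMuSR_math replacement later in this diff implement the same coordinate-descent step: zero out coefficient j, solve for the unregularized value alpha, then soft-threshold it against the prior-weighted penalty. The core operator, as a standalone numpy sketch of the branch inside updateS:

    import numpy as np

    def soft_threshold(alpha, penalty):
        # Lasso soft-thresholding: returns 0 when |alpha| <= penalty,
        # otherwise shrinks alpha toward zero by penalty (mirrors updateS)
        if abs(alpha) <= penalty:
            return 0.0
        return alpha - np.sign(alpha) * penalty

    print(soft_threshold(0.8, 0.5))   # ~0.3 -- shrunk toward zero
    print(soft_threshold(-0.3, 0.5))  #  0.0 -- inside the penalty band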
@@ -818,10 +726,11 @@ def run_bootstrap(self, bootstrap_idx): x.append(self._task_design[k].get_bootstrap(self._task_bootstraps[k][bootstrap_idx])) y.append(self._task_response[k].get_bootstrap(self._task_bootstraps[k][bootstrap_idx])) - MPControl.sync_processes(pref="amusr_pre") regress = AMuSR_regression(x, y, tfs=self._regulators, genes=self._targets, priors=self._task_priors, prior_weight=self.prior_weight, lambda_Bs=self.lambda_Bs, lambda_Ss=self.lambda_Ss, - Cs=self.heuristic_Cs, tol=self.tol, rel_tol=self.relative_tol) + Cs=self.heuristic_Cs, tol=self.tol, rel_tol=self.relative_tol, + use_numba=self.use_numba) + return regress.run() @@ -852,3 +761,139 @@ def filter_genes_on_tasks(list_of_indexes, task_expression_filter): raise ValueError("{v} is not an allowed task_expression_filter value".format(v=task_expression_filter)) return filtered_genes + +class AMuSR_math: + + _numba = False + + @staticmethod + def updateS(C, D, B, S, lamS, prior, n_tasks, n_features): + """ + returns updated coefficients for S (predictors x tasks) + lasso regularized -- using cyclical coordinate descent and + soft-thresholding + """ + # update each task independently (shared penalty only) + for k in range(n_tasks): + + c = C[k] + d = D[k] + + b = B[:, k] + s = S[:, k] + p = prior[:, k] * lamS + + # cycle through predictors + + for j in range(n_features): + # set sparse coefficient for predictor j to zero + s[j] = 0. + + # calculate next coefficient based on fit only + if d[j,j] == 0: + alpha = 0. + else: + alpha = (c[j]- np.sum((b + s) * d[j])) / d[j,j] + + # lasso regularization + if abs(alpha) <= p[j]: + s[j] = 0. + else: + s[j] = alpha - (np.sign(alpha) * p[j]) + + # update current task + S[:, k] = s + + return S + + @staticmethod + def updateB(C, D, B, S, lamB, prior, n_tasks, n_features): + """ + returns updated coefficients for B (predictors x tasks) + block regularized (l_1/l_inf) -- using cyclical coordinate descent and + soft-thresholding on the l_1 norm across tasks + reference: Liu et al, ICML 2009. Blockwise coordinate descent procedures + for the multi-task lasso, with applications to neural semantic basis discovery. + """ + + + # cycles through predictors + for j in range(n_features): + + # initialize next coefficients + alphas = np.zeros(n_tasks) + + # update tasks for each predictor together + d = D[:, :, j] + + for k in range(n_tasks): + + d_kjj = d[k, j] + + if d_kjj == 0: + + alphas[k] = 0 + + else: + + # get previous block-sparse + # copies because B is C-ordered + b = B[:, k] + + # set block-sparse coefficient for feature j to zero + b[j] = 0. 
+ + # calculate next coefficient based on fit only + alphas[k] = (C[k, j] - np.sum((b + S[:, k]) * d[k, :])) / d_kjj + + + # set all tasks to zero if l1-norm less than lamB + if np.linalg.norm(alphas, 1) <= lamB: + B[j,:] = np.zeros(n_tasks) + + # regularized update for predictors with larger l1-norm + else: + # find number of coefficients that would make l1-norm greater than penalty + indices = np.abs(alphas).argsort()[::-1] + sorted_alphas = alphas[indices] + m_star = np.argmax((np.abs(sorted_alphas).cumsum()-lamB)/(np.arange(n_tasks)+1)) + # initialize new weights + new_weights = np.zeros(n_tasks) + # keep small coefficients and regularize large ones (in above group) + for k in range(n_tasks): + idx = indices[k] + if k > m_star: + new_weights[idx] = sorted_alphas[k] + else: + sign = np.sign(sorted_alphas[k]) + update_term = np.sum(np.abs(sorted_alphas)[:m_star+1])-lamB + new_weights[idx] = (sign/(m_star+1))*update_term + # update current predictor + B[j,:] = new_weights + + return B + + @classmethod + def set_numba(cls): + + # If this has already been called, skip + if cls._numba: + return + + else: + + # If we can't import numba, skip (and set a flag so we don't try again) + try: + import numba + + except ImportError: + utils.Debug.vprint("Unable to import numba; using python-native functions instead", level=0) + cls._numba = True + return + + utils.Debug.vprint("Using numba functions for AMuSR regression", level=0) + + # Replace the existing functions with JIT functions + cls.updateB = numba.jit(cls.updateB, nopython=True) + cls.updateS = numba.jit(cls.updateS, nopython=True) + cls._numba = True diff --git a/inferelator/regression/base_regression.py b/inferelator/regression/base_regression.py index 52558fb5..27def657 100644 --- a/inferelator/regression/base_regression.py +++ b/inferelator/regression/base_regression.py @@ -55,15 +55,7 @@ def run(self): Returns None, None if it's a subordinate thread """ - run_data = self.regress() - - if MPControl.is_master: - pileup_data = self.pileup_data(run_data) - else: - pileup_data = None, None - - MPControl.sync_processes("post_pileup") - return pileup_data + return self.pileup_data(self.regress()) def regress(self): """ @@ -134,17 +126,13 @@ def run_regression(self): betas = [] rescaled_betas = [] - MPControl.sync_processes("pre_regression") - for idx, bootstrap in enumerate(self.get_bootstraps()): Debug.vprint('Bootstrap {} of {}'.format((idx + 1), self.num_bootstraps), level=0) np.random.seed(self.random_seed + idx) current_betas, current_rescaled_betas = self.run_bootstrap(bootstrap) - if self.is_master(): - betas.append(current_betas) - rescaled_betas.append(current_rescaled_betas) - MPControl.sync_processes("post_bootstrap") + betas.append(current_betas) + rescaled_betas.append(current_rescaled_betas) return betas, rescaled_betas @@ -167,10 +155,9 @@ def run_regression(self): Debug.vprint('Bootstrap {} of {}'.format((idx + 1), self.num_bootstraps), level=0) current_betas, current_rescaled_betas = self.run_bootstrap(idx) - if self.is_master(): - for k in range(self._n_tasks): - betas[k].append(current_betas[k]) - rescaled_betas[k].append(current_rescaled_betas[k]) + for k in range(self._n_tasks): + betas[k].append(current_betas[k]) + rescaled_betas[k].append(current_rescaled_betas[k]) return betas, rescaled_betas diff --git a/inferelator/regression/bbsr_multitask.py b/inferelator/regression/bbsr_multitask.py index b9389c43..6e67d42d 100644 --- a/inferelator/regression/bbsr_multitask.py +++ b/inferelator/regression/bbsr_multitask.py @@ 
-28,8 +28,6 @@ def run_bootstrap(self, bootstrap_idx): # Create a mock prior with no information if clr_only is set priors_data = pd.DataFrame(0, index=priors_data.index, columns=priors_data.columns) - MPControl.sync_processes(pref="bbsr_pre") - Debug.vprint('Calculating MI, Background MI, and CLR Matrix', level=0) clr_matrix, _ = self.mi_driver().run(Y, X, return_mi=False) diff --git a/inferelator/regression/mi.py b/inferelator/regression/mi.py index 4ff8e82b..af61e7ba 100644 --- a/inferelator/regression/mi.py +++ b/inferelator/regression/mi.py @@ -17,9 +17,6 @@ # Log type for MI calculations. np.log2 gives results in bits; np.log gives results in nats DEFAULT_LOG_TYPE = np.log -# KVS keys for multiprocessing -SYNC_CLR_KEY = 'post_clr' - class MIDriver: @@ -71,8 +68,6 @@ def context_likelihood_mi(x, y, bins=DEFAULT_NUM_BINS, logtype=DEFAULT_LOG_TYPE, # Calculate CLR clr = calc_mixed_clr(mi, mi_bg) - MPControl.sync_processes(pref=SYNC_CLR_KEY) - mi = pd.DataFrame(mi, index=mi_r, columns=mi_c) clr = pd.DataFrame(clr, index=mi_r, columns=mi_c) diff --git a/inferelator/regression/sklearn_regression.py b/inferelator/regression/sklearn_regression.py index 47586710..92f81e6e 100644 --- a/inferelator/regression/sklearn_regression.py +++ b/inferelator/regression/sklearn_regression.py @@ -136,7 +136,7 @@ def run_bootstrap(self, bootstrap): x = self.design.get_bootstrap(bootstrap) y = self.response.get_bootstrap(bootstrap) utils.Debug.vprint('Calculating betas using SKLearn model {m}'.format(m=self._sklearn_model.__name__), level=0) - MPControl.sync_processes(pref="sk_pre") + return SKLearnRegression(x, y, self._sklearn_model, @@ -157,8 +157,6 @@ def run_bootstrap(self, bootstrap_idx): x = self._task_design[k].get_bootstrap(self._task_bootstraps[k][bootstrap_idx]) y = self._task_response[k].get_bootstrap(self._task_bootstraps[k][bootstrap_idx]) - MPControl.sync_processes(pref="sk_pre") - utils.Debug.vprint('Calculating task {k} using {n}'.format(k=k, n=self._sklearn_model.__name__), level=0) t_beta, t_br = SKLearnRegression(x, y, diff --git a/inferelator/regression/stability_selection.py b/inferelator/regression/stability_selection.py index b924a24c..ebb3fddf 100644 --- a/inferelator/regression/stability_selection.py +++ b/inferelator/regression/stability_selection.py @@ -229,15 +229,12 @@ def set_regression_parameters(self, alphas=None, num_subsamples=None, method=Non self.regress_method = method if method is not None else self.regress_method def run_regression(self): - MPControl.sync_processes("pre_stars") betas, resc_betas = StARS(self.design, self.response, self.random_seed, alphas=self.alphas, method=self.regress_method, num_subsamples=self.num_subsamples, parameters=self.sklearn_params).run() - MPControl.sync_processes("post_stars") - return [betas], [resc_betas] @@ -257,8 +254,6 @@ def run_bootstrap(self, bootstrap_idx): x = self._task_design[k].get_bootstrap(self._task_bootstraps[k][bootstrap_idx]) y = self._task_response[k].get_bootstrap(self._task_bootstraps[k][bootstrap_idx]) - MPControl.sync_processes(pref="sk_pre") - utils.Debug.vprint('Calculating task {k} betas using StARS'.format(k=k), level=0) t_beta, t_br = StARS(x, y, self.random_seed, alphas=self.alphas, method=self.regress_method, diff --git a/inferelator/tests/test_amusr.py b/inferelator/tests/test_amusr.py index b4795759..7fc58648 100644 --- a/inferelator/tests/test_amusr.py +++ b/inferelator/tests/test_amusr.py @@ -43,6 +43,11 @@ def test_sum_squared_errors(self): self.assertEqual(amusr_regression.sum_squared_errors(X, Y, W, 
0), 0) self.assertEqual(amusr_regression.sum_squared_errors(X, Y, W, 1), 27) + +class TestAMuSRRegresionEBIC: + + use_numba = False + def test_amusr_regression(self): des = [np.array([[1, 1, 3], [0, 0, 2], [0, 0, 1]]).astype(float), np.array([[1, 1, 3], [0, 0, 2], [0, 0, 1]]).astype(float)] @@ -58,9 +63,9 @@ def test_amusr_regression(self): gene1_prior = amusr_regression.format_prior(priors, 'gene1', [0, 1], 1.) gene2_prior = amusr_regression.format_prior(priors, 'gene2', [0, 1], 1.) output = [amusr_regression.run_regression_EBIC(des, res, ['tf1', 'tf2', 'tf3'], [0, 1], 'gene1', gene1_prior, - scale_data=True), + scale_data=True, use_numba=self.use_numba), amusr_regression.run_regression_EBIC(des, res, ['tf1', 'tf2', 'tf3'], [0, 1], 'gene2', gene2_prior, - scale_data=True)] + scale_data=True, use_numba=self.use_numba)] out0 = pd.DataFrame([['tf3', 'gene1', -1, 1], ['tf3', 'gene1', -1, 1]], @@ -90,7 +95,7 @@ def test_unaligned_regression_genes(self): InferelatorData(pd.DataFrame(np.array([[1, 1], [2, 2], [3, 3]]).astype(float), columns=targets2))] priors = pd.DataFrame([[0, 1, 1], [1, 0, 1], [1, 0, 1]], index=targets, columns=tfs) - r = amusr_regression.AMuSR_regression(des, res, tfs=tfs, genes=targets, priors=priors) + r = amusr_regression.AMuSR_regression(des, res, tfs=tfs, genes=targets, priors=priors, use_numba=self.use_numba) out = [pd.DataFrame([['tf3', 'gene1', -1, 1], ['tf3', 'gene1', -1, 1]], index=pd.MultiIndex(levels=[[0, 1], [0]], codes=[[0, 1], [0, 0]]), @@ -108,6 +113,10 @@ def test_unaligned_regression_genes(self): weights, resc_weights = r.pileup_data(regress_data) +class TestAMuSRREgressionEBICNumba(TestAMuSRRegresionEBIC): + + use_numba = True + class TestAMuSRParams(unittest.TestCase): @@ -140,7 +149,7 @@ def test_lamb_b(self): priors=self.workflow.priors_data, lambda_Bs=lamb_b) def is_passed(X, Y, TFs, tasks, gene, prior, Cs=None, Ss=None, lambda_Bs=None, - lambda_Ss=None, scale_data=False, tol=None, rel_tol=None): + lambda_Ss=None, scale_data=False, **kwargs): npt.assert_array_equal(lambda_Bs, lamb_b) @@ -162,7 +171,7 @@ def test_lamb_s(self): priors=self.workflow.priors_data, lambda_Ss=lamb_s) def is_passed(X, Y, TFs, tasks, gene, prior, Cs=None, Ss=None, lambda_Bs=None, - lambda_Ss=None, scale_data=False, tol=None, rel_tol=None): + lambda_Ss=None, scale_data=False, **kwargs): npt.assert_array_equal(lambda_Ss, lamb_s) @@ -184,7 +193,7 @@ def test_heuristic_c(self): priors=self.workflow.priors_data, Cs=set_Cs) def is_passed(X, Y, TFs, tasks, gene, prior, Cs=None, Ss=None, lambda_Bs=None, - lambda_Ss=None, scale_data=False, tol=None, rel_tol=None): + lambda_Ss=None, scale_data=False, **kwargs): npt.assert_array_equal(set_Cs, Cs) diff --git a/inferelator/tests/test_base_regression.py b/inferelator/tests/test_base_regression.py index 85c63fc0..198e8d08 100644 --- a/inferelator/tests/test_base_regression.py +++ b/inferelator/tests/test_base_regression.py @@ -18,7 +18,7 @@ def test_recalculate_betas_from_selected_matrix_rank(self): # test that the matrix rank(A) = min(n,m) # dim(v) - rank(A) = null(A) = 0 x = np.array([[2, 4, 6], [4, 8, 12]]) - y = np.array([1, 1, 1]) + y = np.array([1, 1]) result = base_regression.recalculate_betas_from_selected(x, y, idx=None) np.testing.assert_array_almost_equal(result, np.array([0.0, 0.0, 0.0]), 2) diff --git a/inferelator/tests/test_mpcontrol.py b/inferelator/tests/test_mpcontrol.py index f445d229..a758450e 100644 --- a/inferelator/tests/test_mpcontrol.py +++ b/inferelator/tests/test_mpcontrol.py @@ -6,14 +6,6 @@ from 
inferelator.distributed.inferelator_mp import MPControl # Run tests only when the associated packages are installed -try: - from kvsstcp import kvsstcp - from inferelator.distributed import kvs_controller - - TEST_KVS = True -except ImportError: - TEST_KVS = False - try: from dask import distributed from inferelator.distributed import dask_local_controller @@ -79,10 +71,6 @@ def test_map(self): with self.assertRaises(RuntimeError): MPControl.map(math_function, *self.map_test_data) - def test_sync(self): - with self.assertRaises(RuntimeError): - MPControl.sync_processes() - def test_name(self): with self.assertRaises(NameError): MPControl.name() @@ -110,59 +98,9 @@ def test_local_map(self): test_result = MPControl.map(math_function, *self.map_test_data) self.assertListEqual(test_result, self.map_test_expect) - def test_local_sync(self): - self.assertTrue(MPControl.sync_processes()) - def test_local_name(self): self.assertEqual(MPControl.name(), self.name) - -@unittest.skipIf(not TEST_KVS, "KVS not installed") -class TestKVSMPController(TestMPControl): - server = None - name = "kvs" - temp_dir = None - - @classmethod - @unittest.skipIf(not TEST_KVS, "KVS not installed") - def setUpClass(cls): - cls.temp_dir = tempfile.mkdtemp() - cls.server = kvsstcp.KVSServer("", 0) - MPControl.shutdown() - MPControl.set_multiprocess_engine(cls.name) - MPControl.connect(host=cls.server.cinfo[0], port=cls.server.cinfo[1]) - - @classmethod - @unittest.skipIf(not TEST_KVS, "KVS not installed") - def tearDownClass(cls): - super(TestKVSMPController, cls).tearDownClass() - if cls.server is not None: - cls.server.shutdown() - if cls.temp_dir is not None: - shutil.rmtree(cls.temp_dir) - - def test_kvs_connect(self): - self.assertTrue(MPControl.is_initialized) - - def test_kvs_map(self): - test_result = MPControl.map(math_function, *self.map_test_data) - self.assertListEqual(test_result, self.map_test_expect) - - def test_kvs_map_distribute(self): - test_result = MPControl.map(math_function, *self.map_test_data, tell_children=True) - self.assertListEqual(test_result, self.map_test_expect) - - def test_kvs_map_by_file(self): - test_result = MPControl.map(math_function, *self.map_test_data, tell_children=True, tmp_file_path=self.temp_dir) - self.assertListEqual(test_result, self.map_test_expect) - - def test_kvs_sync(self): - self.assertEqual(MPControl.sync_processes(), None) - - def test_kvs_name(self): - self.assertEqual(MPControl.name(), self.name) - - @unittest.skipIf(not TEST_PATHOS, "Pathos not installed") class TestMultiprocessingMPController(TestMPControl): name = "multiprocessing" @@ -187,9 +125,6 @@ def test_mp_map(self): test_result = MPControl.map(math_function, *self.map_test_data) self.assertListEqual(test_result, self.map_test_expect) - def test_mp_sync(self): - self.assertTrue(MPControl.sync_processes()) - @unittest.skipIf(not TEST_DASK_LOCAL, "Dask not installed") class TestDaskLocalMPController(TestMPControl): @@ -222,9 +157,6 @@ def test_dask_local_name(self): def test_dask_local_map(self): pass - def test_dask_local_sync(self): - self.assertTrue(MPControl.sync_processes()) - @unittest.skipIf(not TEST_DASK_CLUSTER, "Dask not installed") class TestDaskHPCMPController(TestMPControl): @@ -281,9 +213,6 @@ def test_dask_cluster_name(self): def test_dask_cluster_map(self): pass - def test_dask_cluster_sync(self): - self.assertTrue(MPControl.sync_processes()) - def test_memory_0_hack(self): old_command = "dask-worker tcp://scheduler:port --memory-limit=4e9 --nthreads 1 --nprocs 20" new_command = 
"dask-worker tcp://scheduler:port --memory-limit 0 --nthreads 1 --nprocs 20" diff --git a/inferelator/tests/test_results_processor.py b/inferelator/tests/test_results_processor.py index 24725189..a05e9026 100644 --- a/inferelator/tests/test_results_processor.py +++ b/inferelator/tests/test_results_processor.py @@ -377,7 +377,7 @@ def test_mcc_perfect_inverse_prediction(self): def test_mcc_bad_prediction(self): gs = pd.DataFrame(np.array([[0, 1], [0, 1]]), ['gene1', 'gene2'], ['tf1', 'tf2']) - confidences = pd.DataFrame(np.array([[1, 0], [0, 0.5]]), ['gene1', 'gene2'], ['tf1', 'tf2']) + confidences = pd.DataFrame(np.array([[0, 0], [0, 0]]), ['gene1', 'gene2'], ['tf1', 'tf2']) mcc = self.metric([confidences, confidences], gs) np.testing.assert_approx_equal(mcc.score()[1], 0) diff --git a/inferelator/tests/test_workflow_base.py b/inferelator/tests/test_workflow_base.py index 7441e035..88a4dab7 100644 --- a/inferelator/tests/test_workflow_base.py +++ b/inferelator/tests/test_workflow_base.py @@ -430,9 +430,6 @@ def test_get_bootstraps(self): self.assertEqual(len(bootstraps), 5) self.assertListEqual(bootstraps[0], bootstrap_0) - def test_is_master(self): - self.assertTrue(self.workflow.is_master()) - def test_make_output_dir(self): temp_dir = tempfile.mkdtemp() self.workflow.input_dir = temp_dir diff --git a/inferelator/tfa_workflow.py b/inferelator/tfa_workflow.py index 29cc2345..2f36414e 100644 --- a/inferelator/tfa_workflow.py +++ b/inferelator/tfa_workflow.py @@ -156,7 +156,7 @@ def compute_activity(self): self.half_tau_response = None - if self._tfa_output_file is not None and self.is_master(): + if self._tfa_output_file is not None: self.create_output_dir() self.design.to_csv(self.output_path(self._tfa_output_file), sep="\t") @@ -196,14 +196,12 @@ def emit_results(self, betas, rescaled_betas, gold_standard, priors): """ Output result report(s) for workflow run. 
""" - if self.is_master(): - self.create_output_dir() - rp = self._result_processor_driver(betas, rescaled_betas, filter_method=self.gold_standard_filter_method, - metric=self.metric) - self.results = rp.summarize_network(self.output_dir, gold_standard, priors) - return self.results - else: - return None + + self.create_output_dir() + rp = self._result_processor_driver(betas, rescaled_betas, filter_method=self.gold_standard_filter_method, + metric=self.metric) + self.results = rp.summarize_network(self.output_dir, gold_standard, priors) + return self.results def compute_common_data(self): """ diff --git a/inferelator/utils/debug.py b/inferelator/utils/debug.py index 9a808fd4..ba7cf5b2 100644 --- a/inferelator/utils/debug.py +++ b/inferelator/utils/debug.py @@ -12,9 +12,6 @@ class Debug: verbose_level = 0 default_level = 1 - silence_clients = True - is_master = True - levels = dict(silent=-1, normal=0, verbose=1, v=1, @@ -30,8 +27,6 @@ def set_verbose_level(cls, lvl): @classmethod def vprint(cls, *args, **kwargs): - if cls.silence_clients and not cls.is_master: - return cls.print_level(*args, **kwargs) @classmethod diff --git a/inferelator/utils/loader.py b/inferelator/utils/loader.py index 0daeace9..740e8bba 100644 --- a/inferelator/utils/loader.py +++ b/inferelator/utils/loader.py @@ -43,9 +43,9 @@ def load_data_h5ad(self, h5ad_file, meta_data_file=None, meta_data_handler=DEFAU # Build an InferelatorData object from a layer elif use_layer is not None: - data = InferelatorData(data.layers[use_layer], - gene_names=data.var_names, - sample_names=data.obs_names, + data = InferelatorData(data.layers[use_layer].copy(), + gene_names=data.var_names.copy(), + sample_names=data.obs_names.copy(), meta_data=pd.concat((data.obs, meta_data), axis=1), gene_data=pd.concat((data.var, gene_metadata), axis=1)) diff --git a/inferelator/workflow.py b/inferelator/workflow.py index ca86031d..ab96936c 100644 --- a/inferelator/workflow.py +++ b/inferelator/workflow.py @@ -699,6 +699,9 @@ class WorkflowBase(WorkflowBaseLoader): # Use the Intel MKL libraries for matrix multiplication use_mkl = None + # Use numba for JIT + use_numba = False + # Multiprocessing controller initialize_mp = True multiprocessing_controller = None @@ -817,7 +820,7 @@ def set_output_file_names(network_file_name="", confidence_file_name="", nonzero if curve_data_file_name != "": InferelatorResults.curve_data_file_name = curve_data_file_name - def set_run_parameters(self, num_bootstraps=None, random_seed=None, use_mkl=None): + def set_run_parameters(self, num_bootstraps=None, random_seed=None, use_mkl=None, use_numba=None): """ Set parameters used during runtime @@ -827,11 +830,15 @@ def set_run_parameters(self, num_bootstraps=None, random_seed=None, use_mkl=None :type random_seed: int :param use_mkl: A flag to indicate if the intel MKL library should be used for matrix multiplication :type use_mkl: bool + :param use_numba: A flag to indicate if numba should be used to accelerate the calculations. + Requires numba to be installed if set. Currently only accelerates AMuSR regression. 
diff --git a/inferelator/workflow.py b/inferelator/workflow.py
index ca86031d..ab96936c 100644
--- a/inferelator/workflow.py
+++ b/inferelator/workflow.py
@@ -699,6 +699,9 @@ class WorkflowBase(WorkflowBaseLoader):
     # Use the Intel MKL libraries for matrix multiplication
     use_mkl = None
 
+    # Use numba for JIT
+    use_numba = False
+
     # Multiprocessing controller
     initialize_mp = True
     multiprocessing_controller = None
@@ -817,7 +820,7 @@ def set_output_file_names(network_file_name="", confidence_file_name="", nonzero
         if curve_data_file_name != "":
             InferelatorResults.curve_data_file_name = curve_data_file_name
 
-    def set_run_parameters(self, num_bootstraps=None, random_seed=None, use_mkl=None):
+    def set_run_parameters(self, num_bootstraps=None, random_seed=None, use_mkl=None, use_numba=None):
         """
         Set parameters used during runtime
 
@@ -827,11 +830,15 @@ def set_run_parameters(self, num_bootstraps=None, random_seed=None, use_mkl=None
         :type random_seed: int
         :param use_mkl: A flag to indicate if the intel MKL library should be used for matrix multiplication
         :type use_mkl: bool
+        :param use_numba: A flag to indicate if numba should be used to accelerate the calculations.
+            Requires numba to be installed if set. Currently only accelerates AMuSR regression.
+        :type use_numba: bool
         """
 
         self._set_without_warning("num_bootstraps", num_bootstraps)
         self._set_without_warning("random_seed", random_seed)
         self._set_without_warning("use_mkl", use_mkl)
+        self._set_without_warning("use_numba", use_numba)
 
     def initialize_multiprocessing(self):
         """
@@ -958,13 +965,6 @@ def emit_results(self, betas, rescaled_betas, gold_standard, priors):
         """
         raise NotImplementedError  # implement in subclass
 
-    @staticmethod
-    def is_master():
-        """
-        Return True if this is the master thread
-        """
-        return MPControl.is_master
-
     def create_output_dir(self):
         """
         Set a default output_dir if nothing is set. Create the path if it doesn't exist.
diff --git a/requirements.txt b/requirements.txt
index 23bbd1ef..93eaf8dd 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -3,4 +3,4 @@ matplotlib >=1.5.1
 scipy >=0.9
 scikit-learn
 pandas >=0.24.0
-anndata
+anndata >=0.7.4
\ No newline at end of file
diff --git a/setup.py b/setup.py
index 611a001b..046200ad 100644
--- a/setup.py
+++ b/setup.py
@@ -2,7 +2,7 @@
 from setuptools import setup, find_packages
 
 # Current Inferelator Version Number
-version = "0.5.6"
+version = "0.5.7"
 
 # Description from README.md
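Taken together, the user-visible surface of this diff is one new runtime flag. A short sketch of how a caller might enable it, assuming the package's `inferelator_workflow` factory and a multitask AMuSR workflow; the parameter values are illustrative:

```python
from inferelator import inferelator_workflow

# Multitask AMuSR workflow; numba currently accelerates only the
# AMuSR regression path
worker = inferelator_workflow(regression="amusr", workflow="multitask")

# use_numba=True requires numba to be installed; the flag defaults off
worker.set_run_parameters(num_bootstraps=5, random_seed=42, use_numba=True)
```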