From fb361b0e1d6f4dbedf1de0c437ad2f8c89d60658 Mon Sep 17 00:00:00 2001
From: Saiyam26
Date: Thu, 12 Sep 2024 16:39:10 +0530
Subject: [PATCH 1/4] chunkwise_writing made simpler, old pipeline replica

---
 scalr/utils/file_utils.py | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)

diff --git a/scalr/utils/file_utils.py b/scalr/utils/file_utils.py
index d1a418d..921b98f 100644
--- a/scalr/utils/file_utils.py
+++ b/scalr/utils/file_utils.py
@@ -110,11 +110,15 @@ def write_chunkwise_data(datapath: str,
 
     for i, (start) in enumerate(range(0, len(sample_inds), sample_chunksize)):
         data = read_data(datapath)
-        data = data[sample_inds[start:start + sample_chunksize]]
+
+        if feature_inds:
+            data = data[sample_inds[start:start + sample_chunksize],
+                        feature_inds]
+        else:
+            data = data[sample_inds[start:start + sample_chunksize]]
+
         if not isinstance(data, AnnData):
             data = data.to_adata()
-        if feature_inds:
-            data = data[:, feature_inds]
         data = data.to_memory()
 
         # Transformation

From 1390a7d8bf903af6d2751b1d50955f300a02ab0a Mon Sep 17 00:00:00 2001
From: Saiyam26
Date: Fri, 13 Sep 2024 18:00:08 +0530
Subject: [PATCH 2/4] optimized chunkwise writing + garbage collection + handling NaN writing bug

---
 scalr/data/preprocess/_preprocess.py |  7 ++++---
 scalr/data/split/_split.py           | 16 ++++++++++------
 scalr/data_ingestion_pipeline.py     | 24 ++++++++++++++++--------
 scalr/feature_extraction_pipeline.py | 12 +++++++++---
 scalr/utils/file_utils.py            | 25 ++++++++++---------------
 scalr/utils/test_file_utils.py       |  2 +-
 6 files changed, 50 insertions(+), 36 deletions(-)

diff --git a/scalr/data/preprocess/_preprocess.py b/scalr/data/preprocess/_preprocess.py
index b53594c..68a60a2 100644
--- a/scalr/data/preprocess/_preprocess.py
+++ b/scalr/data/preprocess/_preprocess.py
@@ -50,12 +50,13 @@ def fit(
         """
         pass
 
-    def process_data(self, datapath: dict, sample_chunksize: int, dirpath: str):
+    def process_data(self, full_data: Union[AnnData, AnnCollection],
+                     sample_chunksize: int, dirpath: str):
         """A function to process the entire data chunkwise and write the
         processed data to disk.
 
         Args:
-            datapath (str): Path to read the data from for transformation.
+            full_data (Union[AnnData, AnnCollection]): Full data for transformation.
            sample_chunksize (int): Number of samples in one chunk.
            dirpath (str): Path to write the data to.
        """
@@ -64,7 +65,7 @@ def process_data(self, datapath: dict, sample_chunksize: int, dirpath: str):
             raise NotImplementedError(
                 'Preprocessing does not work without sample chunk size')
 
-        write_chunkwise_data(datapath,
+        write_chunkwise_data(full_data,
                              sample_chunksize,
                              dirpath,
                              transform=self.transform)
diff --git a/scalr/data/split/_split.py b/scalr/data/split/_split.py
index fe35c75..f2ffe14 100644
--- a/scalr/data/split/_split.py
+++ b/scalr/data/split/_split.py
@@ -2,6 +2,10 @@
 import os
 from os import path
+from typing import Union
+
+from anndata import AnnData
+from anndata.experimental import AnnCollection
 
 import scalr
 from scalr.utils import build_object
@@ -88,12 +92,13 @@ def check_splits(self, datapath: str, data_splits: dict, target: str):
             self.event_logger.info(
                 f'{metadata[target].iloc[test_inds].value_counts()}\n')
 
-    def write_splits(self, full_datapath: str, data_split_indices: dict,
-                     sample_chunksize: int, dirpath: int):
+    def write_splits(self, full_data: Union[AnnData, AnnCollection],
+                     data_split_indices: dict, sample_chunksize: int,
+                     dirpath: int):
         """This function writes the train, validation, and test splits to the disk.
         Args:
-            full_datapath (str): Full datapath of data to be split.
+            full_data (Union[AnnData, AnnCollection]): Full data to be split.
             data_split_indices (dict): Indices of each split.
             sample_chunksize (int): Number of samples to be written in one file.
             dirpath (int): Path to write data into.
         """
@@ -106,10 +111,9 @@ def write_splits(self, full_datapath: str, data_split_indices: dict,
             if sample_chunksize:
                 split_dirpath = path.join(dirpath, split)
                 os.makedirs(split_dirpath, exist_ok=True)
-                write_chunkwise_data(full_datapath, sample_chunksize,
-                                     split_dirpath, data_split_indices[split])
+                write_chunkwise_data(full_data, sample_chunksize, split_dirpath,
+                                     data_split_indices[split])
             else:
-                full_data = read_data(full_datapath)
                 filepath = path.join(dirpath, f'{split}.h5ad')
                 write_data(full_data[data_split_indices[split]], filepath)
diff --git a/scalr/data_ingestion_pipeline.py b/scalr/data_ingestion_pipeline.py
index 0e087c7..ee6c23e 100644
--- a/scalr/data_ingestion_pipeline.py
+++ b/scalr/data_ingestion_pipeline.py
@@ -4,6 +4,8 @@
 import os
 from os import path
 
+import pandas as pd
+
 from scalr.data.preprocess import build_preprocessor
 from scalr.data.split import build_splitter
 from scalr.utils import FlowLogger
@@ -51,6 +53,7 @@ def generate_train_val_test_split(self):
             ''')
 
         full_datapath = self.data_config['train_val_test']['full_datapath']
+        self.full_data = read_data(full_datapath)
         splitter_config = deepcopy(
             self.data_config['train_val_test']['splitter_config'])
         splitter, splitter_config = build_splitter(splitter_config)
@@ -74,10 +77,13 @@ def generate_train_val_test_split(self):
                                                  'train_val_test_split')
         os.makedirs(train_val_test_split_dirpath, exist_ok=True)
 
-        splitter.write_splits(full_datapath, train_val_test_split_indices,
+        splitter.write_splits(self.full_data, train_val_test_split_indices,
                               self.sample_chunksize,
                               train_val_test_split_dirpath)
 
+        # Garbage collection
+        del self.full_data
+
         self.data_config['train_val_test'][
             'split_datapaths'] = train_val_test_split_dirpath
 
@@ -117,7 +123,7 @@ def preprocess_data(self):
                                           self.sample_chunksize)
 
         # Transform on train, val & test split.
         for split in ['train', 'val', 'test']:
-            preprocessor.process_data(path.join(datapath, split),
+            preprocessor.process_data(read_data(path.join(datapath, split)),
                                       self.sample_chunksize,
                                       path.join(processed_datapath, split))
 
         datapath = processed_datapath
@@ -140,22 +146,24 @@ def generate_mappings(self):
         for split in ['train', 'val', 'test']:
             datapath = path.join(
                 self.data_config['train_val_test']['final_datapaths'], split)
-            datas.append(read_data(datapath))
+            datas.append(read_data(datapath).obs)
+        data = pd.concat(datas)
 
         label_mappings = {}
         for column_name in column_names:
             label_mappings[column_name] = {}
 
-            id2label = []
-            for data in datas:
-                id2label += data.obs[column_name].astype(
-                    'category').cat.categories.tolist()
+            id2label = sorted(
+                data[column_name].astype('category').cat.categories.tolist())
 
-            id2label = sorted(list(set(id2label)))
             label2id = {id2label[i]: i for i in range(len(id2label))}
 
             label_mappings[column_name]['id2label'] = id2label
             label_mappings[column_name]['label2id'] = label2id
 
+            # Garbage collection
+            del datas
+            del data
+
         write_data(label_mappings,
                    path.join(self.datadir, 'label_mappings.json'))
diff --git a/scalr/feature_extraction_pipeline.py b/scalr/feature_extraction_pipeline.py
index 91fa4c6..fa0a118 100644
--- a/scalr/feature_extraction_pipeline.py
+++ b/scalr/feature_extraction_pipeline.py
@@ -179,13 +179,19 @@ def write_top_features_subset_data(self, data_config: dict) -> dict:
         feature_subset_datapath = path.join(self.dirpath,
                                             'feature_subset_data')
         os.makedirs(feature_subset_datapath, exist_ok=True)
-        for split in ['train', 'val', 'test']:
+        test_data = read_data(path.join(datapath, 'test'))
+        splits = {
+            'train': self.train_data,
+            'val': self.val_data,
+            'test': test_data
+        }
+
+        for split, split_data in splits.items():
 
-            split_datapath = path.join(datapath, split)
             split_feature_subset_datapath = path.join(feature_subset_datapath,
                                                       split)
             sample_chunksize = data_config.get('sample_chunksize')
-            write_chunkwise_data(split_datapath,
+            write_chunkwise_data(split_data,
                                  sample_chunksize,
                                  split_feature_subset_datapath,
                                  feature_inds=self.top_features)
diff --git a/scalr/utils/file_utils.py b/scalr/utils/file_utils.py
index 921b98f..926bc96 100644
--- a/scalr/utils/file_utils.py
+++ b/scalr/utils/file_utils.py
@@ -69,7 +69,7 @@ def write_data(data: Union[dict, AnnData, pd.DataFrame], filepath: str):
             '`filepath` does not contain `json`, `yaml`, or `h5ad` file')
 
 
-def write_chunkwise_data(datapath: str,
+def write_chunkwise_data(full_data: Union[AnnData, AnnCollection],
                          sample_chunksize: int,
                          dirpath: str,
                          sample_inds: list[int] = None,
@@ -81,7 +81,7 @@ def write_chunkwise_data(datapath: str,
     This function can also apply transformation on each chunk.
 
     Args:
-        datapath (str): path/to/data to be written in chunks.
+        full_data (Union[AnnData, AnnCollection]): data to be written in chunks.
         sample_chunksize (int): number of samples to be loaded at a time.
         dirpath (str): path/to/directory to write the chunks of data.
         sample_inds (list[int], optional): To be used in case of chunking
@@ -95,27 +95,22 @@ def write_chunkwise_data(datapath: str,
     if not path.exists(dirpath):
         os.makedirs(dirpath)
 
-    data = read_data(datapath)
-    if isinstance(data, AnnData) and feature_inds:
-        raise ValueError(
-            'TrainValTestSplit data for FeatureSubsetting must be AnnCollection'
-        )
-
     if not sample_inds:
-        sample_inds = list(range(len(data)))
+        sample_inds = list(range(len(full_data)))
 
-    # Hacky fix for an AnnCollection working/bug.
+    # Hacky fixes for an AnnCollection working/bug.
     if sample_chunksize >= len(sample_inds):
         sample_chunksize = len(sample_inds) - 1
 
-    for i, (start) in enumerate(range(0, len(sample_inds), sample_chunksize)):
-        data = read_data(datapath)
+    for col in full_data.obs.columns:
+        full_data.obs[col] = full_data.obs[col].astype('category')
 
+    for i, (start) in enumerate(range(0, len(sample_inds), sample_chunksize)):
         if feature_inds:
-            data = data[sample_inds[start:start + sample_chunksize],
-                        feature_inds]
+            data = full_data[sample_inds[start:start + sample_chunksize],
+                             feature_inds]
         else:
-            data = data[sample_inds[start:start + sample_chunksize]]
+            data = full_data[sample_inds[start:start + sample_chunksize]]
 
         if not isinstance(data, AnnData):
             data = data.to_adata()
diff --git a/scalr/utils/test_file_utils.py b/scalr/utils/test_file_utils.py
index 2e70051..45fb397 100644
--- a/scalr/utils/test_file_utils.py
+++ b/scalr/utils/test_file_utils.py
@@ -31,7 +31,7 @@ def test_write_chunkwise_data():
     dirpath = './tmp/chunked_data/'
 
     # Writing fulldata in chunks.
-    write_chunkwise_data(fulldata_path,
+    write_chunkwise_data(read_data(fulldata_path),
                          sample_chunksize=sample_chunksize,
                          dirpath=dirpath)

From 9606fbecbb8e61d49b74e25a3541689a573cc4a8 Mon Sep 17 00:00:00 2001
From: Saiyam26
Date: Fri, 13 Sep 2024 18:16:59 +0530
Subject: [PATCH 3/4] README + config updated

---
 README.md   | 42 +++++++++++++++++++--------------
 config.yaml | 67 +++++++++++++++++++----------------------------------
 2 files changed, 48 insertions(+), 61 deletions(-)

diff --git a/README.md b/README.md
index 2c157c6..aac58c6 100644
--- a/README.md
+++ b/README.md
@@ -2,6 +2,11 @@
 
 # Single-cell analysis using Low Resource (scaLR)
 
+
+[![GitHub](https://img.shields.io/github/license/InFoCusp/scaLR)](https://github.com/infocusp/scaLR?tab=GPL-3.0-1-ov-file#)
+
+## 📖 Overview
+
 scaLR is a comprehensive end-to-end pipeline that is equipped with a range of advanced features to streamline and enhance the analysis of scRNA-seq data. The major steps of the platform are:
 
 1. Data Processing: Large datasets undergo preprocessing and normalization (if the user opts to) and are segmented into training, testing, and validation sets.
@@ -40,8 +45,25 @@ pip install -r requirements.txt
 - `adata.obs`: contains any metadata regarding cells, including a column for `target` which will be used for classification. The index of `adata.obs` is cell_barcodes.
 - `adata.var`: contains all gene_names as Index.
+
+## How to run
+
+1. Modify the configuration file as per your requirements; every stage of the pipeline is available inside the config folder ([config.yml] or [full_config.yml]). Simply omit/comment out stages of the pipeline you do not wish to run.
+2. Refer to config.yml and its detailed config [README](config_README.md) on how to use different parameters and files.
+3. Then use the `pipeline.py` script to run the entire pipeline according to your configuration. It takes the path to the config as an argument (`-c | --config`) and an optional flag to log all parts of the pipeline (`-l | --log`).
+4. Run scaLR with `python pipeline.py --config /path/to/config.yaml -l`.
+
+
+## Interactive tutorials
+Detailed tutorials have been made on how to use some functionalities of scaLR as a library. Find the links below.
+
+- [Normalization](tutorials/preprocessing/normalization.ipynb)
+- [Batch correction](tutorials/preprocessing/batchc_correction.ipynb)
+- [Gene recall curve](tutorials/analysis/gene_recall_curve/gene_recall_curve.ipynb)
+- [Differential gene expression analysis](tutorials/analysis/differential_gene_expression/dge.ipynb)
+- [SHAP analysis](tutorials/analysis/shap_analysis/shap_heatmap.ipynb)
 
-## Output Structure
+## Experiment Output Structure
 
 - **pipeline.py**: The main script that perform end to end run.
 
 - `exp_dir`: root experiment directory for the storage of all step outputs of the platform specified in the config.
 
@@ -94,23 +116,7 @@ Performs evaluation of best model trained on user-defined metrics on the test se
     - `lmem_dge_result`
         - `lmem_DGE_celltype.csv`: contains LMEM DGE results between selected factor categories for a celltype.
         - `lmem_DGE_fixed_effect_factor_X.svg`: volcano plot of coefficient vs -log10(p-value) of genes.
-
-## How to run
-
-1. It is necessary that the user modify the configuration file and each stage of the pipeline is available inside the config folder [config.yml] or [full_config.yml] as per your requirements. Simply omit/comment out stages of the pipeline you do not wish to run.
-2. Refer config.yml & it's detailed config [README](config_README.md) file on how to use different parameters and files.
-3. Then use the `pipeline.py` file to run the entire pipeline according to your configurations. This file takes as argument the path to config (`-c | --config`), and an optional flag to log all parts of the pipelines (`-l | --log`).
-4. `python pipeline.py --config /path/to/config -c config.yaml -l` to run the scaLR.
-
-
-## Interactive tutorials
-Detailed tutorials have been made on how to use some functionalities as a scaLR library. Find the links below.
-
-- Normalization - `tutorials/preprocessing/normalization.ipynb`
-- Batch correction - `tutorials/preprocessing/batchc_correction.ipynb`
-- Gene recall curve - `tutorials/analysis/gene_recall_curve/gene_recall_curve.ipynb`
-- Differential gene expression analysis - `tutorials/analysis/differential_gene_expression/dge.ipynb`
-- SHAP analysis - `tutorials/analysis/shap_analysis/shap_heatmap.ipynb`
+
 
 scaLR © 2024 Infocusp Innovations
diff --git a/config.yaml b/config.yaml
index 6958df6..c1a5c17 100644
--- a/config.yaml
+++ b/config.yaml
@@ -5,9 +5,9 @@ device: 'cuda'
 
 # EXPERIMENT.
 experiment:
-  dirpath: 'revamped_scalr_experiments'
-  exp_name: 'final'
-  exp_run: 1
+  dirpath: 'scalr_experiments'
+  exp_name: 'final_5000_6'
+  exp_run: 0
 
 
 # DATA CONFIG.
 data:
@@ -40,13 +40,13 @@ data:
 
 # FEATURE SELECTION.
 feature_selection:
-  # scores: '/path/to/matrix'
-  feature_subsetsize: 6000
+  # score_matrix: '/path/to/matrix'
+  feature_subsetsize: 5000
 
   model:
     name: SequentialModel
     params:
-      layers: [6000, 6]
+      layers: [5000, 6]
       weights_init_zero: True
 
   model_train_config:
@@ -56,7 +56,7 @@ feature_selection:
       name: SimpleDataLoader
       params:
         batch_size: 25000
-        padding: 6000
+        padding: 5000
 
     optimizer:
       name: SGD
@@ -139,39 +139,20 @@ analysis:
     params:
       k: 100
 
-  downstream_analysis:
-    - name: GeneRecallCurve
-      params:
-        reference_genes_path: '/path/to/reference_genes.csv'
-        top_K: 300
-        plots_per_row: 3
-        features_selector:
-          name: ClasswiseAbs
-          params: {}
-    - name: Heatmap
-      params:
-        top_n_genes: 100
-    - name: RocAucCurve
-      params: {}
-    - name: DgePseudoBulk
-      params:
-        celltype_column: 'cell_type'
-        design_factor: 'disease'
-        factor_categories: ['Alzheimer disease', 'normal']
-        sum_column: 'donor_id'
-        cell_subsets: ['excitatory neuron', 'inhibitory interneuron', 'oligodendrocyte']
-        min_cell_threshold: 1
-        fold_change: 1.5
-        p_val: 0.05
-        save_plot: True
-    - name: DgeLMEM
-      params:
-        fixed_effect_column: 'disease'
-        fixed_effect_factors: ['Alzheimer disease', 'normal']
-        group: 'donor_id'
-        min_cell_threshold: 10
-        n_cpu: 4
-        gene_batch_size: 1000
-        coef_threshold: 0
-        p_val: 0.05
-        save_plot: True
\ No newline at end of file
+  # downstream_analysis:
+  #   - name: GeneRecallCurve
+  #     params:
+  #       reference_genes_path: '/path/to/reference_genes.csv'
+  #       top_K: 300
+  #       plots_per_row: 3
+  #       features_selector:
+  #         name: ClasswiseAbs
+  #         params: {}
+  #   - name: Heatmap
+  #     params: **kwargs
+  #   - name: RocAucCurve
+  #     params: **kwargs
+  #   - name: DgePseudoBulk
+  #     params: **kwargs
+  #   - name: DgeLMEM
+  #     params: **kwargs
\ No newline at end of file

From 38dca865a7d07b014ba545d17b3af4fb6c2f2702 Mon Sep 17 00:00:00 2001
From: Saiyam26
Date: Mon, 16 Sep 2024 13:07:47 +0530
Subject: [PATCH 4/4] comments addressed

---
 scalr/data_ingestion_pipeline.py | 19 ++++++++++---------
 scalr/utils/test_file_utils.py   |  3 ++-
 2 files changed, 12 insertions(+), 10 deletions(-)

diff --git a/scalr/data_ingestion_pipeline.py b/scalr/data_ingestion_pipeline.py
index ee6c23e..2aad9dd 100644
--- a/scalr/data_ingestion_pipeline.py
+++ b/scalr/data_ingestion_pipeline.py
@@ -123,8 +123,8 @@ def preprocess_data(self):
                                           self.sample_chunksize)
 
         # Transform on train, val & test split.
         for split in ['train', 'val', 'test']:
-            preprocessor.process_data(read_data(path.join(datapath, split)),
-                                      self.sample_chunksize,
+            split_data = read_data(path.join(datapath, split))
+            preprocessor.process_data(split_data, self.sample_chunksize,
                                       path.join(processed_datapath, split))
 
         datapath = processed_datapath
@@ -142,27 +142,28 @@ def generate_mappings(self):
             path.join(self.data_config['train_val_test']['final_datapaths'],
                       'val')).obs.columns
 
-        datas = []
+        data_obs = []
         for split in ['train', 'val', 'test']:
             datapath = path.join(
                 self.data_config['train_val_test']['final_datapaths'], split)
-            datas.append(read_data(datapath).obs)
-        data = pd.concat(datas)
+            split_data_obs = read_data(datapath).obs
+            data_obs.append(split_data_obs)
+        full_data_obs = pd.concat(data_obs)
 
         label_mappings = {}
         for column_name in column_names:
             label_mappings[column_name] = {}
 
-            id2label = sorted(
-                data[column_name].astype('category').cat.categories.tolist())
+            id2label = sorted(full_data_obs[column_name].astype(
+                'category').cat.categories.tolist())
 
             label2id = {id2label[i]: i for i in range(len(id2label))}
 
             label_mappings[column_name]['id2label'] = id2label
             label_mappings[column_name]['label2id'] = label2id
 
             # Garbage collection
-            del datas
-            del data
+            del data_obs
+            del full_data_obs
 
         write_data(label_mappings,
                    path.join(self.datadir, 'label_mappings.json'))
diff --git a/scalr/utils/test_file_utils.py b/scalr/utils/test_file_utils.py
index 45fb397..3fa8dd6 100644
--- a/scalr/utils/test_file_utils.py
+++ b/scalr/utils/test_file_utils.py
@@ -31,7 +31,8 @@ def test_write_chunkwise_data():
     dirpath = './tmp/chunked_data/'
 
     # Writing fulldata in chunks.
-    write_chunkwise_data(read_data(fulldata_path),
+    full_data = read_data(fulldata_path)
+    write_chunkwise_data(full_data,
                          sample_chunksize=sample_chunksize,
                          dirpath=dirpath)
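
The central API change in this series is that `write_chunkwise_data` now takes an already-loaded `AnnData`/`AnnCollection` instead of a file path, so callers read the data once and reuse the handle. The following is a minimal usage sketch, not part of the patches, assuming `read_data` and `write_chunkwise_data` are exported from `scalr.utils` as the pipeline modules suggest; the paths, chunk size, and index lists are hypothetical.

```python
from scalr.utils import read_data, write_chunkwise_data

# Read the backed data once; re-reading the file inside every chunk iteration
# is what the old implementation did and what this series removes.
full_data = read_data('data/full.h5ad')  # AnnData or AnnCollection

# Write a row/column subset to disk in chunk files of 1000 samples each.
write_chunkwise_data(full_data,
                     sample_chunksize=1000,           # samples per chunk file
                     dirpath='data/train',            # output directory for chunks
                     sample_inds=list(range(5000)),   # optional row subset
                     feature_inds=list(range(3000)))  # optional column subset

# Free the handle once all splits are written (mirrors the PR's explicit
# garbage-collection pattern).
del full_data
```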
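The reworked `generate_mappings()` builds the `id2label`/`label2id` dictionaries from the concatenated `obs` of all splits. A standalone sketch of that mapping logic, with made-up cell-type values, is shown below.

```python
import pandas as pd

# Stand-ins for the obs DataFrames of the train/val/test splits.
data_obs = [
    pd.DataFrame({'cell_type': ['B cell', 'T cell']}),
    pd.DataFrame({'cell_type': ['T cell', 'NK cell']}),
]
full_data_obs = pd.concat(data_obs)

# Categorical dtype de-duplicates the values; sorting makes the ids deterministic.
id2label = sorted(
    full_data_obs['cell_type'].astype('category').cat.categories.tolist())
label2id = {label: i for i, label in enumerate(id2label)}

print(id2label)  # ['B cell', 'NK cell', 'T cell']
print(label2id)  # {'B cell': 0, 'NK cell': 1, 'T cell': 2}
```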