Merge pull request #1 from infocusp/bug/sj/writing
Optimizations + Bug fixes
saurabhinfocusp authored Sep 16, 2024
2 parents e290da6 + 38dca86 commit 8007a7c
Showing 8 changed files with 105 additions and 98 deletions.
42 changes: 24 additions & 18 deletions README.md
@@ -2,6 +2,11 @@

# Single-cell analysis using Low Resource (scaLR)

<!-- [![Paper](https://img.shields.io/badge/Paper-insert_paper_id_here-white)]() -->
[![GitHub](https://img.shields.io/github/license/InFoCusp/scaLR)](https://github.com/infocusp/scaLR?tab=GPL-3.0-1-ov-file#)

## 📖 Overview

<b>scaLR</b> is a comprehensive end-to-end pipeline that is equipped with a range of advanced features to streamline and enhance the analysis of scRNA-seq data. The major steps of the platform are:

1. <b>Data Processing</b>: Large datasets undergo preprocessing and normalization (if the user opts to do so) and are segmented into training, testing, and validation sets.
@@ -40,8 +45,25 @@ pip install -r requirements.txt
- `adata.obs`: contains cell-level metadata, including a column for the classification `target`. The index of `adata.obs` holds the cell barcodes.
- `adata.var`: contains all gene names as its index (a minimal sketch of this layout follows below).
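
A minimal sketch of the expected AnnData layout, using toy values (the file name and the `target` column name here are placeholders for whatever your data and config use):

```python
import anndata as ad
import numpy as np
import pandas as pd

# Toy matrix: 4 cells x 3 genes.
X = np.random.rand(4, 3)

# adata.obs: cell metadata with the classification target; index = cell barcodes.
obs = pd.DataFrame({"target": ["B cell", "T cell", "B cell", "NK cell"]},
                   index=["AAACCTG", "AAACGGG", "AAAGATG", "AAAGCAA"])

# adata.var: gene names as the index.
var = pd.DataFrame(index=["GENE_A", "GENE_B", "GENE_C"])

adata = ad.AnnData(X=X, obs=obs, var=var)
adata.write_h5ad("toy_data.h5ad")
```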

## How to run

1. Modify the configuration file for each stage of the pipeline as per your requirements; templates are available inside the config folder ([config.yml] or [full_config.yml]). Simply omit/comment out stages of the pipeline you do not wish to run.
2. Refer to config.yml and its detailed config [README](config_README.md) for how to use the different parameters and files.
3. Then use `pipeline.py` to run the entire pipeline according to your configuration. The script takes the path to the config as an argument (`-c | --config`) and an optional flag to log all parts of the pipeline (`-l | --log`).
4. Run scaLR with `python pipeline.py --config /path/to/config.yaml -l` (a sketch for skipping stages programmatically follows below).
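
A minimal sketch of skipping a stage programmatically before a run, assuming top-level stage keys as in the config.yaml diff further below (`analysis`, `feature_selection`, ...); the trimmed-config file name is just an example:

```python
import subprocess

import yaml

# Load the config and drop a stage that should not run this time.
with open("config.yaml") as f:
    config = yaml.safe_load(f)
config.pop("analysis", None)  # e.g. skip downstream analysis

with open("config_trimmed.yaml", "w") as f:
    yaml.safe_dump(config, f, sort_keys=False)

# Run the pipeline with logging enabled (flags as documented above).
subprocess.run(["python", "pipeline.py", "--config", "config_trimmed.yaml", "-l"],
               check=True)
```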


## Interactive tutorials
Detailed tutorials on how to use some functionalities of scaLR as a library are available at the links below.

- [Normalization](tutorials/preprocessing/normalization.ipynb)
- [Batch correction](tutorials/preprocessing/batchc_correction.ipynb)
- [Gene recall curve](tutorials/analysis/gene_recall_curve/gene_recall_curve.ipynb)
- [Differential gene expression analysis](tutorials/analysis/differential_gene_expression/dge.ipynb)
- [SHAP analysis](tutorials/analysis/shap_analysis/shap_heatmap.ipynb)

## Experiment Output Structure
- **pipeline.py**:
The main script that performs the end-to-end run.
- `exp_dir`: root experiment directory, specified in the config, where all step outputs of the platform are stored.
Expand Down Expand Up @@ -94,23 +116,7 @@ Performs evaluation of best model trained on user-defined metrics on the test se
- `lmem_dge_result`
- `lmem_DGE_celltype.csv`: contains LMEM DGE results between the selected factor categories for a cell type.
- `lmem_DGE_fixed_effect_factor_X.svg`: volcano plot of coefficient vs -log10(p-value) for genes (a loading sketch follows below).
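
A minimal sketch of loading one LMEM DGE table with pandas; the path follows the layout above, while the column names used for filtering are illustrative assumptions, not guaranteed by the pipeline:

```python
import pandas as pd

# Hypothetical path inside the experiment directory; the file name encodes the cell type.
dge = pd.read_csv("exp_dir/analysis/lmem_dge_result/lmem_DGE_oligodendrocyte.csv")

# Illustrative filter on assumed 'coefficient' and 'p_value' columns.
significant = dge[(dge["p_value"] < 0.05) & (dge["coefficient"].abs() > 0)]
print(significant.head())
```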



<center >
<b>scaLR © 2024 Infocusp Innovations</b>
67 changes: 24 additions & 43 deletions config.yaml
@@ -5,9 +5,9 @@ device: 'cuda'

# EXPERIMENT.
experiment:
dirpath: 'revamped_scalr_experiments'
exp_name: 'final'
exp_run: 1
dirpath: 'scalr_experiments'
exp_name: 'final_5000_6'
exp_run: 0


# DATA CONFIG.
@@ -40,13 +40,13 @@ data:
# FEATURE SELECTION.
feature_selection:

# scores: '/path/to/matrix'
feature_subsetsize: 6000
# score_matrix: '/path/to/matrix'
feature_subsetsize: 5000

model:
name: SequentialModel
params:
layers: [6000, 6]
layers: [5000, 6]
weights_init_zero: True

model_train_config:
@@ -56,7 +56,7 @@ feature_selection:
name: SimpleDataLoader
params:
batch_size: 25000
padding: 6000
padding: 5000

optimizer:
name: SGD
@@ -139,39 +139,20 @@ analysis:
params:
k: 100

downstream_analysis:
- name: GeneRecallCurve
params:
reference_genes_path: '/path/to/reference_genes.csv'
top_K: 300
plots_per_row: 3
features_selector:
name: ClasswiseAbs
params: {}
- name: Heatmap
params:
top_n_genes: 100
- name: RocAucCurve
params: {}
- name: DgePseudoBulk
params:
celltype_column: 'cell_type'
design_factor: 'disease'
factor_categories: ['Alzheimer disease', 'normal']
sum_column: 'donor_id'
cell_subsets: ['excitatory neuron', 'inhibitory interneuron', 'oligodendrocyte']
min_cell_threshold: 1
fold_change: 1.5
p_val: 0.05
save_plot: True
- name: DgeLMEM
params:
fixed_effect_column: 'disease'
fixed_effect_factors: ['Alzheimer disease', 'normal']
group: 'donor_id'
min_cell_threshold: 10
n_cpu: 4
gene_batch_size: 1000
coef_threshold: 0
p_val: 0.05
save_plot: True
# downstream_analysis:
# - name: GeneRecallCurve
# params:
# reference_genes_path: '/path/to/reference_genes.csv'
# top_K: 300
# plots_per_row: 3
# features_selector:
# name: ClasswiseAbs
# params: {}
# - name: Heatmap
# params: **kwargs
# - name: RocAucCurve
# params: **kwargs
# - name: DgePseudoBulk
# params: **kwargs
# - name: DgeLMEM
# params: **kwargs
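
A minimal sketch of reading this config from Python, touching only keys that appear in the diff above (values will be whatever your local config.yaml holds):

```python
import yaml

with open("config.yaml") as f:
    config = yaml.safe_load(f)

print(config["device"])                                   # e.g. 'cuda'
print(config["experiment"]["dirpath"])                    # e.g. 'scalr_experiments'
print(config["feature_selection"]["feature_subsetsize"])  # e.g. 5000
```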
7 changes: 4 additions & 3 deletions scalr/data/preprocess/_preprocess.py
@@ -50,12 +50,13 @@ def fit(
"""
pass

def process_data(self, datapath: dict, sample_chunksize: int, dirpath: str):
def process_data(self, full_data: Union[AnnData, AnnCollection],
sample_chunksize: int, dirpath: str):
"""A function to process the entire data chunkwise and write the processed data
to disk.
Args:
datapath (str): Path to read the data from for transformation.
full_data (Union[AnnData, AnnCollection]): Full data for transformation.
sample_chunksize (int): Number of samples in one chunk.
dirpath (str): Path to write the data to.
"""
@@ -64,7 +65,7 @@ def process_data(self, datapath: dict, sample_chunksize: int, dirpath: str):
raise NotImplementedError(
'Preprocessing does not work without sample chunk size')

write_chunkwise_data(datapath,
write_chunkwise_data(full_data,
sample_chunksize,
dirpath,
transform=self.transform)
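
A minimal sketch of the chunkwise processing pattern `process_data` relies on, written against plain anndata; the `transform` below (log1p) is just a stand-in for whatever a concrete preprocessor implements:

```python
import os

import anndata as ad
import numpy as np


def process_chunkwise(full_data: ad.AnnData, sample_chunksize: int, dirpath: str, transform):
    """Apply `transform` to each chunk of samples and write the chunk to disk."""
    os.makedirs(dirpath, exist_ok=True)
    for i, start in enumerate(range(0, full_data.n_obs, sample_chunksize)):
        chunk = full_data[start:start + sample_chunksize].copy()  # materialise the view
        chunk.X = transform(chunk.X)
        chunk.write_h5ad(os.path.join(dirpath, f"{i}.h5ad"))  # file naming is illustrative


process_chunkwise(ad.AnnData(X=np.random.poisson(1.0, (10, 5)).astype(np.float32)),
                  sample_chunksize=4, dirpath="processed_chunks", transform=np.log1p)
```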
16 changes: 10 additions & 6 deletions scalr/data/split/_split.py
@@ -2,6 +2,10 @@

import os
from os import path
from typing import Union

from anndata import AnnData
from anndata.experimental import AnnCollection

import scalr
from scalr.utils import build_object
@@ -88,12 +92,13 @@ def check_splits(self, datapath: str, data_splits: dict, target: str):
self.event_logger.info(
f'{metadata[target].iloc[test_inds].value_counts()}\n')

def write_splits(self, full_datapath: str, data_split_indices: dict,
sample_chunksize: int, dirpath: int):
def write_splits(self, full_data: Union[AnnData, AnnCollection],
data_split_indices: dict, sample_chunksize: int,
dirpath: int):
"""THis function writes the train validation and test splits to the disk.
Args:
full_datapath (str): Full datapath of data to be split.
full_data (Union[AnnData, AnnCollection]): Full data to be split.
data_split_indices (dict): Indices of each split.
sample_chunksize (int): Number of samples to be written in one file.
dirpath (int): Path to write data into.
@@ -106,10 +111,9 @@ def write_splits(self, full_datapath: str, data_split_indices: dict,
if sample_chunksize:
split_dirpath = path.join(dirpath, split)
os.makedirs(split_dirpath, exist_ok=True)
write_chunkwise_data(full_datapath, sample_chunksize,
split_dirpath, data_split_indices[split])
write_chunkwise_data(full_data, sample_chunksize, split_dirpath,
data_split_indices[split])
else:
full_data = read_data(full_datapath)
filepath = path.join(dirpath, f'{split}.h5ad')
write_data(full_data[data_split_indices[split]], filepath)
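
A minimal sketch of the non-chunked branch above (one `.h5ad` per split), assuming precomputed index lists and plain anndata; paths and toy sizes are illustrative:

```python
import os

import anndata as ad
import numpy as np

full_data = ad.AnnData(X=np.random.rand(10, 4))
data_split_indices = {"train": list(range(7)), "val": [7, 8], "test": [9]}

dirpath = "splits"
os.makedirs(dirpath, exist_ok=True)
for split, indices in data_split_indices.items():
    # Materialise each split and write it as <split>.h5ad, mirroring write_data above.
    full_data[indices].copy().write_h5ad(os.path.join(dirpath, f"{split}.h5ad"))
```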

29 changes: 19 additions & 10 deletions scalr/data_ingestion_pipeline.py
@@ -4,6 +4,8 @@
import os
from os import path

import pandas as pd

from scalr.data.preprocess import build_preprocessor
from scalr.data.split import build_splitter
from scalr.utils import FlowLogger
@@ -51,6 +53,7 @@ def generate_train_val_test_split(self):
''')

full_datapath = self.data_config['train_val_test']['full_datapath']
self.full_data = read_data(full_datapath)
splitter_config = deepcopy(
self.data_config['train_val_test']['splitter_config'])
splitter, splitter_config = build_splitter(splitter_config)
@@ -74,10 +77,13 @@ def generate_train_val_test_split(self):
'train_val_test_split')
os.makedirs(train_val_test_split_dirpath, exist_ok=True)

splitter.write_splits(full_datapath, train_val_test_split_indices,
splitter.write_splits(self.full_data, train_val_test_split_indices,
self.sample_chunksize,
train_val_test_split_dirpath)

# Garbage collection
del self.full_data

self.data_config['train_val_test'][
'split_datapaths'] = train_val_test_split_dirpath

@@ -117,8 +123,8 @@ def preprocess_data(self):
self.sample_chunksize)
# Transform on train, val & test split.
for split in ['train', 'val', 'test']:
preprocessor.process_data(path.join(datapath, split),
self.sample_chunksize,
split_data = read_data(path.join(datapath, split))
preprocessor.process_data(split_data, self.sample_chunksize,
path.join(processed_datapath, split))

datapath = processed_datapath
@@ -136,26 +142,29 @@ def generate_mappings(self):
path.join(self.data_config['train_val_test']['final_datapaths'],
'val')).obs.columns

datas = []
data_obs = []
for split in ['train', 'val', 'test']:
datapath = path.join(
self.data_config['train_val_test']['final_datapaths'], split)
datas.append(read_data(datapath))
split_data_obs = read_data(datapath).obs
data_obs.append(split_data_obs)
full_data_obs = pd.concat(data_obs)

label_mappings = {}
for column_name in column_names:
label_mappings[column_name] = {}

id2label = []
for data in datas:
id2label += data.obs[column_name].astype(
'category').cat.categories.tolist()
id2label = sorted(full_data_obs[column_name].astype(
'category').cat.categories.tolist())

id2label = sorted(list(set(id2label)))
label2id = {id2label[i]: i for i in range(len(id2label))}
label_mappings[column_name]['id2label'] = id2label
label_mappings[column_name]['label2id'] = label2id

# Garbage collection
del data_obs
del full_data_obs

write_data(label_mappings, path.join(self.datadir,
'label_mappings.json'))
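
A minimal sketch of the id2label/label2id construction above, on toy obs tables standing in for the train/val/test splits:

```python
import pandas as pd

data_obs = [
    pd.DataFrame({"cell_type": ["B cell", "T cell"]}),   # train split obs
    pd.DataFrame({"cell_type": ["T cell", "NK cell"]}),  # val split obs
]
full_data_obs = pd.concat(data_obs)

label_mappings = {}
for column_name in full_data_obs.columns:
    id2label = sorted(
        full_data_obs[column_name].astype("category").cat.categories.tolist())
    label2id = {label: i for i, label in enumerate(id2label)}
    label_mappings[column_name] = {"id2label": id2label, "label2id": label2id}

print(label_mappings["cell_type"]["label2id"])  # {'B cell': 0, 'NK cell': 1, 'T cell': 2}
```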

12 changes: 9 additions & 3 deletions scalr/feature_extraction_pipeline.py
@@ -179,13 +179,19 @@ def write_top_features_subset_data(self, data_config: dict) -> dict:
feature_subset_datapath = path.join(self.dirpath, 'feature_subset_data')
os.makedirs(feature_subset_datapath, exist_ok=True)

for split in ['train', 'val', 'test']:
test_data = read_data(path.join(datapath, 'test'))
splits = {
'train': self.train_data,
'val': self.val_data,
'test': test_data
}

for split, split_data in splits.items():

split_datapath = path.join(datapath, split)
split_feature_subset_datapath = path.join(feature_subset_datapath,
split)
sample_chunksize = data_config.get('sample_chunksize')
write_chunkwise_data(split_datapath,
write_chunkwise_data(split_data,
sample_chunksize,
split_feature_subset_datapath,
feature_inds=self.top_features)
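
A minimal sketch of the column subsetting that `write_chunkwise_data(..., feature_inds=...)` performs for each split, in plain anndata with illustrative paths and indices:

```python
import os

import anndata as ad
import numpy as np

split_data = ad.AnnData(X=np.random.rand(6, 100))
top_features = [3, 7, 42]  # indices of the selected genes

# Keep only the selected feature columns and write the reduced split to disk.
os.makedirs("feature_subset_data/train", exist_ok=True)
split_data[:, top_features].copy().write_h5ad("feature_subset_data/train/0.h5ad")
```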
27 changes: 13 additions & 14 deletions scalr/utils/file_utils.py
@@ -69,7 +69,7 @@ def write_data(data: Union[dict, AnnData, pd.DataFrame], filepath: str):
'`filepath` does not contain `json`, `yaml`, or `h5ad` file')


def write_chunkwise_data(datapath: str,
def write_chunkwise_data(full_data: Union[AnnData, AnnCollection],
sample_chunksize: int,
dirpath: str,
sample_inds: list[int] = None,
@@ -81,7 +81,7 @@ def write_chunkwise_data(datapath: str,
This function can also apply transformation on each chunk.
Args:
datapath (str): path/to/data to be written in chunks.
full_data (Union[AnnData, AnnCollection]): data to be written in chunks.
sample_chunksize (int): number of samples to be loaded at a time.
dirpath (str): path/to/directory to write the chunks of data.
sample_inds (list[int], optional): To be used in case of chunking
@@ -95,26 +95,25 @@ def write_chunkwise_data(datapath: str,
if not path.exists(dirpath):
os.makedirs(dirpath)

data = read_data(datapath)
if isinstance(data, AnnData) and feature_inds:
raise ValueError(
'TrainValTestSplit data for FeatureSubsetting must be AnnCollection'
)

if not sample_inds:
sample_inds = list(range(len(data)))
sample_inds = list(range(len(full_data)))

# Hacky fix for an AnnCollection working/bug.
# Hacky fixes for an AnnCollection working/bug.
if sample_chunksize >= len(sample_inds):
sample_chunksize = len(sample_inds) - 1

for col in full_data.obs.columns:
full_data.obs[col] = full_data.obs[col].astype('category')

for i, (start) in enumerate(range(0, len(sample_inds), sample_chunksize)):
data = read_data(datapath)
data = data[sample_inds[start:start + sample_chunksize]]
if feature_inds:
data = full_data[sample_inds[start:start + sample_chunksize],
feature_inds]
else:
data = full_data[sample_inds[start:start + sample_chunksize]]

if not isinstance(data, AnnData):
data = data.to_adata()
if feature_inds:
data = data[:, feature_inds]
data = data.to_memory()

# Transformation
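
A minimal sketch of the combined row/column slicing used in the loop above (rows for the current chunk, optional `feature_inds` for columns), in plain anndata:

```python
import anndata as ad
import numpy as np

full_data = ad.AnnData(X=np.random.rand(8, 6))
sample_inds = [0, 2, 4, 6]
feature_inds = [1, 3]

# One chunk's slice: this chunk's samples on the rows, selected features on the columns.
chunk = full_data[sample_inds[:2], feature_inds]

# Views are materialised before writing, mirroring the to_memory() call above.
chunk = chunk.copy()
print(chunk.shape)  # (2, 2)
```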
3 changes: 2 additions & 1 deletion scalr/utils/test_file_utils.py
@@ -31,7 +31,8 @@ def test_write_chunkwise_data():
dirpath = './tmp/chunked_data/'

# Writing fulldata in chunks.
write_chunkwise_data(fulldata_path,
full_data = read_data(fulldata_path)
write_chunkwise_data(full_data,
sample_chunksize=sample_chunksize,
dirpath=dirpath)

