Cleanup pipeline, add documentation #41

Closed · wants to merge 5 commits
36 changes: 27 additions & 9 deletions README.md
# scvi-v2-reproducibility

## Running existing workflows

Existing workflows can be found under `workflows/`. They can be run as follows from
the root of the repository:

```
nextflow main.nf --workflow workflow_name --profile profile_name
```

Setting both `--workflow` and `--profile` is required. Available profiles include
`standard` (run without GPU) and `gpu` (run with GPU).

By default, intermediate and final outputs are placed in `results/`. This can be changed
by modifying the `publishDir` directive in the configuration.
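For example, the following sketch overrides the output directory from a custom config file (the file name and directory are illustrative, not part of the repository):

```
// e.g. in a hypothetical conf/custom.config
process {
    publishDir = [path: "custom_results/", mode: "copy"]
}
```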

### Simple pipeline

Currently, this pipeline just scans `data/` in the root directory for any `.h5ad` files
and runs each subworkflow sequentially. It expects a configuration JSON file to be
present in `conf/datasets/` with the same name as the `.h5ad` file.
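For example, with a hypothetical dataset name (not an actual file in the repository), the expected pairing is:

```
data/my_dataset.h5ad            # input AnnData (name is illustrative)
conf/datasets/my_dataset.json   # matching configuration file
```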

### AWS pipeline

This pipeline pulls data from `s3://largedonor` and runs each subworkflow locally. In
order to run this pipeline, create the file `conf/aws_credentials.config` (ignored by
git) with the following entries:

```
aws {
    accessKey = "access_key"
    secretKey = "secret_key"
    region = "us-west-1"
}
```

You can specify the individual AnnDatas being processed by modifying `params.inputs` in
`conf/aws_pipeline.config`.
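For example, a sketch of such an override (the object keys below are placeholders, not actual files in the bucket):

```
params {
    // Placeholder paths; replace with the AnnData objects to process.
    inputs = [
        "s3://largedonor/sample_a.h5ad",
        "s3://largedonor/sample_b.h5ad",
    ]
}
```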

## Adding new workflows

Workflows are intended to connect subworkflows into an end-to-end pipeline. In order to
add a new workflow, follow these steps:

1. Add a `workflow_name.nf` file to `workflows/` with the following template:

```
include { subworkflow } from "${params.subworkflows.subworkflow_name}"

workflow run_main {
    // ...
}
```

2. Create the associated `workflow_name.config` file in `conf/` with the following
template:

```
params {
    inputs = // path or list of paths to inputs
}
```

## Adding new subworkflows

Subworkflows are intended to be reusable pieces across different workflows. To add a new
subworkflow, follow these steps:

1. Add a `main.nf` file to `subworkflows/subworkflow_name/` with the following template:

```
include { module } from "${params.modules.module_name}"

workflow subworkflow_name {
    // ...
    module.out
}
```

2. Add a reference to the new subworkflow under `nextflow.config` to be able to import
it as `params.subworkflows.subworkflow_name`.

```
params {
    subworkflow_name = "${params.subworkflows.root}/subworkflow_name/main.nf"
    // ...
}
```

They can be found under `modules/` and `bin/`, respectively. To add a new module, follow
these steps:

1. Add a `module_name.nf` file to `modules/` with the following template:

```
process module_name {
    input:
    // ...
}
```

2. Add the corresponding Python script to `bin/` with the following template:

```
from utils import wrap_kwargs

# ...

if __name__ == "__main__":
    # ...
```

3. Add references to the new module and script under `nextflow.config` to be able to
import them as `params.modules.module_name` and `params.bin.module_name`, respectively.

```
params {
modules {
        // ...
    }
    // ...
}
```

4. Specify the conda environment for the new module in `nextflow.config`:

```
params {
env {
        // ...
    }
}
```
79 changes: 72 additions & 7 deletions bin/composition_baseline.py
# Imports are collapsed in this view; reconstructed here from usage below.
from typing import Literal, Optional, Union

import numpy as np
import scanpy as sc
from anndata import AnnData
from sklearn.metrics import pairwise_distances


class CompositionBaseline:
"""
Baseline method for computing sample-sample distances within clusters.

Parameters
----------
adata
Annotated data matrix.
batch_key
Key in `adata.obs` corresponding to batch/site information.
sample_key
Key in `adata.obs` corresponding to sample information.
rep
Representation to compute distances on. One of the following:

* `"PCA"`: :func:`~scanpy.pp.pca`
* `"SCVI"`: :class:`~scvi.model.SCVI`
model_kwargs
Keyword arguments to pass into :class:`~scvi.model.SCVI`. Also contains the
following keys:

* `"n_dim"`: Number of components to use for PCA
* `"clustering_on"`: One of `"leiden"` or `"cluster_key"`. If `"leiden"`,
clusters are computed using :func:`~scanpy.tl.leiden`. If `"cluster_key"`,
clusters correspond to values in `adata.obs[cluster_key]`.
* `"cluster_key"`: Key in `adata.obs` corresponding to cluster information.
train_kwargs
Keyword arguments to pass into :func:`~scvi.model.SCVI.train`.
"""

def __init__(
self,
adata: AnnData,
batch_key: Optional[str],
sample_key: Optional[str],
rep: Union[Literal["PCA"], Literal["SCVI"]],
model_kwargs: dict = None,
train_kwargs: dict = None,
):
assert rep in ["PCA", "SCVI"]
self.adata = adata.copy()
self.batch_key = batch_key
self.sample_key = sample_key
self.model_kwargs = model_kwargs if model_kwargs is not None else {}
self.train_kwargs = train_kwargs if train_kwargs is not None else {}

self.rep = rep
self.rep_key = "X_rep"
self.n_dim = self.model_kwargs.pop("n_dim", 50)
self.clustering_on = self.model_kwargs.pop(
"clustering_on", "leiden"
) # one of leiden, cluster_key
self.cluster_key = self.model_kwargs.pop("cluster_key", None)
self.subcluster_key = "leiden_subcluster"

def preprocess_data(self):
"""Preprocess data for PCA or SCVI."""
if self.rep == "PCA":
sc.pp.normalize_total(self.adata, target_sum=1e4)
sc.pp.log1p(self.adata)
# TODO: snakemake workflow had hvg selection, should we add it here too?

def fit(self):
"""Fit PCA or SCVI."""
self.preprocess_data()
if self.rep == "PCA":
sc.pp.pca(self.adata, n_comps=self.n_dim) # saves "X_pca" in obsm
# ...
self.adata.obsm[self.rep_key] = scvi_model.get_latent_representation()

def get_cell_representation(self):
"""Get cell representations."""
return self.adata.obsm[self.rep_key]

def get_local_sample_representation(self):
"""
Computes local sample representations within each cluster.

Within each cluster, each sample is represented by the fraction of each Leiden subcluster's cells that belong to it.

Returns
-------
Dictionary
Keys are unique labels in `adata.obs[self.cluster_key]`.
Values are :class:`pandas.DataFrame` objects with one row per sample present
in that cluster and one column per subcluster; each entry is the fraction of
that subcluster's cells belonging to the sample.
"""
if self.clustering_on == "leiden":
sc.pp.neighbors(self.adata, n_neighbors=30, use_rep=self.rep_key)
sc.tl.leiden(self.adata, resolution=1.0, key_added="leiden_1.0")
# ...
"If clustering_on is cluster_key, cluster_key must be provided."
)

freqs_all = dict()
freqs_all = {}
for unique_cluster in self.adata.obs[self.cluster_key].unique():
cell_is_selected = self.adata.obs[self.cluster_key] == unique_cluster
subann = self.adata[cell_is_selected].copy()
# ...
sc.pp.neighbors(subann, n_neighbors=30, use_rep=self.rep_key)
sc.tl.leiden(subann, resolution=1.0, key_added=self.subcluster_key)

# Step 2: compute number of cells per subcluster-sample pair
szs = (
subann.obs.groupby([self.subcluster_key, self.sample_key])
.size()
.to_frame("n_cells")
.reset_index()
)
# Step 3: compute total number of cells per subcluster
szs_total = (
szs.groupby(self.subcluster_key)
.sum()
.rename(columns={"n_cells": "n_cells_total"})
)
# Step 4: compute the frequency of each sample within each subcluster
comps = szs.merge(szs_total, on=self.subcluster_key).assign(
freqs=lambda x: x.n_cells / x.n_cells_total
)
# Step 5: compute representation of each sample as the vector
# of frequencies in each subcluster
freqs = (
comps.loc[:, [self.sample_key, self.subcluster_key, "freqs"]]
.set_index([self.sample_key, self.subcluster_key])
# ...
return freqs_all

def get_local_sample_distances(self):
"""
Computes local sample distances for each cell.

Sample-sample distances for a particular cell are computed as the Euclidean
distance between the sample representations within the cell's cluster.
Representation values for samples absent from a cluster are imputed as `1.0`
before distances are computed.

Returns
-------
:class:`numpy.ndarray`
Array of shape `(n_cells, n_samples, n_samples)`. Cells in the same cluster
have the same distance matrix.
"""
freqs_all = self.get_local_sample_representation()
n_sample = self.adata.obs[self.sample_key].nunique()
sample_order = self.adata.obs[self.sample_key].unique()
local_dists = np.zeros((self.adata.n_obs, n_sample, n_sample))

for cluster, freqs in freqs_all.items():
cell_is_selected = self.adata.obs[self.cluster_key] == cluster
ordered_freqs = freqs.reindex(sample_order).fillna(1.0)
cluster_reps = ordered_freqs.values
cluster_dists = pairwise_distances(cluster_reps, metric="euclidean")
local_dists[cell_is_selected] = cluster_dists

return local_dists
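For orientation, a minimal usage sketch of the class above (not part of this PR; the file path and `.obs` column names are placeholders, and it assumes `bin/` is importable):

```
import scanpy as sc

from composition_baseline import CompositionBaseline

# Placeholder path; any AnnData with site and sample annotations in .obs works.
adata = sc.read_h5ad("data/example.h5ad")

baseline = CompositionBaseline(
    adata,
    batch_key="site",        # assumed .obs column
    sample_key="sample_id",  # assumed .obs column
    rep="PCA",
    model_kwargs={"n_dim": 50, "clustering_on": "leiden"},
)
baseline.fit()

# (n_cells, n_samples, n_samples) array; cells in the same cluster share a matrix
local_dists = baseline.get_local_sample_distances()
```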
61 changes: 40 additions & 21 deletions bin/compute_2dreps.py
import pymde

import scanpy as sc
from anndata import AnnData
from utils import wrap_kwargs, write_h5ad

mde_kwargs = {
"embedding_dim": 2,
"constraint": pymde.Standardized(),
"repulsive_fraction": 0.7,
"device": "cuda",
"n_neighbors": 15,
}


@wrap_kwargs
def compute_2dreps(
*,
adata_in: str,
adata_out: str,
) -> AnnData:
"""
Computes low dimensional representations for existing latent representations.

Iterates over keys in `.uns["latent_keys"]` and computes the following:

* :func:`~pymde.pca` with `embedding_dim=2`
* :func:`~tsnecuda.TSNE.fit_transform` with `perplexity=30`
* :func:`~pymde.preserve_neighbors` with `mde_kwargs`

Parameters
----------
adata_in
Path to :class:`anndata.AnnData` containing representations.
adata_out
Path to save the output :class:`anndata.AnnData` with 2D representations.

Returns
-------
:class:`anndata.AnnData`
Annotated data matrix with the following:

* `.uns["latent_pca_keys"]`: Keys in `.obsm` corresponding to PCA representations.
* `.uns["latent_tsne_keys"]`: Keys in `.obsm` corresponding to t-SNE representations.
* `.uns["latent_mde_keys"]`: Keys in `.obsm` corresponding to MDE representations.
"""
adata = sc.read_h5ad(adata_in)
latent_keys = adata.uns.get("latent_keys", None)

if latent_keys is None:
adata.write_h5ad(adata_out)
# ...
adata.obsm[latent_pca_key] = latent_pca
latent_pca_keys.append(latent_pca_key)

latent_mde = (
pymde.preserve_neighbors(latent, **mde_kwargs).embed().cpu().numpy()
)
latent_mde_key = f"{latent_key}_mde"
adata.obsm[latent_mde_key] = latent_mde
latent_mde_keys.append(latent_mde_key)

adata.uns["latent_pca_keys"] = latent_pca_keys
adata.uns["latent_mde_keys"] = latent_mde_keys

return write_h5ad(adata, adata_out)


if __name__ == "__main__":
compute_2dreps()
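For reference, a plausible command-line invocation, assuming `wrap_kwargs` exposes the keyword arguments as flags (an assumption about `utils.py`; the paths are placeholders):

```
python bin/compute_2dreps.py --adata_in results/latent.h5ad --adata_out results/latent_2d.h5ad
```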