Skip to content

Commit

Permalink
test run_local_variability.py
Browse files Browse the repository at this point in the history
  • Loading branch information
katosh committed Oct 3, 2023
1 parent 776652c commit db176f7
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 29 deletions.
8 changes: 5 additions & 3 deletions src/palantir/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,15 @@ def run_pca(
if not use_hvg:
n_comps = n_components
else:
l_n_comps = min(1000, ad.n_obs-1, ad.n_vars-1)
l_n_comps = min(1000, ad.n_obs - 1, ad.n_vars - 1)
sc.pp.pca(ad, n_comps=l_n_comps, use_highly_variable=True, zero_center=False)
try:
n_comps = np.where(np.cumsum(ad.uns["pca"]["variance_ratio"]) > 0.85)[0][0]
except IndexError:
n_comps = n_components

# Rerun with selection number of components
n_comps = min(n_comps, ad.n_obs-1, ad.n_vars-1)
n_comps = min(n_comps, ad.n_obs - 1, ad.n_vars - 1)
sc.pp.pca(ad, n_comps=n_comps, use_highly_variable=use_hvg, zero_center=False)

if isinstance(data, sc.AnnData):
Expand Down Expand Up @@ -492,13 +492,15 @@ def _local_var_helper(expressions, distances):
def cast(x):
return x.todense()

issparse = True
else:

def cast(x):
return x

issparse = False
for cell in range(expressions.shape[0]):
neighbors = distances.getrow(cell).indices
neighbors = distances.getrow(cell).indices if issparse else slice(None)
try:
neighbor_expression = cast(expressions[neighbors, :])
cell_expression = cast(expressions[cell, :])
Expand Down
19 changes: 13 additions & 6 deletions tests/utils_compute_kernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from palantir.utils import compute_kernel


@pytest.fixture
def mock_data():
n_cells = 50
Expand All @@ -16,40 +17,46 @@ def mock_data():
index=[f"cell_{i}" for i in range(n_cells)],
)


@pytest.fixture
def mock_anndata(mock_data):
ad = sc.AnnData(X=mock_data)
ad.obsm["X_pca"] = mock_data
return ad


# Test with DataFrame
def test_compute_kernel_dataframe(mock_data):
kernel = compute_kernel(mock_data)
assert isinstance(kernel, csr_matrix)


# Test with AnnData
def test_compute_kernel_anndata(mock_anndata):
kernel = compute_kernel(mock_anndata)
assert 'DM_Kernel' in mock_anndata.obsp.keys()
assert "DM_Kernel" in mock_anndata.obsp.keys()


# Test knn parameter
def test_compute_kernel_knn(mock_data):
kernel = compute_kernel(mock_data, knn=10)
assert isinstance(kernel, csr_matrix)


# Test alpha parameter
def test_compute_kernel_alpha(mock_data):
kernel = compute_kernel(mock_data, alpha=0.5)
assert isinstance(kernel, csr_matrix)


# Test pca_key parameter
def test_compute_kernel_pca_key(mock_anndata):
mock_anndata.obsm["custom_pca"] = np.random.rand(mock_anndata.shape[0], 10)
kernel = compute_kernel(mock_anndata, pca_key='custom_pca')
assert 'DM_Kernel' in mock_anndata.obsp.keys()
kernel = compute_kernel(mock_anndata, pca_key="custom_pca")
assert "DM_Kernel" in mock_anndata.obsp.keys()


# Test kernel_key parameter
def test_compute_kernel_kernel_key(mock_anndata):
kernel = compute_kernel(mock_anndata, kernel_key='custom_kernel')
assert 'custom_kernel' in mock_anndata.obsp.keys()

kernel = compute_kernel(mock_anndata, kernel_key="custom_kernel")
assert "custom_kernel" in mock_anndata.obsp.keys()
10 changes: 8 additions & 2 deletions tests/utils_diffusion_maps_from_kernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@

from palantir.utils import diffusion_maps_from_kernel


def create_mock_kernel(size):
# Creating a mock symmetric positive definite kernel matrix
A = np.random.rand(size, size)
return csr_matrix((A + A.T) / 2)


def test_diffusion_maps_basic():
kernel = create_mock_kernel(50)
result = diffusion_maps_from_kernel(kernel)
Expand All @@ -23,13 +25,15 @@ def test_diffusion_maps_basic():
assert result["EigenVectors"].shape == (50, 10)
assert result["EigenValues"].shape == (10,)


def test_diffusion_maps_n_components():
kernel = create_mock_kernel(50)
result = diffusion_maps_from_kernel(kernel, n_components=5)

assert result["EigenVectors"].shape == (50, 5)
assert result["EigenValues"].shape == (5,)


def test_diffusion_maps_seed():
kernel = create_mock_kernel(50)
result1 = diffusion_maps_from_kernel(kernel, seed=0)
Expand All @@ -38,12 +42,14 @@ def test_diffusion_maps_seed():
# Seed usage should yield the same result
assert np.allclose(result1["EigenValues"], result2["EigenValues"])


def test_diffusion_maps_eigen():
kernel = create_mock_kernel(50)
result = diffusion_maps_from_kernel(kernel)

T = result["T"].toarray()
e_values, e_vectors = eigs(T, 10, tol=1e-4, maxiter=1000)

assert np.allclose(result["EigenValues"], np.real(sorted(e_values, reverse=True)[:10]), atol=1e-4)

assert np.allclose(
result["EigenValues"], np.real(sorted(e_values, reverse=True)[:10]), atol=1e-4
)
40 changes: 22 additions & 18 deletions tests/utils_run_diffusion_maps.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,39 +10,43 @@
def mock_dataframe(rows, cols):
return pd.DataFrame(np.random.rand(rows, cols))


# Generate mock sc.AnnData object
def mock_anndata(rows, cols, keys):
ad = sc.AnnData(np.random.rand(rows, cols))
for key in keys:
ad.obsm[key] = np.random.rand(rows, cols)
return ad


def test_run_diffusion_maps_dataframe():
df = mock_dataframe(50, 30)
result = run_diffusion_maps(df)

assert isinstance(result, dict)
assert set(result.keys()) == {'T', 'EigenVectors', 'EigenValues', 'kernel'}

assert isinstance(result['kernel'], csr_matrix)
assert isinstance(result['T'], csr_matrix)
assert isinstance(result['EigenVectors'], pd.DataFrame)
assert isinstance(result['EigenValues'], pd.Series)
assert set(result.keys()) == {"T", "EigenVectors", "EigenValues", "kernel"}

assert isinstance(result["kernel"], csr_matrix)
assert isinstance(result["T"], csr_matrix)
assert isinstance(result["EigenVectors"], pd.DataFrame)
assert isinstance(result["EigenValues"], pd.Series)


def test_run_diffusion_maps_anndata():
keys = ['X_pca']
keys = ["X_pca"]
ad = mock_anndata(50, 30, keys)
result = run_diffusion_maps(ad)

assert 'DM_Kernel' in ad.obsp
assert 'DM_Similarity' in ad.obsp
assert 'DM_EigenVectors' in ad.obsm
assert 'DM_EigenValues' in ad.uns

assert np.array_equal(ad.obsp['DM_Kernel'].toarray(), result['kernel'].toarray())
assert np.array_equal(ad.obsp['DM_Similarity'].toarray(), result['T'].toarray())
assert np.array_equal(ad.obsm['DM_EigenVectors'], result['EigenVectors'].values)
assert np.array_equal(ad.uns['DM_EigenValues'], result['EigenValues'])

assert "DM_Kernel" in ad.obsp
assert "DM_Similarity" in ad.obsp
assert "DM_EigenVectors" in ad.obsm
assert "DM_EigenValues" in ad.uns

assert np.array_equal(ad.obsp["DM_Kernel"].toarray(), result["kernel"].toarray())
assert np.array_equal(ad.obsp["DM_Similarity"].toarray(), result["T"].toarray())
assert np.array_equal(ad.obsm["DM_EigenVectors"], result["EigenVectors"].values)
assert np.array_equal(ad.uns["DM_EigenValues"], result["EigenValues"])


def test_run_diffusion_maps_exceptions():
# Test with neither pd.DataFrame nor sc.AnnData
Expand Down
82 changes: 82 additions & 0 deletions tests/utils_run_local_variability.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import scanpy as sc
import numpy as np
import pytest
from scipy.sparse import csr_matrix

from palantir.utils import run_local_variability

# Mock data for dense matrix
def mock_anndata_dense(n_cells, n_genes, layer_keys, obsp_keys):
ad = sc.AnnData(np.random.rand(n_cells, n_genes))
for key in layer_keys:
ad.layers[key] = np.random.rand(n_cells, n_genes)
for key in obsp_keys:
ad.obsp[key] = np.random.rand(n_cells, n_cells)
return ad


# Mock data for sparse matrix
def mock_anndata_sparse(n_cells, n_genes, layer_keys, obsp_keys):
ad = sc.AnnData(csr_matrix(np.random.rand(n_cells, n_genes)))
for key in layer_keys:
ad.layers[key] = csr_matrix(np.random.rand(n_cells, n_genes))
for key in obsp_keys:
ad.obsp[key] = csr_matrix(np.random.rand(n_cells, n_cells))
return ad


# Test with default keys, dense
@pytest.mark.filterwarnings("ignore:invalid value encountered in divide")
def test_run_local_variability_default_dense():
ad = mock_anndata_dense(50, 20, ["MAGIC_imputed_data"], ["distances"])
_test_run_local_variability(ad)


# Test with default keys, sparse
@pytest.mark.filterwarnings("ignore:invalid value encountered in divide")
def test_run_local_variability_default_sparse():
ad = mock_anndata_sparse(50, 20, ["MAGIC_imputed_data"], ["distances"])
_test_run_local_variability(ad)


# Test with custom keys, dense
@pytest.mark.filterwarnings("ignore:invalid value encountered in divide")
def test_run_local_variability_custom_keys_dense():
ad = mock_anndata_dense(50, 20, ["custom_expression"], ["custom_distances"])
_test_run_local_variability(
ad, "custom_expression", "custom_distances", "custom_local_var"
)


# Test with custom keys, sparse
@pytest.mark.filterwarnings("ignore:invalid value encountered in divide")
def test_run_local_variability_custom_keys_sparse():
ad = mock_anndata_sparse(50, 20, ["custom_expression"], ["custom_distances"])
_test_run_local_variability(
ad, "custom_expression", "custom_distances", "custom_local_var"
)


# Helper function for assertions
def _test_run_local_variability(
ad,
expression_key="MAGIC_imputed_data",
distances_key="distances",
localvar_key="local_variability",
):
result = run_local_variability(ad, expression_key, distances_key, localvar_key)

assert localvar_key in ad.layers
assert isinstance(result, np.ndarray) or isinstance(result, csr_matrix)
assert result.shape == (50, 20)


# Test missing keys
def test_run_local_variability_missing_keys():
ad = mock_anndata_dense(50, 20, ["MAGIC_imputed_data"], ["distances"])

with pytest.raises(KeyError):
run_local_variability(ad, "missing_expression", "distances")

with pytest.raises(KeyError):
run_local_variability(ad, "MAGIC_imputed_data", "missing_distances")

0 comments on commit db176f7

Please sign in to comment.