Skip to content

Commit

Permalink
Add census_data and census_spatial collections
Browse files Browse the repository at this point in the history
  • Loading branch information
prathapsridharan committed Jun 7, 2024
1 parent 28ac990 commit 049f00d
Show file tree
Hide file tree
Showing 6 changed files with 984 additions and 304 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from .globals import (
CENSUS_DATA_NAME,
CENSUS_INFO_NAME,
CENSUS_SPATIAL_NAME,
SOMA_TileDB_Context,
)
from .manifest import load_manifest
Expand Down Expand Up @@ -153,7 +154,7 @@ def populate_root_collection(root_collection: soma.Collection) -> soma.Collectio
root_collection.metadata["git_commit_sha"] = sha

# Create sub-collections for experiments, etc.
for n in [CENSUS_INFO_NAME, CENSUS_DATA_NAME]:
for n in [CENSUS_INFO_NAME, CENSUS_DATA_NAME, CENSUS_SPATIAL_NAME]:
root_collection.add_new_collection(n)

return root_collection
Expand Down Expand Up @@ -198,7 +199,14 @@ def build_step2_create_root_collection(soma_path: str, experiment_builders: list
populate_root_collection(root_collection)

for e in experiment_builders:
e.create(census_data=root_collection[CENSUS_DATA_NAME])
# TODO (spatial): Confirm the decision that we are clearly separating
# experiments containing spatial assays from experiments not containing
# spatial assays. That is, an experiment should never contain assays from
# spatial and non-spatial modalities
if e.specification.is_exclusively_spatial():
e.create(census_data=root_collection[CENSUS_SPATIAL_NAME])
else:
e.create(census_data=root_collection[CENSUS_DATA_NAME])

logger.info("Build step 2 - Create root collection - finished")
return root_collection
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from .anndata import AnnDataFilterSpec, AnnDataProxy, open_anndata
from .datasets import Dataset
from .globals import (
ALLOWED_SPATIAL_ASSAYS,
CENSUS_OBS_PLATFORM_CONFIG,
CENSUS_OBS_TABLE_SPEC,
CENSUS_VAR_PLATFORM_CONFIG,
Expand Down Expand Up @@ -109,6 +110,10 @@ def create(
"""Factory method. Do not instantiate the class directly."""
return cls(name, label, anndata_cell_filter_spec, organism_ontology_term_id)

def is_exclusively_spatial(self) -> bool:
"""Returns True if the experiment specification EXCLUSIVELY involves spatial assays."""
return self.anndata_cell_filter_spec["assay_ontology_term_ids"] == ALLOWED_SPATIAL_ASSAYS


class ExperimentBuilder:
"""Class that embodies the operators and state to build an Experiment.
Expand Down Expand Up @@ -143,7 +148,7 @@ def anndata_cell_filter_spec(self) -> AnnDataFilterSpec:
return self.specification.anndata_cell_filter_spec

def create(self, census_data: soma.Collection) -> None:
"""Create experiment within the specified Collection with a single Measurement."""
"""Create experiment within the specified Collection."""
logger.info(f"{self.name}: create experiment at {urlcat(census_data.uri, self.name)}")

self.experiment = census_data.add_new_collection(self.name, soma.Experiment)
Expand All @@ -155,6 +160,10 @@ def create(self, census_data: soma.Collection) -> None:
# make measurement and add to ms collection
ms.add_new_collection(MEASUREMENT_RNA_NAME, soma.Measurement)

# create `spatial`
if self.specification.is_exclusively_spatial():
self.experiment.add_new_collection("spatial")

def write_obs_dataframe(self) -> None:
logger.info(f"{self.name}: writing obs dataframe")
assert self.experiment is not None
Expand Down Expand Up @@ -661,7 +670,7 @@ def read_and_dispatch_partial_h5ad(
if d.dataset_id in eb.dataset_obs_joinid_start
for chunk in range(0, eb.dataset_n_obs[d.dataset_id], REDUCE_X_MAJOR_ROW_STRIDE)
]
per_eb_results[eb.name] = (
per_eb_results[eb.experiment_uri] = (
dask.bag.from_sequence(read_file_chunks)
.starmap(read_and_dispatch_partial_h5ad, global_var_joinids=global_var_joinids)
.foldby("dataset_id", reduce_X_stats_binop)
Expand All @@ -685,12 +694,12 @@ def populate_X_layers(
per_eb_results = _reduce_X_matrices(assets_path, datasets, experiment_builders)

for eb in experiment_builders:
if eb.name not in per_eb_results:
if eb.experiment_uri not in per_eb_results:
continue

# add per-dataset stats to each per-dataset XReduction
eb_result: list[XReduction] = []
for dataset_id, xreduction in per_eb_results[eb.name]:
for dataset_id, xreduction in per_eb_results[eb.experiment_uri]:
assert dataset_id == xreduction["dataset_id"]
d = datasets_by_id[dataset_id]
eb_result.extend(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import functools

from .experiment_builder import ExperimentBuilder, ExperimentSpecification
from .globals import RNA_SEQ
from .globals import ALLOWED_SPATIAL_ASSAYS, RNA_SEQ


@functools.cache
Expand Down Expand Up @@ -30,6 +30,25 @@ def make_experiment_specs() -> list[ExperimentSpecification]:
},
organism_ontology_term_id="NCBITaxon:10090",
),
# Experiments for spatial assays
ExperimentSpecification.create(
name="homo_sapiens",
label="Homo sapiens",
anndata_cell_filter_spec={
"organism_ontology_term_id": "NCBITaxon:9606",
"assay_ontology_term_ids": ALLOWED_SPATIAL_ASSAYS,
},
organism_ontology_term_id="NCBITaxon:9606",
),
ExperimentSpecification.create(
name="mus_musculus",
label="Mus musculus",
anndata_cell_filter_spec={
"organism_ontology_term_id": "NCBITaxon:10090",
"assay_ontology_term_ids": ALLOWED_SPATIAL_ASSAYS,
},
organism_ontology_term_id="NCBITaxon:10090",
),
]


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@
# top-level SOMA collection
CENSUS_DATA_NAME = "census_data"

# top-level SOMA collection
CENSUS_SPATIAL_NAME = "census_spatial"

# "census_info"/"summary_cell_counts" SOMA Dataframe
CENSUS_SUMMARY_CELL_COUNTS_NAME = "summary_cell_counts" # object name

Expand Down Expand Up @@ -329,7 +332,6 @@
"EFO:0010713", # 10x immune profiling
"EFO:0010714", # 10x TCR enrichment
"EFO:0010715", # 10x Ig enrichment
"EFO:0010961", # Visium Spatial Gene Expression
"EFO:0010964", # barcoded plate-based single cell RNA-seq
"EFO:0011025", # 10x 5' v1
"EFO:0022396", # TruSeq
Expand All @@ -353,6 +355,12 @@
"EFO:0700016", # Smart-seq v4
]

# list of EFO terms that correspond to SPATIAL modality/measurement. These terms
# define the inclusive filter applied to obs.assay_ontology_term_id. All other
ALLOWED_SPATIAL_ASSAYS = [
"EFO:0010961", # Visium Spatial Gene Expression
]

# Full-gene assays have special handling in the "normalized" X layers
FULL_GENE_ASSAY = [
"EFO:0003755", # FL-cDNA
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
CENSUS_OBS_STATS_COLUMNS,
CENSUS_OBS_TABLE_SPEC,
CENSUS_SCHEMA_VERSION,
CENSUS_SPATIAL_NAME,
CENSUS_SUMMARY_CELL_COUNTS_NAME,
CENSUS_SUMMARY_CELL_COUNTS_TABLE_SPEC,
CENSUS_SUMMARY_NAME,
Expand Down Expand Up @@ -82,9 +83,18 @@ def assert_all(__iterable: Iterable[object]) -> bool:
return r


def get_census_data_collection_name(eb: ExperimentSpecification) -> str:
return CENSUS_SPATIAL_NAME if eb.is_exclusively_spatial() else CENSUS_DATA_NAME


def get_experiment_uri(base_uri: str, eb: ExperimentSpecification) -> str:
census_data_collection_name = get_census_data_collection_name(eb)
return urlcat(base_uri, census_data_collection_name, eb.name)


def open_experiment(base_uri: str, eb: ExperimentSpecification) -> soma.Experiment:
"""Helper function that knows the Census schema path conventions."""
return soma.Experiment.open(urlcat(base_uri, CENSUS_DATA_NAME, eb.name), mode="r")
return soma.Experiment.open(get_experiment_uri(base_uri, eb), mode="r")


def get_experiment_shape(base_uri: str, specs: list[ExperimentSpecification]) -> dict[str, tuple[int, int]]:
Expand Down Expand Up @@ -230,9 +240,10 @@ def validate_axis_dataframes_global_ids(
.concat()
.to_pandas()
)
assert eb_info[eb.name].n_obs == len(census_obs_df) == exp.obs.count
assert (len(census_obs_df) == 0) or (census_obs_df.soma_joinid.max() + 1 == eb_info[eb.name].n_obs)
assert eb_info[eb.name].dataset_ids == set(census_obs_df.dataset_id.unique())
eb_info_key = get_experiment_uri(soma_path, eb)
assert eb_info[eb_info_key].n_obs == len(census_obs_df) == exp.obs.count
assert (len(census_obs_df) == 0) or (census_obs_df.soma_joinid.max() + 1 == eb_info[eb_info_key].n_obs)
assert eb_info[eb_info_key].dataset_ids == set(census_obs_df.dataset_id.unique())

# Validate that all obs soma_joinids are unique and in the range [0, n).
obs_unique_joinids = np.unique(census_obs_df.soma_joinid.to_numpy())
Expand All @@ -254,13 +265,13 @@ def validate_axis_dataframes_global_ids(
del census_obs_df, obs_unique_joinids

# var
n_vars = len(eb_info[eb.name].vars)
n_vars = len(eb_info[eb_info_key].vars)

census_var_df = (
exp.ms[MEASUREMENT_RNA_NAME].var.read(column_names=["feature_id", "soma_joinid"]).concat().to_pandas()
)
assert n_vars == len(census_var_df) == exp.ms[MEASUREMENT_RNA_NAME].var.count
assert eb_info[eb.name].vars == set(census_var_df.feature_id.array)
assert eb_info[eb_info_key].vars == set(census_var_df.feature_id.array)
assert (len(census_var_df) == 0) or (census_var_df.soma_joinid.max() + 1 == n_vars)

# Validate that all var soma_joinids are unique and in the range [0, n).
Expand Down Expand Up @@ -289,7 +300,7 @@ def _validate_axis_dataframes(
eb_info: dict[str, EbInfo] = {}
for eb in experiment_specifications:
with soma.Collection.open(soma_path, context=SOMA_TileDB_Context()) as census:
census_data = census[CENSUS_DATA_NAME]
census_data_collection = census[get_census_data_collection_name(eb)]
dataset_id = dataset.dataset_id
ad = open_anndata(
dataset,
Expand All @@ -298,8 +309,16 @@ def _validate_axis_dataframes(
var_column_names=CXG_VAR_COLUMNS_READ,
filter_spec=eb.anndata_cell_filter_spec,
)
eb_info[eb.name] = EbInfo()
se = census_data[eb.name]
se = census_data_collection[eb.name]

# NOTE: Since we are validating data for each experiment, we
# use the experiment uri as the key for the data that must be validated.
# Using just the experiment spec name would cause collisions as in the case
# of spatial and non-spatial experiments with the same name (experiment spec name)
# but stored under different census root collections
eb_info_key = get_experiment_uri(soma_path, eb)
eb_info[eb_info_key] = EbInfo()

dataset_obs = (
se.obs.read(
column_names=list(CENSUS_OBS_TABLE_SPEC.field_names()),
Expand All @@ -326,11 +345,13 @@ def _validate_axis_dataframes(
if isinstance(dataset_obs[key].dtype, pd.CategoricalDtype):
dataset_obs[key] = dataset_obs[key].astype(dataset_obs[key].cat.categories.dtype)

assert len(dataset_obs) == len(ad.obs), f"{dataset.dataset_id}/{eb.name} obs length mismatch"
assert (
len(dataset_obs) == len(ad.obs)
), f"{dataset.dataset_id}/{eb.name} obs length mismatch soma experiment obs len: {len(dataset_obs)} != anndata obs len: {len(ad.obs)}"
if ad.n_obs > 0:
eb_info[eb.name].n_obs += ad.n_obs
eb_info[eb.name].dataset_ids.add(dataset_id)
eb_info[eb.name].vars |= set(ad.var.index.array)
eb_info[eb_info_key].n_obs += ad.n_obs
eb_info[eb_info_key].dataset_ids.add(dataset_id)
eb_info[eb_info_key].vars |= set(ad.var.index.array)
ad_obs = ad.obs[list(set(CXG_OBS_TERM_COLUMNS) - set(CENSUS_OBS_STATS_COLUMNS))].reset_index(
drop=True
)
Expand All @@ -343,11 +364,11 @@ def _validate_axis_dataframes(
def reduce_eb_info(results: Sequence[dict[str, EbInfo]]) -> dict[str, EbInfo]:
eb_info = {}
for res in results:
for name, info in res.items():
if name not in eb_info:
eb_info[name] = copy.copy(info)
for eb_info_key, info in res.items():
if eb_info_key not in eb_info:
eb_info[eb_info_key] = copy.copy(info)
else:
eb_info[name].update(info)
eb_info[eb_info_key].update(info)
return eb_info

eb_info = (
Expand Down Expand Up @@ -815,8 +836,9 @@ def validate_X_layers_schema(
with open_experiment(soma_path, eb) as exp:
assert soma.Collection.exists(exp.ms[MEASUREMENT_RNA_NAME].X.uri)

n_obs = eb_info[eb.name].n_obs
n_vars = eb_info[eb.name].n_vars
eb_info_key = get_experiment_uri(soma_path, eb)
n_obs = eb_info[eb_info_key].n_obs
n_vars = eb_info[eb_info_key].n_vars
assert n_obs == exp.obs.count
assert n_vars == exp.ms[MEASUREMENT_RNA_NAME].var.count

Expand Down Expand Up @@ -1011,8 +1033,9 @@ def get_sparse_arrays(C: soma.Collection) -> list[soma.SparseNDArray]:
# first, confirm we set shape correctly, as the code uses it as the max bounding box
for eb in experiment_specifications:
with open_experiment(soma_path, eb) as exp:
n_obs = eb_info[eb.name].n_obs
n_vars = eb_info[eb.name].n_vars
eb_info_key = get_experiment_uri(soma_path, eb)
n_obs = eb_info[eb_info_key].n_obs
n_vars = eb_info[eb_info_key].n_vars
for layer_name in exp.ms[MEASUREMENT_RNA_NAME].X:
assert exp.ms[MEASUREMENT_RNA_NAME].X[layer_name].shape == (n_obs, n_vars)
if "feature_dataset_presence_matrix" in exp.ms[MEASUREMENT_RNA_NAME]:
Expand Down

0 comments on commit 049f00d

Please sign in to comment.