diff --git a/pyproject.toml b/pyproject.toml index 4a41f6f..5b91df9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,12 +30,17 @@ classifiers = [ "Typing :: Typed", ] dynamic = ["version"] -dependencies = [] +dependencies = [ + "numpy >= 1.26.0", + "dcmqi >= 0.2.0", + "SimpleITK >= 2.3.0", +] [project.optional-dependencies] test = [ "pytest >=6", "pytest-cov >=3", + "idc-index >=0.5.2" ] dev = [ "pytest >=6", @@ -48,6 +53,9 @@ docs = [ "sphinx_autodoc_typehints", "furo>=2023.08.17", ] +segdb = [ + "segdb >= 0.0.3", +] [project.urls] Homepage = "https://github.com/ImagingDataCommons/pydcmqi" diff --git a/src/pydcmqi/segimage.py b/src/pydcmqi/segimage.py new file mode 100644 index 0000000..b5c5b24 --- /dev/null +++ b/src/pydcmqi/segimage.py @@ -0,0 +1,794 @@ +from __future__ import annotations + +import json +import os +import shutil +import subprocess +import tempfile +from collections import OrderedDict +from pathlib import Path +from typing import Any, Dict, List, Optional, Tuple, Union + +import numpy as np +import SimpleITK as sitk + +from .types import SegImageDict, SegmentDict, TripletDict + +# --== helper and utility functions ==-- + + +def get_min_max_values(image: sitk.Image) -> Tuple[float, float]: + filter = sitk.MinimumMaximumImageFilter() + filter.Execute(image) + return filter.GetMinimum(), filter.GetMaximum() + + +def _path(path: Union[str, Path]) -> Path: + if isinstance(path, str): + return Path(path) + elif isinstance(path, Path): + return path + else: + raise ValueError("Invalid path type.") + + +# --== class definitions ==-- + + +class Triplet: + """ + A triplet is a data structure that consists of three elements: + - `code meaning`, a human readable label + - `code value`, a unique identifier + - `coding scheme designator`, the issuer of the code value + """ + + @staticmethod + def fromDict(d: TripletDict) -> Triplet: + """ + Create a LabeledTriplet from a dictionary. + + ``` + { + "CodeMeaning": "", + "CodeValue": "", + "CodingSchemeDesignator": "" + } + ``` + """ + return Triplet(d["CodeMeaning"], d["CodeValue"], d["CodingSchemeDesignator"]) + + @staticmethod + def fromTuple(t: Tuple[str, str, str]) -> Triplet: + """ + Create a LabeledTriplet from a tuple. + + ``` + ("", "", "") + ``` + """ + return Triplet(t[0], t[1], t[2]) + + @staticmethod + def empty() -> Triplet: + """ + Create an empty triplet. + """ + return Triplet("", "", "") + + def __init__(self, label: str, code: str, scheme: str) -> None: + self.label = label + self.code = code + self.scheme = scheme + + def __repr__(self) -> str: + return f"[{self.code}:{self.scheme}|{self.label}]" + + def __str__(self) -> str: + return self.label + + def asdict(self) -> TripletDict: + """ + Convert the triplet to a dictionary: + + ``` + { + "CodeMeaning": "", + "CodeValue": "", + "CodingSchemeDesignator": "" + } + ``` + """ + + return { + "CodeMeaning": self.label, + "CodeValue": self.code, + "CodingSchemeDesignator": self.scheme, + } + + @property + def valid(self) -> bool: + """ + A triplet is valid if all fields are non-empty. + Evaluates to `True` if all fields are non-empty, `False` otherwise. + """ + + return all([self.label != "", self.code != "", self.scheme != ""]) + + +class SegmentData: + """ + A class to store and manipulate the data for a segmentation or region of interest. + """ + + def __init__(self) -> None: + self._data: SegmentDict = {} + + def setConfigData(self, config: dict) -> None: + self._data = config.copy() + + def _bake_data(self) -> SegmentDict: + # start from internal data + d = self._data.copy() + + # triplets + triplet_keys = [ + "SegmentedPropertyCategoryCodeSequence", + "SegmentedPropertyTypeCodeSequence", + "SegmentedPropertyTypeModifierCodeSequence", + "AnatomicRegionSequence", + "AnatomicRegionModifierSequence", + ] + + # optional triplets + triplet_keys_optional = [ + "SegmentedPropertyTypeModifierCodeSequence", + "AnatomicRegionSequence", + "AnatomicRegionModifierSequence", + ] + + for k in triplet_keys: + if k not in d and k in triplet_keys_optional: + continue + t = self._triplet_factory(k) + d[k] = t.asdict() + + # return constructed data + return d + + @staticmethod + def validate(data: dict) -> bool: + # TODO: implement full validation (schema, code values, etc.) + + required_fields = [ + "labelID" in data, + "SegmentLabel" in data, + "SegmentDescription" in data, + "SegmentAlgorithmName" in data, + "SegmentAlgorithmType" in data, + "recommendedDisplayRGBValue" in data, + "SegmentedPropertyCategoryCodeSequence" in data, + Triplet.fromDict(data["SegmentedPropertyCategoryCodeSequence"]).valid, + "SegmentedPropertyTypeCodeSequence" in data, + Triplet.fromDict(data["SegmentedPropertyTypeCodeSequence"]).valid, + ] + + optional_fields = [ + "SegmentedPropertyTypeModifierCodeSequence" not in data + or Triplet.fromDict( + data["SegmentedPropertyTypeModifierCodeSequence"] + ).valid, + "AnatomicRegionSequence" not in data + or Triplet.fromDict(data["AnatomicRegionSequence"]).valid, + "AnatomicRegionModifierSequence" not in data + or Triplet.fromDict(data["AnatomicRegionModifierSequence"]).valid, + ] + + print("required_fields", required_fields) + print("optional_fields", optional_fields) + + return all(required_fields) and all(optional_fields) + + def getConfigData(self, bypass_validation: bool = False) -> dict: + assert bypass_validation or self.validate(self.data) + return self.data.copy() + + @property + def data(self) -> SegmentDict: + d = self._bake_data() + return d + + def __getitem__(self, key: str) -> Any: + return self._data[key] + + def __setitem__(self, key: str, value: Any) -> None: + self._data[key] = value + + def _triplet_factory(self, key: str) -> Triplet: + if not hasattr(self, f"__tpf_{key}"): + if key in self._data: + t = Triplet.fromDict(self._data[key]) + else: + t = Triplet.empty() + setattr(self, f"__tpf_{key}", t) + return getattr(self, f"__tpf_{key}") + + @property + def label(self) -> str: + return self._data["SegmentLabel"] + + @label.setter + def label(self, label: str) -> None: + self._data["SegmentLabel"] = label + + @property + def description(self) -> str: + return self._data["SegmentDescription"] + + @description.setter + def description(self, description: str) -> None: + self._data["SegmentDescription"] = description + + @property + def rgb(self) -> Tuple[int]: + return tuple(self._data["recommendedDisplayRGBValue"]) + + @rgb.setter + def rgb(self, rgb: Tuple[int]) -> None: + self._data["recommendedDisplayRGBValue"] = list(rgb) + + @property + def labelID(self) -> int: + return self._data["labelID"] + + @labelID.setter + def labelID(self, labelID: int) -> None: + self._data["labelID"] = labelID + + @property + def segmentAlgorithmName(self) -> str: + return self._data["SegmentAlgorithmName"] + + @segmentAlgorithmName.setter + def segmentAlgorithmName(self, segmentAlgorithmName: str) -> None: + self._data["SegmentAlgorithmName"] = segmentAlgorithmName + + @property + def segmentAlgorithmType(self) -> str: + return self._data["SegmentAlgorithmType"] + + @segmentAlgorithmType.setter + def segmentAlgorithmType(self, segmentAlgorithmType: str) -> None: + self._data["SegmentAlgorithmType"] = segmentAlgorithmType + + @property + def segmentedPropertyCategory(self) -> Triplet: + return self._triplet_factory("SegmentedPropertyCategoryCodeSequence") + + @segmentedPropertyCategory.setter + def segmentedPropertyCategory( + self, value: Union[Tuple[str, str, str], Triplet] + ) -> None: + if isinstance(value, tuple): + value = Triplet.fromTuple(value) + assert isinstance(value, Triplet) + self._data["SegmentedPropertyCategoryCodeSequence"] = value.asdict() + + @property + def segmentedPropertyType(self) -> Triplet: + return self._triplet_factory("SegmentedPropertyTypeCodeSequence") + + @segmentedPropertyType.setter + def segmentedPropertyType( + self, value: Union[Tuple[str, str, str], Triplet] + ) -> None: + if isinstance(value, tuple): + value = Triplet.fromTuple(value) + assert isinstance(value, Triplet) + self._data["SegmentedPropertyTypeCodeSequence"] = value.asdict() + + @property + def segmentedPropertyTypeModifier(self) -> Triplet: + return self._triplet_factory("SegmentedPropertyTypeModifierCodeSequence") + + @segmentedPropertyTypeModifier.setter + def segmentedPropertyTypeModifier( + self, value: Union[Tuple[str, str, str], Triplet] + ) -> None: + if isinstance(value, tuple): + value = Triplet.fromTuple(value) + assert isinstance(value, Triplet) + self._data["SegmentedPropertyTypeModifierCodeSequence"] = value.asdict() + + @property + def hasSegmentedPropertyTypeModifier(self) -> bool: + return "SegmentedPropertyTypeModifierCodeSequence" in self._data + + @property + def anatomicRegion(self) -> Triplet: + return self._triplet_factory("AnatomicRegionSequence") + + @anatomicRegion.setter + def anatomicRegion(self, value: Union[Tuple[str, str, str], Triplet]) -> None: + if isinstance(value, tuple): + value = Triplet.fromTuple(value) + assert isinstance(value, Triplet) + self._data["AnatomicRegionSequence"] = value.asdict() + + @property + def hasAnatomicRegion(self) -> bool: + return "AnatomicRegionSequence" in self._data + + @property + def anatomicRegionModifier(self) -> Triplet: + return self._triplet_factory("AnatomicRegionModifierSequence") + + @anatomicRegionModifier.setter + def anatomicRegionModifier( + self, value: Union[Tuple[str, str, str], Triplet] + ) -> None: + if isinstance(value, tuple): + value = Triplet.fromTuple(value) + assert isinstance(value, Triplet) + self._data["AnatomicRegionModifierSequence"] = value.asdict() + + @property + def hasAnatomicRegionModifier(self) -> bool: + return "AnatomicRegionModifierSequence" in self._data + + +class Segment: + def __init__(self) -> None: + self.path: Optional[Path] = None + self.data = SegmentData() + + self._cached_itk: Optional[sitk.Image] = None + self._cached_numpy: Optional[np.ndarray] = None + + @property + def config(self) -> dict: + return self.data.getConfigData() + + @config.setter + def config(self, config: dict) -> None: + self.data.setConfigData(config) + + @property + def labelID(self) -> int: + return self.data.labelID + + @labelID.setter + def labelID(self, labelID: int) -> None: + self.data.labelID = labelID + + def setFile( + self, path: Union[str, Path], labelID: int, diable_sanity_check: bool = False + ) -> None: + # make sure path is a Path object + path = _path(path) + + # run sanity checks + if not diable_sanity_check: + if not path.exists(): + raise FileNotFoundError(f"File does not exist: {path}") + + if not path.is_file(): + raise ValueError(f"Path is not a file: {path}") + + # check file has as many labels as expected + try: + image = sitk.ReadImage(str(path)) + except Exception: + raise ValueError(f"Could not read image: {path}") + + if image.GetNumberOfComponentsPerPixel() != 1: + raise ValueError( + f"Image must have only one component per pixel: {path}" + ) + + # get min/max values + min_val, max_val = get_min_max_values(image) + assert min_val == 0.0 + assert max_val >= labelID + + # set path and label id + self.path = path + self.labelID = labelID + + @property + def itk(self) -> sitk.Image: + # read image if not cached + if self._cached_itk is None: + self._cached_itk = sitk.ReadImage(str(self.path)) + + # return image + return self._cached_itk + + @property + def numpy(self) -> np.ndarray: + # read image if not cached + if self._cached_numpy is None: + self._cached_numpy = sitk.GetArrayFromImage(self.itk) + + # convert to numpy + return self._cached_numpy + + def isBinary(self) -> bool: + uv = np.unique(self.numpy) + return len(uv) == 2 and 0 in uv and 1 in uv + + def isMultiLabel(self) -> bool: + uv = np.unique(self.numpy) + return len(uv) > 2 + + def isLabel(self, label: int) -> bool: + return label in np.unique(self.numpy) + + def isLabelSet(self, labels: List[int]) -> bool: + return all([self.isLabel(l) for l in labels]) + + def isLabelRange(self, start: int, end: int) -> bool: + return self.isLabelSet(list(range(start, end + 1))) + + @property + def binary(self) -> np.ndarray: + return self.numpy == self.labelID + + def saveAsBinary(self, path: Union[str, Path]) -> None: + # make sure path is a Path object + path = _path(path) + + # create image + image = sitk.GetImageFromArray(self.binary) + image.CopyInformation(self.itk) + + # write image + sitk.WriteImage(image, str(path)) + + +class SegImageData: + def __init__(self) -> None: + self._data: SegImageDict = {} + + def setConfigData(self, config: SegImageDict) -> None: + # NOTE: _data is a pass-by-reference object if we don't use copy + self._data = config.copy() + self._data["segmentAttributes"] = [] + + def getConfigData(self) -> SegImageDict: + return self.asdict() + + @property + def bodyPartExamined(self) -> str: + return self._data["BodyPartExamined"] + + @bodyPartExamined.setter + def bodyPartExamined(self, bodyPartExamined: str) -> None: + self._data["BodyPartExamined"] = bodyPartExamined + + @property + def clinicalTrialCoordinatingCenterName(self) -> str: + return self._data["ClinicalTrialCoordinatingCenterName"] + + @clinicalTrialCoordinatingCenterName.setter + def clinicalTrialCoordinatingCenterName( + self, clinicalTrialCoordinatingCenterName: str + ) -> None: + self._data["ClinicalTrialCoordinatingCenterName"] = ( + clinicalTrialCoordinatingCenterName + ) + + @property + def clinicalTrialSeriesID(self) -> str: + return self._data["ClinicalTrialSeriesID"] + + @clinicalTrialSeriesID.setter + def clinicalTrialSeriesID(self, clinicalTrialSeriesID: str) -> None: + self._data["ClinicalTrialSeriesID"] = clinicalTrialSeriesID + + @property + def clinicalTrialTimePointID(self) -> str: + return self._data["ClinicalTrialTimePointID"] + + @clinicalTrialTimePointID.setter + def clinicalTrialTimePointID(self, clinicalTrialTimePointID: str) -> None: + self._data["ClinicalTrialTimePointID"] = clinicalTrialTimePointID + + @property + def contentCreatorName(self) -> str: + return self._data["ContentCreatorName"] + + @contentCreatorName.setter + def contentCreatorName(self, contentCreatorName: str) -> None: + # TODO: incorporate dicom string format factory & validation + self._data["ContentCreatorName"] = contentCreatorName + + @property + def instanceNumber(self) -> str: + return self._data["InstanceNumber"] + + @instanceNumber.setter + def instanceNumber(self, instanceNumber: str) -> None: + self._data["InstanceNumber"] = instanceNumber + + @property + def seriesDescription(self) -> str: + return self._data["SeriesDescription"] + + @seriesDescription.setter + def seriesDescription(self, seriesDescription: str) -> None: + self._data["SeriesDescription"] = seriesDescription + + @property + def seriesNumber(self) -> str: + return self._data["SeriesNumber"] + + @seriesNumber.setter + def seriesNumber(self, seriesNumber: str) -> None: + self._data["SeriesNumber"] = seriesNumber + + def asdict(self) -> SegImageDict: + return self._data.copy() + + +class SegImageFiles: + def __init__(self) -> None: + self._dicomseg: Optional[Path] = None + self._config: Optional[Path] = None + + @property + def dicomseg(self) -> Optional[Path]: + return self._dicomseg + + @property + def config(self) -> Optional[Path]: + return self._config + + +class SegImage: + verbose: bool = False + + @staticmethod + def reset() -> None: + SegImage.verbose = False + + def __init__( + self, verbose: Optional[bool] = None, tmp_dir: Optional[Union[Path, str]] = None + ) -> None: + # set verbose + if verbose is not None: + self.verbose = verbose + + # set tmp_dir + if tmp_dir is None: + self.tmp_dir = Path(tempfile.gettempdir()) + elif isinstance(tmp_dir, Path): + self.tmp_dir = tmp_dir + elif isinstance(tmp_dir, str): + self.tmp_dir = Path(tmp_dir) + else: + raise ValueError( + "Invalid tmp_dir, must be either None for default, a Path or a string." + ) + + # set instance state variables + self.data = SegImageData() + self.loaded = False + self.files = SegImageFiles() + + self._config: Optional[dict] = None + self._segments: List[Segment] = [] + + def load( + self, + dicomseg_file: Union[Path, str], + output_dir: Optional[Union[Path, str]] = None, + ) -> bool: + print(f"Converting file: {dicomseg_file} into {output_dir}.") + + # we create a temporary output directory if none is provided in the specified tmp dir + if output_dir is None: + output_dir = Path(self.tmp_dir) / "output" + else: + output_dir = _path(output_dir) + + # create output directory + output_dir.mkdir(parents=True, exist_ok=True) + + # build subprocess command + cmd = [ + "segimage2itkimage", + "-t", + "nifti", + "-p", + "pydcmqi", + "--outputDirectory", + str(output_dir), + "--inputDICOM", + str(dicomseg_file), + ] + + # run subprocess + subprocess.run(cmd, check=True) + + # import data + self._import(output_dir) + + # update file paths + self.files._dicomseg = dicomseg_file + + def _import( + self, output_dir: Path, disable_file_sanity_checks: bool = False + ) -> None: + # iterate all files in the output directory + # - store the config file + # - store the image files + + self._config = None + self._segments = [] + + # read in the config file + config_file = output_dir / "pydcmqi-meta.json" + + # load the config file + with open(config_file) as f: + self._config = json.load(f) + + # load data + # TODO: or property self.config ?? + self.data.setConfigData(self._config) + + # load each segmentation as item + for i, s in enumerate(self._config["segmentAttributes"]): + assert len(s) == 1 + config = s[0] + labeID = int(config["labelID"]) + + f = os.path.join(output_dir, f"pydcmqi-{i+1}.nii.gz") + + # create new segment + segment = self.new_segment() + segment.setFile(f, labeID, disable_file_sanity_checks) + segment.config = config + + # update state + self.loaded = True + + # store file paths + self.files._config = config_file + + def write( + self, + output_file: Union[str, Path], + dicom_dir: Union[str, Path], + export_config_to_file: Optional[Union[str, Path]] = None, + allow_overwrite: bool = False, + ) -> None: + # make sure the output file is a Path object + output_file = _path(output_file) + dicom_dir = _path(dicom_dir) + + if export_config_to_file is not None: + export_config_to_file = _path(export_config_to_file) + + # check output file + if not output_file.name.endswith(".seg.dcm"): + raise ValueError("Output file must end with .seg.dcm.") + + # check if the file already exists and if overwriting protection is disabled + if output_file.exists() and not allow_overwrite: + raise FileExistsError(f"Output file already exists: {output_file}.") + + # check that the dicom directory exists and contains at least one *.dcm file + if not dicom_dir.exists(): + raise FileNotFoundError(f"Directory does not exist: {dicom_dir}.") + + if not dicom_dir.is_dir(): + raise ValueError(f"Path is not a directory: {dicom_dir}.") + + # check that the dicom directory contains at least one *.dcm file + if len(list(dicom_dir.glob("*.dcm"))) == 0: + raise ValueError( + f"Directory does not contain any DICOM files: {dicom_dir}." + ) + + # get config + config = self.config + files = self.segmentation_files + + # store in the output directory + # but for now just print + print(json.dumps(config, indent=2)) + + # store in _debug_test_meta.json + meta_tmp_file = Path(self.tmp_dir) / "_debug_test_meta.json" + with open(meta_tmp_file, "w") as f: + json.dump(config, f, indent=2) + + # export config file if requested + if export_config_to_file is not None: + shutil.copy(meta_tmp_file, export_config_to_file) + + # construct dcmqi cli command + cmd = [ + "itkimage2segimage", + "--inputImageList", + ",".join(str(fp) for fp in files), + "--inputDICOMDirectory", + str(dicom_dir), + "--outputDICOM", + str(output_file), + "--inputMetadata", + str(meta_tmp_file), + ] + + # run command + subprocess.run(cmd, check=True) + + def getExportedConfiguration(self) -> dict: + assert self.loaded + return self._config + + @property + def config(self) -> SegImageDict: + # generate the config file from the segments + # NOTE: returns a copy, not a reference to the data json dict + # NOTE: equivalent to self.data.getConfigData() + config = self.data.asdict() + + # make sure all segments have a file specified + for s in self._segments: + if s.path is None: + raise ValueError(f"Segment {s} has no file specified.") + + # sort segments by files + f2s: Dict[str, List[Segment]] = {} + for s in self._segments: + p = str(s.path) + if p not in f2s: + f2s[p] = [] + f2s[p].append(s) + + # sort the segments by their labelID + f2s = {k: sorted(v, key=lambda x: x.labelID) for k, v in f2s.items()} + + # order the dicionary by it's keys + of2s = OrderedDict(sorted(f2s.items())) + + # check that for all files + # - no duplicate labelIDs are present + # - all labelIDs are continuous and start at 1 + for f, s in of2s.items(): + labelIDs = [x.labelID for x in s] + if len(labelIDs) != len(set(labelIDs)): + raise ValueError(f"Duplicate labelIDs found in {f}.") + # if min(labelIDs) != 1: + # raise ValueError(f"LabelIDs must start at 1 in {f}.") + # if max(labelIDs) != len(labelIDs): + # raise ValueError(f"LabelIDs must be continuous in {f}.") + + # add each segment to the config + config["segmentAttributes"] = [[s.config for s in ss] for ss in of2s.values()] + + # return the generated config + return config + + @property + def segmentation_files(self) -> List[Path]: + return sorted(set([s.path for s in self._segments])) + + @config.setter + def config(self, config: SegImageDict) -> None: + self.data.setConfigData(config) + + @property + def segments(self) -> List[Segment]: + return self._segments + + def add_segment(self, segment: Segment) -> None: + self._segments.append(segment) + + def new_segment(self) -> Segment: + segment = Segment() + self._segments.append(segment) + return segment diff --git a/src/pydcmqi/types.py b/src/pydcmqi/types.py new file mode 100644 index 0000000..1ee6b4c --- /dev/null +++ b/src/pydcmqi/types.py @@ -0,0 +1,32 @@ +from __future__ import annotations + +from typing import List, TypedDict + + +class TripletDict(TypedDict): + CodeMeaning: str + CodeValue: str + CodingSchemeDesignator: str + + +class SegmentDict(TypedDict): + labelID: int + SegmentLabel: str + SegmentDescription: str + SegmentAlgorithmName: str + SegmentAlgorithmType: str + recommendedDisplayRGBValue: List[int] + SegmentedPropertyCategoryCodeSequence: TripletDict + SegmentedPropertyTypeCodeSequence: TripletDict + + +class SegImageDict(TypedDict): + BodyPartExamined: str + ClinicalTrialCoordinatingCenterName: str + ClinicalTrialSeriesID: str + ClinicalTrialTimePointID: str + ContentCreatorName: str + InstanceNumber: str + SeriesDescription: str + SeriesNumber: str + segmentAttributes: List[List[SegmentDict]] diff --git a/tests/test_segimage.py b/tests/test_segimage.py new file mode 100644 index 0000000..9e7a233 --- /dev/null +++ b/tests/test_segimage.py @@ -0,0 +1,551 @@ +from __future__ import annotations + +import json +import os +from pathlib import Path + +import pytest +from idc_index import index + +from pydcmqi.segimage import SegImage, SegmentData, Triplet + +TEST_DIR = Path(os.path.join(os.path.dirname(os.path.abspath(__file__)), "test_data")) + +# force loading of the segmentation data +# NOTE: only set to false to speed up manual testing. +FORCE_LOADING = True + + +# helper function to sort dictionaries +# ONLY FOR FILE EXPORT +# DICTS ARE NOT PERSISTENTLY ORDERED IN PYTHON +def _iterative_dict_sort(d): + if isinstance(d, list): + return [_iterative_dict_sort(v) for v in d] + elif isinstance(d, dict): + return {k: _iterative_dict_sort(v) for k, v in sorted(d.items())} + else: + return d + + +class TestTriplets: + def test_triplet_from_tuple(self): + d = ("Liver", "123037004", "SCT") + t = Triplet.fromTuple(d) + + assert t.label == "Liver" + assert t.code == "123037004" + assert t.scheme == "SCT" + assert t.valid == True + + def test_triplet_from_dict(self): + d = { + "CodeMeaning": "Anatomical Structure", + "CodeValue": "123037004", + "CodingSchemeDesignator": "SCT", + } + t = Triplet.fromDict(d) + + assert t.label == "Anatomical Structure" + assert t.code == "123037004" + assert t.scheme == "SCT" + assert t.valid == True + + def test_triplet_initializer(self): + t = Triplet("Anatomical Structure", "123037004", "SCT") + + assert t.label == "Anatomical Structure" + assert t.code == "123037004" + assert t.scheme == "SCT" + assert t.valid == True + + def test_empty_triplet(self): + t = Triplet.empty() + + assert t.label == "" + assert t.code == "" + assert t.scheme == "" + assert t.valid == False + + +class TestSegmentData: + def test_triplet_property_from_tuple(self): + d = SegmentData() + d.segmentedPropertyCategory = ("Anatomical Structure", "123037004", "SCT") + + assert isinstance(d.segmentedPropertyCategory, Triplet) + assert d.segmentedPropertyCategory.label == "Anatomical Structure" + assert d.segmentedPropertyCategory.code == "123037004" + assert d.segmentedPropertyCategory.scheme == "SCT" + assert ( + d.data["SegmentedPropertyCategoryCodeSequence"] + == d.segmentedPropertyCategory.asdict() + ) + + def test_triplet_property_from_object(self): + d = SegmentData() + d.segmentedPropertyCategory = Triplet( + "Anatomical Structure", "123037004", "SCT" + ) + + assert isinstance(d.segmentedPropertyCategory, Triplet) + assert d.segmentedPropertyCategory.label == "Anatomical Structure" + assert d.segmentedPropertyCategory.code == "123037004" + assert d.segmentedPropertyCategory.scheme == "SCT" + assert ( + d.data["SegmentedPropertyCategoryCodeSequence"] + == d.segmentedPropertyCategory.asdict() + ) + + def test_triplet_property_from_properties(self): + d = SegmentData() + d.segmentedPropertyCategory.label = "Anatomical Structure" + d.segmentedPropertyCategory.code = "123037004" + d.segmentedPropertyCategory.scheme = "SCT" + + assert isinstance(d.segmentedPropertyCategory, Triplet) + assert d.segmentedPropertyCategory.label == "Anatomical Structure" + assert d.segmentedPropertyCategory.code == "123037004" + assert d.segmentedPropertyCategory.scheme == "SCT" + assert ( + d.data["SegmentedPropertyCategoryCodeSequence"] + == d.segmentedPropertyCategory.asdict() + ) + + +class TestSegImageClass: + ### SETUP + def setup_method(self): + SegImage.reset() + + ### verbosity + def test_verbosity_default(self): + # initialize + segimg = SegImage() + + # verbose mode is disabled by default + assert SegImage.verbose == False + assert segimg.verbose == False + + def test_verbosity_default_global_override(self): + # set globally to true + SegImage.verbose = True + + # initialize + segimg1 = SegImage() + segimg2 = SegImage(verbose=False) + + # + assert SegImage.verbose == True + assert segimg1.verbose == True + assert segimg2.verbose == False + + def test_verbosity_instance_override(self): + # initialize + segimg = SegImage(verbose=True) + + # + assert SegImage.verbose == False + assert segimg.verbose == True + + ### tmp_dir + def test_tmp_dir(self): + # initialize + segimg1 = SegImage() + segimg2 = SegImage(tmp_dir=None) + segimg3 = SegImage(tmp_dir="tmp") + segimg4 = SegImage(tmp_dir=Path("tmp")) + + # all instances will have a temp dir set + assert segimg1.tmp_dir != None + assert segimg2.tmp_dir != None + assert segimg3.tmp_dir != None + assert segimg4.tmp_dir != None + + # the type of the tmp_dir is Path, no matter how it was initilaized + assert isinstance(segimg1.tmp_dir, Path) + assert isinstance(segimg2.tmp_dir, Path) + assert isinstance(segimg3.tmp_dir, Path) + assert isinstance(segimg4.tmp_dir, Path) + + # specifying a tmp_dir with e.g. a numeric type will raise an ValueError + with pytest.raises(ValueError): + _ = SegImage(tmp_dir=1) + + +class TestSegimageRead: + # set-up download data from idc + def setup_class(self): + # define output directory + self.tmp_dir = TEST_DIR / "tmp" + self.img_dir = TEST_DIR / "image" + self.seg_dir = TEST_DIR / "seg" + self.out_dir = TEST_DIR / "out" + + # create directories + self.tmp_dir.mkdir(parents=True, exist_ok=True) + self.img_dir.mkdir(parents=True, exist_ok=True) + self.seg_dir.mkdir(parents=True, exist_ok=True) + self.out_dir.mkdir(parents=True, exist_ok=True) + + # initialize a SegImage instance used in multiple tests + self.segimg = SegImage(self.tmp_dir) + + # initialize idc index client + client = index.IDCClient() + + # download a specific sid from idc + # CT image: 1.3.6.1.4.1.14519.5.2.1.8421.4008.761093011533106086639756339870 + # LIVER SEG: 1.2.276.0.7230010.3.1.3.17436516.270878.1696966588.943239 + + # PT image (6): 1.3.6.1.4.1.14519.5.2.1.4334.1501.680033973739971488930649469577 + # LUNG+TUMOR SEG (300): 1.2.276.0.7230010.3.1.3.17436516.538020.1696968975.507837 + + if not [f for f in os.listdir(self.img_dir) if f.endswith(".dcm")]: + print("downloading image") + client.download_dicom_series( + "1.3.6.1.4.1.14519.5.2.1.4334.1501.680033973739971488930649469577", + self.img_dir, + quiet=False, + ) + + if not [f for f in os.listdir(self.seg_dir) if f.endswith(".dcm")]: + print("downloading segment") + client.download_dicom_series( + "1.2.276.0.7230010.3.1.3.17436516.538020.1696968975.507837", + self.seg_dir, + quiet=False, + ) + + # check download and assign files + sf = [f for f in os.listdir(self.seg_dir) if f.endswith(".dcm")] + assert len(sf) == 1 + self.seg_file = os.path.join(self.seg_dir, sf[0]) + + def setup_method(self): + # rezet segimage + SegImage.reset() + + # load segmentation + if not self.segimg.loaded: + if not FORCE_LOADING and (self.out_dir / "pydcmqi-meta.json").exists(): + print("======= SKIPPING LOAD AND IMPORT FROM CACHE ========") + self.segimg._import(self.out_dir, disable_file_sanity_checks=True) + self.segimg.files._dicomseg = self.seg_file # fix + else: + self.segimg.load(self.seg_file, output_dir=self.out_dir) + + def test_loading(self): + assert self.segimg.loaded == True + + # files + assert self.segimg.files.config != None + assert self.segimg.files.config == self.out_dir / "pydcmqi-meta.json" + assert self.segimg.files.dicomseg != None + assert self.segimg.files.dicomseg == self.seg_file + + # config + assert self.segimg._config != None + assert self.segimg.getExportedConfiguration() != None + + def test_loaded_segment_content(self): + # check + segments = list(self.segimg.segments) + assert len(segments) == 2 + + # check segment content + for segment in segments: + # get path + print("item path:", segment.path) + + # compoare numpy shape with itk size + assert segment.numpy.transpose(2, 1, 0).shape == segment.itk.GetSize() + + # check the file is bindary + assert segment.isLabel(segment.labelID) + + # check the binary mask + import numpy as np + + assert segment.binary.shape == segment.numpy.shape + assert segment.binary.dtype == bool + assert np.array_equal(segment.binary, (segment.numpy == segment.labelID)) + assert segment.binary.min() == 0 + assert segment.binary.max() == 1 + + def test_loaded_segment_config(self): + # get the raw config + config = self.segimg.getExportedConfiguration() + + # check config matching per segment + for i, segment_config in enumerate(config["segmentAttributes"]): + segment = self.segimg.segments[i] + + # the extracted, generated and validated segment config is identical with the + # segment configuration from the original config file + assert segment.config == segment_config[0] + + def test_loaded_segment_data(self): + # get the raw config + # NOTE: this is the json from the original exported config file + config = self.segimg.getExportedConfiguration() + + # iterate all imported segments and compare the extracted data with + # the original json config + attrs = config["segmentAttributes"] + for i, segment in enumerate(self.segimg.segments): + attr = attrs[i][0] + + # check mandtory fields + assert segment.data.label == attr["SegmentLabel"] + assert segment.data.description == attr["SegmentDescription"] + assert segment.data.rgb == tuple(attr["recommendedDisplayRGBValue"]) + assert segment.data.labelID == attr["labelID"] + assert segment.data.segmentAlgorithmName == attr["SegmentAlgorithmName"] + assert segment.data.segmentAlgorithmType == attr["SegmentAlgorithmType"] + assert ( + str(segment.data.segmentedPropertyCategory) + == attr["SegmentedPropertyCategoryCodeSequence"]["CodeMeaning"] + ) + assert ( + segment.data.segmentedPropertyCategory.label + == attr["SegmentedPropertyCategoryCodeSequence"]["CodeMeaning"] + ) + assert ( + segment.data.segmentedPropertyCategory.code + == attr["SegmentedPropertyCategoryCodeSequence"]["CodeValue"] + ) + assert ( + segment.data.segmentedPropertyCategory.scheme + == attr["SegmentedPropertyCategoryCodeSequence"][ + "CodingSchemeDesignator" + ] + ) + assert ( + str(segment.data.segmentedPropertyType) + == attr["SegmentedPropertyTypeCodeSequence"]["CodeMeaning"] + ) + assert ( + segment.data.segmentedPropertyType.label + == attr["SegmentedPropertyTypeCodeSequence"]["CodeMeaning"] + ) + assert ( + segment.data.segmentedPropertyType.code + == attr["SegmentedPropertyTypeCodeSequence"]["CodeValue"] + ) + assert ( + segment.data.segmentedPropertyType.scheme + == attr["SegmentedPropertyTypeCodeSequence"]["CodingSchemeDesignator"] + ) + + # check optional fields + assert segment.data.hasSegmentedPropertyTypeModifier == ( + "SegmentedPropertyTypeModifierCodeSequence" in attr + ) + assert segment.data.hasAnatomicRegion == ( + "AnatomicRegionModifierSequence" in attr + ) + assert segment.data.hasAnatomicRegionModifier == ( + "AnatomicRegionModifierSequence" in attr + ) + + if segment.data.hasSegmentedPropertyTypeModifier: + # NOTE: why ...CodeSequence and ...Sequence ? + assert ( + str(segment.data.segmentedPropertyTypeModifier) + == attr["SegmentedPropertyTypeModifierCodeSequence"]["CodeMeaning"] + ) + assert ( + segment.data.segmentedPropertyTypeModifier.label + == attr["SegmentedPropertyTypeModifierCodeSequence"]["CodeMeaning"] + ) + assert ( + segment.data.segmentedPropertyTypeModifier.code + == attr["SegmentedPropertyTypeModifierCodeSequence"]["CodeValue"] + ) + assert ( + segment.data.segmentedPropertyTypeModifier.scheme + == attr["SegmentedPropertyTypeModifierCodeSequence"][ + "CodingSchemeDesignator" + ] + ) + + if segment.data.hasAnatomicRegion: + assert ( + str(segment.data.anatomicRegion) + == attr["AnatomicRegionSequence"]["CodeMeaning"] + ) + assert ( + segment.data.anatomicRegion.label + == attr["AnatomicRegionSequence"]["CodeMeaning"] + ) + assert ( + segment.data.anatomicRegion.code + == attr["AnatomicRegionSequence"]["CodeValue"] + ) + assert ( + segment.data.anatomicRegion.scheme + == attr["AnatomicRegionSequence"]["CodingSchemeDesignator"] + ) + + if segment.data.hasAnatomicRegionModifier: + assert ( + str(segment.data.anatomicRegionModifier) + == attr["AnatomicRegionModifierSequence"]["CodeMeaning"] + ) + assert ( + segment.data.anatomicRegionModifier.label + == attr["AnatomicRegionModifierSequence"]["CodeMeaning"] + ) + assert ( + segment.data.anatomicRegionModifier.code + == attr["AnatomicRegionModifierSequence"]["CodeValue"] + ) + assert ( + segment.data.anatomicRegionModifier.scheme + == attr["AnatomicRegionModifierSequence"]["CodingSchemeDesignator"] + ) + + def test_loaded_config(self): + # get the raw config + config = self.segimg.getExportedConfiguration() + + # check the config + assert self.segimg.config == config + + def test_loaded_data(self): + # get the raw config + config = self.segimg.getExportedConfiguration() + + # check the data + assert self.segimg.data.bodyPartExamined == config["BodyPartExamined"] + assert ( + self.segimg.data.clinicalTrialCoordinatingCenterName + == config["ClinicalTrialCoordinatingCenterName"] + ) + assert self.segimg.data.clinicalTrialSeriesID == config["ClinicalTrialSeriesID"] + assert ( + self.segimg.data.clinicalTrialTimePointID + == config["ClinicalTrialTimePointID"] + ) + assert self.segimg.data.contentCreatorName == config["ContentCreatorName"] + assert self.segimg.data.instanceNumber == config["InstanceNumber"] + assert self.segimg.data.seriesDescription == config["SeriesDescription"] + assert self.segimg.data.seriesNumber == config["SeriesNumber"] + + +class TestSegimageWrite: + def setup_class(self): + # define output directory + self.tmp_dir = TEST_DIR / "tmp" + self.img_dir = TEST_DIR / "image" + self.seg_dir = TEST_DIR / "seg" + self.out_dir = TEST_DIR / "out" + + # create directories + self.tmp_dir.mkdir(parents=True, exist_ok=True) + self.img_dir.mkdir(parents=True, exist_ok=True) + self.seg_dir.mkdir(parents=True, exist_ok=True) + self.out_dir.mkdir(parents=True, exist_ok=True) + + # initialize idc index client + client = index.IDCClient() + + if not [f for f in os.listdir(self.img_dir) if f.endswith(".dcm")]: + print("downloading image") + client.download_dicom_series( + "1.3.6.1.4.1.14519.5.2.1.4334.1501.680033973739971488930649469577", + self.img_dir, + quiet=False, + ) + + if not [f for f in os.listdir(self.seg_dir) if f.endswith(".dcm")]: + print("downloading segment") + client.download_dicom_series( + "1.2.276.0.7230010.3.1.3.17436516.538020.1696968975.507837", + self.seg_dir, + quiet=False, + ) + + # check download and assign files + sf = [f for f in os.listdir(self.seg_dir) if f.endswith(".dcm")] + assert len(sf) == 1 + self.seg_file = os.path.join(self.seg_dir, sf[0]) + + # get the segmentation files + self.dseg_config_file = self.out_dir / "pydcmqi-meta.json" + self.lung_seg_file = self.out_dir / "pydcmqi-1.nii.gz" + self.tumor_seg_file = self.out_dir / "pydcmqi-2.nii.gz" + + # extract nifit files from segmentation if not already done + if ( + not self.dseg_config_file.exists() + or not self.lung_seg_file.exists() + or not self.tumor_seg_file.exists() + ): + self.segimg.load(self.seg_file, output_dir=self.out_dir) + + def test_write(self): + # NOTE: this test is not yet dynamic and only works with the specified dicomseg file. + + # load original config + with open(self.dseg_config_file) as f: + config = json.load(f) + + # initialize a SegImage instance used in multiple tests + segimg = SegImage(self.tmp_dir) + + # specify segimg data + segimg.data.bodyPartExamined = "LUNG" + segimg.data.clinicalTrialCoordinatingCenterName = "dcmqi" + segimg.data.clinicalTrialSeriesID = "Session1" + segimg.data.clinicalTrialTimePointID = "1" + segimg.data.contentCreatorName = "BAMFHealth^AI" + segimg.data.instanceNumber = "1" + segimg.data.seriesDescription = "AIMI lung and FDG tumor AI segmentation" + segimg.data.seriesNumber = "300" + + # add lung segment + lung = segimg.new_segment() + lung.data.label = "Lung" + lung.data.description = "Lung" + lung.data.rgb = (128, 174, 128) + lung.data.labelID = 1 + lung.data.segmentAlgorithmName = "BAMF-Lung-FDG-PET-CT" + lung.data.segmentAlgorithmType = "AUTOMATIC" + lung.data.segmentedPropertyCategory = ( + "Anatomical Structure", + "123037004", + "SCT", + ) + lung.data.segmentedPropertyType = ("Lung", "39607008", "SCT") + lung.data.segmentedPropertyTypeModifier = ("Right and left", "51440002", "SCT") + + lung.setFile(self.lung_seg_file, labelID=1) + + # add tumor segment + tumor = segimg.new_segment() + tumor.data.label = "FDG-Avid Tumor" + tumor.data.description = "FDG-Avid Tumor" + tumor.data.rgb = (174, 41, 14) + tumor.data.labelID = 2 + tumor.data.segmentAlgorithmName = "BAMF-Lung-FDG-PET-CT" + tumor.data.segmentAlgorithmType = "AUTOMATIC" + tumor.data.segmentedPropertyCategory = ("Radiologic Finding", "C35869", "NCIt") + tumor.data.segmentedPropertyType.label = "FDG-Avid Tumor" + tumor.data.segmentedPropertyType.code = "C168968" + tumor.data.segmentedPropertyType.scheme = "NCIt" + tumor.data.anatomicRegion = lung.data.segmentedPropertyType + tumor.data.anatomicRegionModifier = lung.data.segmentedPropertyTypeModifier + + tumor.setFile(self.tumor_seg_file, labelID=2) + + # check config + assert segimg.config == config + + # write file + output_file = self.out_dir / "test.seg.dcm" + segimg.write(output_file, self.img_dir, allow_overwrite=True) + + # check the file was created + assert output_file.exists()