From ca6fcad956243eeec0ceb77fe1fa0aec6c2ff48e Mon Sep 17 00:00:00 2001 From: LinkunGao Date: Tue, 20 Aug 2024 01:31:23 +1200 Subject: [PATCH 1/2] fix issue #4 #5 --- fhir_cda/ehr/measurement.py | 30 +++++++++++++++++------------- test/test.py | 6 +++--- 2 files changed, 20 insertions(+), 16 deletions(-) diff --git a/fhir_cda/ehr/measurement.py b/fhir_cda/ehr/measurement.py index 558fcfa..d8b2f50 100644 --- a/fhir_cda/ehr/measurement.py +++ b/fhir_cda/ehr/measurement.py @@ -1,36 +1,40 @@ -from typing import Union +from typing import Union, Optional class Measurement: - def __init__(self, value: Union[str, int, float], code: str, units: str, code_system="http://loinc.org", - units_system="http://unitsofmeasure.org"): + def __init__(self, value: Union[str, int, float], code: str, unit: str, code_system="http://loinc.org", + unit_system="http://unitsofmeasure.org", display: Optional[str] = None): if not isinstance(value, (str, int, float)) or isinstance(value, bool): raise ValueError(f"value={value} is not an instance of any of the types in the tuple (str, int, float)") elif not isinstance(code, str): raise ValueError(f"code={code} is not an instance of type str") - elif not isinstance(units, str): - raise ValueError(f"units={units} is not an instance of type str") + elif not isinstance(unit, str): + raise ValueError(f"units={unit} is not an instance of type str") elif not isinstance(code_system, str): raise ValueError(f"value_system={code_system} is not an instance of type str") - elif not isinstance(units_system, str): - raise ValueError(f"units_system={units_system} is not an instance of type str") + elif not isinstance(unit_system, str): + raise ValueError(f"units_system={unit_system} is not an instance of type str") + elif display is not None and not isinstance(unit_system, str): + raise ValueError(f"display={display} is not an instance of type str") self.value = value self.code = code - self.units = units + self.unit = unit self.code_system = code_system - self.units_system = units_system + self.unit_system = unit_system + self.display = display def __repr__(self): - return (f"Measurement(value={self.value}, code='{self.code}', units='{self.units}', " - f"value_system='{self.code_system}', units_system='{self.units_system}')") + return (f"Measurement(value={self.value}, code='{self.code}', unit='{self.unit}', " + f"value_system='{self.code_system}', unit_system='{self.unit_system}')") def get(self): return { "value": self.value, "code": self.code, - "units": self.units, + "unit": self.unit, "codeSystem": self.code_system, - "unitsSystem": self.units_system + "unitSystem": self.unit_system, + "display": self.display if isinstance(self.display, str) else "" } diff --git a/test/test.py b/test/test.py index 3dad7df..74d1d02 100644 --- a/test/test.py +++ b/test/test.py @@ -12,12 +12,12 @@ def test_annotator(self): annotator = Annotator("./dataset/dataset-sparc") # - m = Measurement(value="0.15", code="21889-1", units="cm") + m = Measurement(value="0.15", code="21889-1", unit="cm") annotator.add_measurements(["sub-001"], [m]).save() - annotator.add_measurements("sub-002", Measurement(value="25", code="30525-0", units="year", code_system="http://loinc.org")) + annotator.add_measurements("sub-002", Measurement(value="25", code="30525-0", unit="year", code_system="http://loinc.org")) - annotator.add_measurements(["sub-001", "sub-002"], Measurement(value="170", code="8302-2", units="cm")) + annotator.add_measurements(["sub-001", "sub-002"], Measurement(value="170", code="8302-2", unit="cm")) annotator.save() From 7daa45dfbdf7a8b1b97e2a3619618f5606b7f1ce Mon Sep 17 00:00:00 2001 From: LinkunGao Date: Wed, 21 Aug 2024 16:10:34 +1200 Subject: [PATCH 2/2] Huge update on observation value format --- README.md | 26 ++++-- fhir_cda/ehr/__init__.py | 1 + fhir_cda/ehr/elements.py | 172 ++++++++++++++++++++++++++++++++++++ fhir_cda/ehr/measurement.py | 31 +++---- test/test.py | 14 +-- 5 files changed, 212 insertions(+), 32 deletions(-) create mode 100644 fhir_cda/ehr/elements.py diff --git a/README.md b/README.md index 0fd61c8..e6eadb4 100644 --- a/README.md +++ b/README.md @@ -7,31 +7,43 @@ - Add measurement for one patient ```py from fhir_cda import Annotator -from fhir_cda.ehr import Measurement +from fhir_cda.ehr import Measurement, ObservationValue, Quantity annotator = Annotator("./dataset/dataset-sparc") -m = Measurement(value="0.15", code="21889-1", units="cm") +m = Measurement(value=ObservationValue(value_quantity=Quantity(value=30, unit="year", code="a")), + code="30525-0") + annotator.add_measurements("sub-001", m).save() ``` - Add measurements for one patient ```py -m1 = Measurement(value="0.15", code="21889-1", units="cm") -m2 = Measurement(value="0.15", code="21889-1", units="cm", code_system="http://loinc.org", units_system="http://unitsofmeasure.org") +m1 = Measurement(value=ObservationValue(value_quantity=Quantity(value=0.15, unit="cm", code="cm")), + code="21889-1") +m2 = Measurement(value=ObservationValue(value_quantity=Quantity(value=0.15, unit="cm", code="cm", system="http://unitsofmeasure.org")), + code="21889-1", code_system="http://loinc.org", display="Size Tumor") annotator.add_measurements("sub-001", [m1, m2]).save() ``` - Add measurement for multiple patients ```py -m = Measurement(value="0.15", code="21889-1", units="cm") +m = Measurement(value=ObservationValue(value_string="Female"), + code="99502-7", display="Recorded sex or gender", code_system="http://loinc.org") annotator.add_measurements(["sub-001", "sub-002"], m).save() ``` - A measurements for multiple patients ```py -m1 = Measurement(value="0.15", code="21889-1", units="cm") -m2 = Measurement(value="0.15", code="21889-1", units="cm", code_system="http://loinc.org", units_system="http://unitsofmeasure.org") +m1 = Measurement(value=ObservationValue(value_string="Female"), + code="99502-7", display="Recorded sex or gender", code_system="http://loinc.org") +m2 = Measurement(value=ObservationValue(value_quantity=Quantity(value=0.15, unit="cm", code="cm", system="http://unitsofmeasure.org")), + code="21889-1", code_system="http://loinc.org", display="Size Tumor") annotator.add_measurements(["sub-001", "sub-002"], [m1, m2]) annotator.save() +``` +- Notice: The default value for `unit system` and `code system` are: +```python +unit_system = "http://unitsofmeasure.org" +code_system = "http://loinc.org" ``` \ No newline at end of file diff --git a/fhir_cda/ehr/__init__.py b/fhir_cda/ehr/__init__.py index cb1c9b7..5cc9c65 100644 --- a/fhir_cda/ehr/__init__.py +++ b/fhir_cda/ehr/__init__.py @@ -1 +1,2 @@ from .measurement import Measurement +from .elements import ObservationValue, Coding, CodeableConcept, Range, Ratio, Period, SampledData, Quantity diff --git a/fhir_cda/ehr/elements.py b/fhir_cda/ehr/elements.py new file mode 100644 index 0000000..4617b34 --- /dev/null +++ b/fhir_cda/ehr/elements.py @@ -0,0 +1,172 @@ +from typing import Optional, Literal, List + + +class Quantity: + def __init__(self, value: Optional[float] = None, comparator: Optional[Literal["<", "<=", ">=", ">"]] = None, + unit: Optional[str] = None, system: Optional[str] = "http://unitsofmeasure.org", + code: Optional[str] = None): + """ + :param value: Numerical value (with implicit precision) for Observation + :param comparator: < | <= | >= | > - how to understand the value + :param unit: Unit representation + :param system: System that defines coded unit form + :param code: Coded form of the unit + """ + self.value = value + self.comparator = comparator + self.unit = unit + self.system = system + self.code = code + + def get(self): + quantity = { + "value": self.value if isinstance(self.value, float) or isinstance(self.value, int) else None, + "comparator": self.comparator if self.comparator in ["<", "<=", ">=", ">"] else None, + "unit": self.unit if isinstance(self.unit, str) else None, + "system": self.system if isinstance(self.system, str) else None, + "code": self.code if isinstance(self.code, str) else None + } + return {k: v for k, v in quantity.items() if v not in ("", None)} + + +class Coding: + + def __init__(self, system: str = "", version: str = "", code: str = "", display: Optional[str] = None, + user_selected: Optional[bool] = None): + self.system = system + self.version = version + self.code = code + self.display = display + self.user_selected = user_selected + + def get(self): + coding = { + "system": self.system if isinstance(self.system, str) else None, + "version": self.version if isinstance(self.version, str) else None, + "code": self.code if isinstance(self.code, str) else None, + "display": self.display if isinstance(self.display, str) else None, + "userSelected": self.user_selected if isinstance(self.user_selected, bool) else None + } + return {k: v for k, v in coding.items() if v not in ("", None)} + + +class CodeableConcept: + + def __init__(self, codings: List[Coding] = None, text: str = ""): + self.codings = codings + self.text = text + + def get(self): + codeableconcept = { + "coding": [coding.get() for coding in self.codings if isinstance(coding, Coding)] if isinstance( + self.codings, list) else None, + "text": self.text if isinstance(self.text, str) else None + } + + return {k: v for k, v in codeableconcept.items() if v not in ("", None, [])} + + +class Range: + + def __init__(self, low: Optional[float] = None, high: Optional[float] = None): + self.low = low + self.high = high + + def get(self): + _range = { + "low": self.low if isinstance(self.low, float) else None, + "high": self.high if isinstance(self.high, float) else None + } + return {k: v for k, v in _range.items() if v not in ("", None)} + + +class Ratio: + + def __init__(self, numerator: Optional[Quantity] = None, denominator: Optional[Quantity] = None): + self.numerator = numerator + self.denominator = denominator + + def get(self): + ratio = { + "numerator": self.numerator.get() if isinstance(self.numerator, Quantity) else None, + "denominator": self.denominator.get() if isinstance(self.denominator, Quantity) else None + } + return {k: v for k, v in ratio.items() if v not in ("", None)} + + +class SampledData: + + def __init__(self, origin: str, period: float, dimensions: int, factor: Optional[float] = None, + lower_limit: Optional[float] = None, upper_limit: Optional[float] = None, data: Optional[str] = None): + self.origin = origin + self.period = period + self.dimensions = dimensions + self.factor = factor + self.lower_limit = lower_limit + self.upper_limit = upper_limit + self.data = data + + def get(self): + sampled_data = { + "origin": self.origin if isinstance(self.origin, str) else None, + "period": self.period if isinstance(self.period, float) else None, + "factor": self.factor if isinstance(self.factor, float) else None, + "lowerLimit": self.lower_limit if isinstance(self.lower_limit, float) else None, + "upperLimit": self.upper_limit if isinstance(self.upper_limit, float) else None, + "dimensions": self.dimensions if isinstance(self.dimensions, int) and self.dimensions > 0 else None, + "data": self.data if isinstance(self.data, str) else None + } + return {k: v for k, v in sampled_data.items() if v not in ("", None)} + + +class Period: + + def __init__(self, start: str = '', end: str = ''): + self.start = start + self.end = end + + def get(self): + period = { + "start": self.start if isinstance(self.start, str) else None, + "end": self.end if isinstance(self.end, str) else None + } + return {k: v for k, v in period.items() if v not in ("", None)} + + +class ObservationValue: + + def __init__(self, value_quantity: Optional[Quantity] = None, + value_codeable_concept: Optional[CodeableConcept] = None, value_string: Optional[str] = None, + value_boolean: Optional[bool] = None, value_integer: Optional[int] = None, + value_range: Optional[Range] = None, value_ratio: Optional[Ratio] = None, + value_sampled_data: Optional[SampledData] = None, value_time: Optional[str] = None, + value_date_time: Optional[str] = None, value_period: Optional[Period] = None): + self.value_quantity = value_quantity + self.value_codeable_concept = value_codeable_concept + self.value_string = value_string + self.value_boolean = value_boolean + self.value_integer = value_integer + self.value_range = value_range + self.value_ratio = value_ratio + self.value_sampled_data = value_sampled_data + self.value_time = value_time + self.value_date_time = value_date_time + self.value_period = value_period + + def get(self): + value = { + "valueQuantity": self.value_quantity.get() if isinstance(self.value_quantity, Quantity) else None, + "valueCodeableConcept": self.value_codeable_concept.get() if isinstance(self.value_codeable_concept, + CodeableConcept) else None, + "valueString": self.value_string if isinstance(self.value_string, str) else None, + "valueBoolean": self.value_boolean if isinstance(self.value_boolean, bool) else None, + "valueInteger": self.value_integer if isinstance(self.value_integer, int) else None, + "valueRange": self.value_range.get() if isinstance(self.value_range, Range) else None, + "valueRatio": self.value_ratio.get() if isinstance(self.value_ratio, Ratio) else None, + "valueSampledData": self.value_sampled_data.get() if isinstance(self.value_sampled_data, + SampledData) else None, + "valueTime": self.value_time if isinstance(self.value_time, str) else None, + "valueDateTime": self.value_date_time if isinstance(self.value_date_time, str) else None, + "valuePeriod": self.value_period.get() if isinstance(self.value_period, Period) else None + } + return {k: v for k, v in value.items() if v not in ("", None)} diff --git a/fhir_cda/ehr/measurement.py b/fhir_cda/ehr/measurement.py index d8b2f50..9799609 100644 --- a/fhir_cda/ehr/measurement.py +++ b/fhir_cda/ehr/measurement.py @@ -1,40 +1,31 @@ -from typing import Union, Optional +from typing import Optional +from .elements import ObservationValue class Measurement: - def __init__(self, value: Union[str, int, float], code: str, unit: str, code_system="http://loinc.org", - unit_system="http://unitsofmeasure.org", display: Optional[str] = None): + def __init__(self, value: ObservationValue, code: str, code_system="http://loinc.org", + display: Optional[str] = None): - if not isinstance(value, (str, int, float)) or isinstance(value, bool): - raise ValueError(f"value={value} is not an instance of any of the types in the tuple (str, int, float)") + if not isinstance(value, ObservationValue): + raise ValueError(f"value={value} is not an ObservationValue type") elif not isinstance(code, str): raise ValueError(f"code={code} is not an instance of type str") - elif not isinstance(unit, str): - raise ValueError(f"units={unit} is not an instance of type str") - elif not isinstance(code_system, str): - raise ValueError(f"value_system={code_system} is not an instance of type str") - elif not isinstance(unit_system, str): - raise ValueError(f"units_system={unit_system} is not an instance of type str") - elif display is not None and not isinstance(unit_system, str): + elif display is not None and not isinstance(display, str): raise ValueError(f"display={display} is not an instance of type str") self.value = value self.code = code - self.unit = unit self.code_system = code_system - self.unit_system = unit_system self.display = display def __repr__(self): - return (f"Measurement(value={self.value}, code='{self.code}', unit='{self.unit}', " - f"value_system='{self.code_system}', unit_system='{self.unit_system}')") + return (f"Measurement(value={self.value}, code='{self.code}', value_system='{self.code_system}')") def get(self): - return { - "value": self.value, + measurement = { + "value": self.value.get(), "code": self.code, - "unit": self.unit, "codeSystem": self.code_system, - "unitSystem": self.unit_system, "display": self.display if isinstance(self.display, str) else "" } + return {k: v for k, v in measurement.items() if v not in ("", None)} diff --git a/test/test.py b/test/test.py index 74d1d02..7a66b0e 100644 --- a/test/test.py +++ b/test/test.py @@ -1,5 +1,5 @@ from fhir_cda import Annotator -from fhir_cda.ehr import Measurement +from fhir_cda.ehr import Measurement, ObservationValue, Quantity from typing import Union from pprint import pprint import time @@ -12,12 +12,17 @@ def test_annotator(self): annotator = Annotator("./dataset/dataset-sparc") # - m = Measurement(value="0.15", code="21889-1", unit="cm") + m = Measurement(value=ObservationValue(value_quantity=Quantity(value=30, unit="year", code="a")), + code="30525-0") annotator.add_measurements(["sub-001"], [m]).save() - annotator.add_measurements("sub-002", Measurement(value="25", code="30525-0", unit="year", code_system="http://loinc.org")) + annotator.add_measurements("sub-002", Measurement(value=ObservationValue(value_string="M"), code="99502-7", + code_system="https://loinc.org", + display="Recorded sex or gender")) - annotator.add_measurements(["sub-001", "sub-002"], Measurement(value="170", code="8302-2", unit="cm")) + annotator.add_measurements(["sub-001", "sub-002"], Measurement( + value=ObservationValue(value_quantity=Quantity(value=175, unit="cm", code="cm")), code="8302-2", + display="Body height")) annotator.save() @@ -26,7 +31,6 @@ def test_annotator(self): print(f"Function took {elapsed_time:.4f} seconds to complete.") - if __name__ == '__main__': test = Test() test.test_annotator()