From 3d223e81003a38e51d821a8eee5d8ac38ba4e558 Mon Sep 17 00:00:00 2001
From: "jasmin.ziegler"
Date: Wed, 4 Oct 2023 19:41:04 +0200
Subject: [PATCH] feat: updated adtfhir-to-opal to produce uicc mapping,
 grouped uicc, age at diagnosis (#107)

* feat: adapt uicc mapping, grouped uicc, age@diagn
* make megalinter happy
* fix: undo local changes
* fix: use os.path.join instead of string concat.

---------

Co-authored-by: Ziegler, Jasmin
---
 src/adtfhir_to_opal/adtfhir_to_opal.py | 197 ++++++++++++++++---------
 src/adtfhir_to_opal/compose.yaml       |   2 +-
 2 files changed, 131 insertions(+), 68 deletions(-)

diff --git a/src/adtfhir_to_opal/adtfhir_to_opal.py b/src/adtfhir_to_opal/adtfhir_to_opal.py
index 5216b1e0..a276fc2c 100644
--- a/src/adtfhir_to_opal/adtfhir_to_opal.py
+++ b/src/adtfhir_to_opal/adtfhir_to_opal.py
@@ -8,18 +8,20 @@
 from pathling.etc import find_jar
 from pydantic import BaseSettings
 from pyspark.sql import SparkSession
-from pyspark.sql.functions import col, explode, first, max, regexp_replace, to_date, udf
+from pyspark.sql.functions import col, explode, first, regexp_replace, to_date, udf
 from pyspark.sql.types import StringType
 
 
 class Settings(BaseSettings):
-    output_folder: str = "~/opal-output"
-    kafka_topic_year_suffix: str = ".2023"
-    kafka_patient_topic: str = "fhir.onkoadt.Patient"
-    kafka_condition_topic: str = "fhir.onkoadt.Condition"
-    kafka_observation_topic: str = "fhir.onkoadt.Observation"
-    kafka_procedure_topic: str = "fhir.onkoadt.Procedure"
-    kafka_medicationstatement_topic: str = "fhir.onkoadt.MedicationStatement"
+    output_folder: str = "/opt/bitnami/spark/opal-output"
+    output_filename: str = "oBDS_tabular"
+    kafka_topic_year_suffix: str = ".2022"
+    kafka_patient_topic: str = "fhir.post-gateway-bzkf-onkoadt.Patient"
+    kafka_condition_topic: str = "fhir.post-gateway-bzkf-onkoadt.Condition"
+    kafka_observation_topic: str = "fhir.post-gateway-bzkf-onkoadt.Observation"
+    kafka_procedure_topic: str = "fhir.post-gateway-bzkf-onkoadt.Procedure"
+    kafka_medicationstatement_topic: str = \
+        "fhir.post-gateway-bzkf-onkoadt.MedicationStatement"
     # ⚠️ make sure these are consistent with the ones downloaded inside the Dockerfile
     jar_list: list = [
         "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.2",
@@ -113,13 +115,8 @@ def read_data_from_kafka_save_delta(spark: SparkSession, kafka_topics: str):
     query.processAllAvailable()
     kafka_data = spark.sql("select * from gettable")
     kafka_data = kafka_data.select("value")
-    print(
-        "Heyhoy settings.output_folder + bundles-delta: ",
-        settings.output_folder + "/bundles-delta",
-    )
-    kafka_data.write.format("delta").mode("overwrite").save(
-        settings.output_folder + "/bundles-delta"
-    )
+    bundle_folder = os.path.join(settings.output_folder, "bundles-delta")
+    kafka_data.write.format("delta").mode("overwrite").save(bundle_folder)
 
 
 def lookup_gender(gender_string):
@@ -142,6 +139,24 @@ def calculate_age(birthdate):
     return age
 
 
+def calculate_age_at_conditiondate(birthdate, conditiondate):
+    age_at_conditiondate = conditiondate - birthdate
+    days_in_year = 365.2425
+    age_at_conditiondate = int(age_at_conditiondate.days/days_in_year)
+    return age_at_conditiondate
+
+
+def add_age_at_condition(df_pat_cond_joined):
+    calculate_age_at_conditiondateUDF = udf(lambda x, y:
+                                            calculate_age_at_conditiondate(x, y),
+                                            StringType())
+    df_pat_cond_joined = df_pat_cond_joined.withColumn(
+        "age_at_diagnosis", calculate_age_at_conditiondateUDF(
+            to_date(df_pat_cond_joined.birthDate),
+            df_pat_cond_joined.conditiondate))
+    return df_pat_cond_joined
+
+
 def return_year(deceasedDateTime):
     if deceasedDateTime is not None:
         year = deceasedDateTime[0:4]
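
The two helpers added above compute a patient's age at the condition date from plain Python dates; Spark only sees the UDF wrapper. A standalone sketch of the same arithmetic (the example dates below are invented, not taken from the patch):

    from datetime import date

    def calculate_age_at_conditiondate(birthdate, conditiondate):
        # days between the two dates, divided by the mean Gregorian year
        # length and truncated to whole years (same arithmetic as the patch)
        days_in_year = 365.2425
        return int((conditiondate - birthdate).days / days_in_year)

    # hypothetical example values
    print(calculate_age_at_conditiondate(date(1950, 6, 15), date(2021, 3, 1)))  # 70
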
@@ -157,7 +172,8 @@ def encode_patients(ptl: PathlingContext, df_bundles: pyspark.sql.dataframe.DataFrame):
     return_yearUDF = udf(lambda x: return_year(x), StringType())
 
     patients = df_patients.selectExpr(
-        "id as pat_id", "gender", "birthDate", "deceasedBoolean", "deceasedDateTime"
+        "id as pat_id", "gender", "birthDate",
+        "deceasedBoolean", "deceasedDateTime"
     )
 
     patients = patients.withColumns(
@@ -297,34 +313,35 @@ def join_dataframes(df_one, partition_col_one: str, df_two, partition_col_two: str):
 
 def lookup_obs_value_codingcode_tnm_uicc_mapping(obs_value_codingcode_tnm_UICC):
     obs_value_codingcode_tnm_uicc_mapping_dict = {
-        "0": "0",
-        "0a": "1",
-        "0is": "2",
-        "I": "3",
-        "IA": "4",
-        "IA1": "5",
-        "IA2": "6",
-        "IB": "7",
-        "IB1": "8",
-        "IB2": "9",
-        "IC": "10",
-        "II": "11",
-        "IIA": "12",
-        "IIA1": "13",
-        "IIA2": "14",
-        "IIB": "15",
-        "IIC": "16",
-        "III": "17",
-        "IIIA": "18",
-        "IIIB": "19",
-        "IIIC": "20",
-        "IIIC1": "21",
-        "IIIC2": "22",
-        "IV": "23",
-        "IVA": "24",
-        "IVB": "25",
-        "IVC": "26",
-        "IS": "27",
+        "0": "0",
+        "0a": "1",
+        "0is": "2",
+        "I": "3",
+        "IA": "4",
+        "IA1": "5",
+        "IA2": "6",
+        "IA3": "7",
+        "IB": "8",
+        "IB1": "9",
+        "IB2": "10",
+        "IC": "11",
+        "IS": "12",
+        "II": "13",
+        "IIA": "14",
+        "IIA1": "15",
+        "IIA2": "16",
+        "IIB": "17",
+        "IIC": "18",
+        "III": "19",
+        "IIIA": "20",
+        "IIIB": "21",
+        "IIIC": "22",
+        "IIIC1": "23",
+        "IIIC2": "24",
+        "IV": "25",
+        "IVA": "26",
+        "IVB": "27",
+        "IVC": "28",
     }
     if obs_value_codingcode_tnm_UICC in obs_value_codingcode_tnm_uicc_mapping_dict:
         return obs_value_codingcode_tnm_uicc_mapping_dict[obs_value_codingcode_tnm_UICC]
@@ -332,6 +349,44 @@ def lookup_obs_value_codingcode_tnm_uicc_mapping(obs_value_codingcode_tnm_UICC):
         return obs_value_codingcode_tnm_UICC
 
 
+def lookup_grouped_uicc(obs_value_codingcode_tnm_UICC):
+    grouped_uicc_dict = {
+        "0": "0",
+        "0a": "0",
+        "0is": "0",
+        "I": "1",
+        "IA": "1",
+        "IA1": "1",
+        "IA2": "1",
+        "IA3": "1",
+        "IB": "1",
+        "IB1": "1",
+        "IB2": "1",
+        "IC": "1",
+        "IS": "1",
+        "II": "2",
+        "IIA": "2",
+        "IIA1": "2",
+        "IIA2": "2",
+        "IIB": "2",
+        "IIC": "2",
+        "III": "3",
+        "IIIA": "3",
+        "IIIB": "3",
+        "IIIC": "3",
+        "IIIC1": "3",
+        "IIIC2": "3",
+        "IV": "4",
+        "IVA": "4",
+        "IVB": "4",
+        "IVC": "4",
+    }
+    if obs_value_codingcode_tnm_UICC in grouped_uicc_dict:
+        return grouped_uicc_dict[obs_value_codingcode_tnm_UICC]
+    else:
+        return obs_value_codingcode_tnm_UICC
+
+
 def transform_tnmp(observations_tnmp):
     lookup_obs_value_codingcode_tnm_UICC_mappingUDF = udf(
         lambda x: lookup_obs_value_codingcode_tnm_uicc_mapping(x), StringType()
@@ -340,6 +395,13 @@ def transform_tnmp(observations_tnmp):
         "tnmp_UICC_mapped",
         lookup_obs_value_codingcode_tnm_UICC_mappingUDF(observations_tnmp.obsvaluecode),
     )
+    lookup_lookup_grouped_uiccUDF = udf(
+        lambda x: lookup_grouped_uicc(x), StringType()
+    )
+    observations_tnmp = observations_tnmp.withColumn(
+        "tnmp_UICC_grouped",
+        lookup_lookup_grouped_uiccUDF(observations_tnmp.obsvaluecode),
+    )
     observations_tnmp = observations_tnmp.selectExpr(
         "obssubjreference as tnm_obssubjreference",
         "obsid as tnmp_obsid",
@@ -348,6 +410,7 @@ def transform_tnmp(observations_tnmp):
         "obsvaluecode as tnmp_obsvalue_UICC",
         "obsdate as tnmp_obsdate",
         "tnmp_UICC_mapped",
+        "tnmp_UICC_grouped"
     )
 
     observations_tnmp_cached = observations_tnmp.cache()
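
The two lookups above implement a two-level encoding: the detailed table assigns each UICC stage label a distinct ordinal code (now including IA3), while lookup_grouped_uicc collapses all substages onto the major stages 0 through 4; codes missing from a table pass through unchanged. A condensed sketch of the pattern (the dict is abbreviated here; the patch maps all 29 labels):

    def lookup_grouped_uicc(uicc_code):
        # abbreviated lookup table for illustration only
        grouped_uicc_dict = {"0is": "0", "IA3": "1", "IIIC2": "3", "IVB": "4"}
        # unmapped codes fall through unchanged, as in the patch's else branch
        return grouped_uicc_dict.get(uicc_code, uicc_code)

    assert lookup_grouped_uicc("IIIC2") == "3"
    assert lookup_grouped_uicc("unknown") == "unknown"
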
first("conditiondate").alias("conditiondate"), - max("condcodingcode").alias("condcodingcode"), + first("condcodingcode").alias("condcodingcode"), first("condcodingcode_mapped").alias("condcodingcode_mapped"), first("deceased_year").alias("deceased_year"), first("age_in_years").alias("age_in_years"), - max("tnmp_obsid").alias("tnmp_obsid"), - max("tnmp_obsvalue_UICC").alias("tnmp_obsvalue_UICC"), - max("tnmp_UICC_mapped").alias("tnmp_UICC_mapped"), - max("tnmp_obsdate").alias("tnmp_obsdate"), + first("age_at_diagnosis").alias("age_at_diagnosis"), + first("tnmp_obsid").alias("tnmp_obsid"), + first("tnmp_obsvalue_UICC").alias("tnmp_obsvalue_UICC"), + first("tnmp_UICC_mapped").alias("tnmp_UICC_mapped"), + first("tnmp_UICC_grouped").alias("tnmp_UICC_grouped"), + first("tnmp_obsdate").alias("tnmp_obsdate"), first("evidencereference").alias("evidencereference"), - max("obsdate_hist").alias("obsdate_hist"), + first("obsdate_hist").alias("obsdate_hist"), first("obs_value_hist_mapped_0_4").alias("obs_value_hist_mapped_0_4"), first("obs_value_hist_mapped_5").alias("obs_value_hist_mapped_5"), first("obs_id_hist").alias("obs_id_hist"), ) - joined_dataframe_grouped_repartitioned = joined_dataframe_grouped.repartition( 20, col("pat_id") ) @@ -484,9 +548,11 @@ def group_df(joined_dataframe): "condcodingcode", "condcodingcode_mapped", "age_in_years", + "age_at_diagnosis", "deceased_year", "tnmp_obsvalue_UICC", "tnmp_UICC_mapped", + "tnmp_UICC_grouped", "tnmp_obsdate", "obs_value_hist_mapped_0_4", "obs_value_hist_mapped_5", @@ -499,15 +565,15 @@ def group_df(joined_dataframe): def save_final_df(final_df): final_df_pandas = final_df.toPandas() final_df_pandas = final_df_pandas.rename_axis("ID") # required for opal import - output_path_filename = ( - settings.output_folder - + "/bzkfsummerschool" - + settings.kafka_topic_year_suffix - + ".csv" - ) - print("###### OUTPUTFolder settings.output_folder: ", settings.output_folder) - print("###### OUTPUTFILE: ", output_path_filename) + + output_filename = str(settings.output_filename + settings.kafka_topic_year_suffix + + ".csv") + output_path_filename = os.path.join( + settings.output_folder, + output_filename) print("###### current dir: ", os.getcwd()) + print("###### output_path_filename : ", output_path_filename) + final_df_pandas.to_csv(output_path_filename) @@ -522,9 +588,8 @@ def main(): read_data_from_kafka_save_delta(spark, kafka_topics) - df_bundles = spark.read.format("delta").load( - settings.output_folder + "/bundles-delta" - ) + df_bundles = spark.read.format("delta").load(os.path.join( + settings.output_folder, "bundles-delta")) df_patients = encode_patients(ptl, df_bundles) df_conditions = encode_conditions(ptl, df_bundles) @@ -532,30 +597,28 @@ def main(): df_pat_cond_joined = join_dataframes( df_patients, "pat_id", df_conditions, "subjectreference" ) + df_pat_cond_joined = add_age_at_condition(df_pat_cond_joined) df_observations_tnmp, df_observations_histology = encode_observations( ptl, df_bundles ) - df_pat_cond_obstnmp_joined = join_dataframes( df_pat_cond_joined, "pat_id", df_observations_tnmp, "tnm_obssubjreference" ) - df_pat_cond_obstnmp_obshist_joined = join_dataframes( df_pat_cond_obstnmp_joined, "pat_id", df_observations_histology, "obssubjreference", ) - df_pat_cond_obstnmp_obshist_joined_grouped = group_df( df_pat_cond_obstnmp_obshist_joined ) save_final_df(df_pat_cond_obstnmp_obshist_joined_grouped) - shutil.rmtree(settings.output_folder + "/bundles-delta") - print("########## DELETED bundles-delta folder and files") + 
@@ -522,9 +588,8 @@ def main():
 
     read_data_from_kafka_save_delta(spark, kafka_topics)
 
-    df_bundles = spark.read.format("delta").load(
-        settings.output_folder + "/bundles-delta"
-    )
+    df_bundles = spark.read.format("delta").load(os.path.join(
+        settings.output_folder, "bundles-delta"))
 
     df_patients = encode_patients(ptl, df_bundles)
     df_conditions = encode_conditions(ptl, df_bundles)
@@ -532,30 +597,28 @@ def main():
     df_pat_cond_joined = join_dataframes(
         df_patients, "pat_id", df_conditions, "subjectreference"
     )
+    df_pat_cond_joined = add_age_at_condition(df_pat_cond_joined)
 
     df_observations_tnmp, df_observations_histology = encode_observations(
         ptl, df_bundles
     )
-
     df_pat_cond_obstnmp_joined = join_dataframes(
         df_pat_cond_joined, "pat_id", df_observations_tnmp, "tnm_obssubjreference"
     )
-
     df_pat_cond_obstnmp_obshist_joined = join_dataframes(
         df_pat_cond_obstnmp_joined,
         "pat_id",
         df_observations_histology,
         "obssubjreference",
     )
-
     df_pat_cond_obstnmp_obshist_joined_grouped = group_df(
         df_pat_cond_obstnmp_obshist_joined
     )
 
     save_final_df(df_pat_cond_obstnmp_obshist_joined_grouped)
 
-    shutil.rmtree(settings.output_folder + "/bundles-delta")
-    print("########## DELETED bundles-delta folder and files")
+    shutil.rmtree(os.path.join(settings.output_folder, "bundles-delta"))
+    print("###### DELETED bundles-delta folder and files")
 
     # df_pat_cond_obstnmp_obshist_joined_grouped.show()
 
diff --git a/src/adtfhir_to_opal/compose.yaml b/src/adtfhir_to_opal/compose.yaml
index 0819800c..896949e4 100644
--- a/src/adtfhir_to_opal/compose.yaml
+++ b/src/adtfhir_to_opal/compose.yaml
@@ -4,7 +4,7 @@ services:
       context: .
       dockerfile: Dockerfile
     environment:
-      OUTPUT_FOLDER: "/opt/bitnami/spark/opal-output"
+      OUTPUT_FOLDER: "/opt/bitnami/spark/opal-output/"
       KAFKA_TOPIC_YEAR_SUFFIX: ""
       KAFKA_BOOTSTRAP_SERVER: "kafka:9092"
       KAFKA_PATIENT_TOPIC: "fhir.onkoadt.Patient"
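
One behavioral note on the compose change: OUTPUT_FOLDER now carries a trailing slash. With the old string concatenation this would have produced a doubled separator, but after the switch to os.path.join it is harmless on POSIX paths, since join only inserts a separator where one is missing. A quick sketch:

    import os

    # both compose values produce the same delta path
    for folder in ("/opt/bitnami/spark/opal-output", "/opt/bitnami/spark/opal-output/"):
        print(os.path.join(folder, "bundles-delta"))
    # /opt/bitnami/spark/opal-output/bundles-delta
    # /opt/bitnami/spark/opal-output/bundles-delta
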